[GitHub] [spark] SparkQA removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


SparkQA removed a comment on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-858882404


   **[Test build #139656 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139656/testReport)** for PR 32849 at commit [`410242e`](https://github.com/apache/spark/commit/410242eba1caff00f86c0355dc97fdb198cdbc70).





[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


SparkQA commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859148736


   **[Test build #139656 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139656/testReport)** for PR 32849 at commit [`410242e`](https://github.com/apache/spark/commit/410242eba1caff00f86c0355dc97fdb198cdbc70).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.





[GitHub] [spark] viirya opened a new pull request #32870: [SPARK-35439][SQL][FOLLOWUP] ExpressionContainmentOrdering should not sort unrelated expressions

2021-06-10 Thread GitBox


viirya opened a new pull request #32870:
URL: https://github.com/apache/spark/pull/32870


   
   
   ### What changes were proposed in this pull request?
   
   
   This is a followup of #32586. We introduced `ExpressionContainmentOrdering` to sort common expressions according to their parent-child relations. For unrelated expressions, the ordering previously returned -1, which is not correct and can lead to a transitivity issue; see the sketch after this description.
   
   ### Why are the changes needed?
   
   
   To fix the possible transitivity issue of `ExpressionContainmentOrdering`.
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   No
   
   ### How was this patch tested?
   
   
   Unit test.
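
   For context, a minimal standalone sketch of the problem (illustrative only; `Expr` and `contains` are stand-ins, not the actual `ExpressionContainmentOrdering` code). Returning -1 for unrelated elements violates the `Ordering` contract, while returning 0 keeps it consistent:

   ```scala
   // Toy stand-in for expressions: `a.contains(b)` means b is a child of a.
   case class Expr(id: Int, children: Set[Int] = Set.empty) {
     def contains(other: Expr): Boolean = children.contains(other.id)
   }

   // Pre-fix behavior (sketch): unrelated expressions also compare as -1.
   val buggy = new Ordering[Expr] {
     def compare(a: Expr, b: Expr): Int =
       if (a.contains(b)) 1 else if (b.contains(a)) -1 else -1 // bug: asymmetric
   }

   // Fixed behavior (sketch): unrelated expressions tie.
   val fixed = new Ordering[Expr] {
     def compare(a: Expr, b: Expr): Int =
       if (a.contains(b)) 1 else if (b.contains(a)) -1 else 0
   }

   val x = Expr(1)
   val y = Expr(2)
   // buggy.compare(x, y) == -1 and buggy.compare(y, x) == -1: the sign does not
   // flip when the arguments swap, so sorting can throw
   // "Comparison method violates its general contract!" or give unstable orders.
   assert(fixed.compare(x, y) == 0 && fixed.compare(y, x) == 0)
   ```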
   





[GitHub] [spark] dongjoon-hyun closed pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

2021-06-10 Thread GitBox


dongjoon-hyun closed pull request #32730:
URL: https://github.com/apache/spark/pull/32730


   





[GitHub] [spark] dongjoon-hyun commented on pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

2021-06-10 Thread GitBox


dongjoon-hyun commented on pull request #32730:
URL: https://github.com/apache/spark/pull/32730#issuecomment-859142586


   Thank you so much, @viirya ! Merged to master.





[GitHub] [spark] SparkQA removed a comment on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


SparkQA removed a comment on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859052129


   **[Test build #139660 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139660/testReport)** for PR 32868 at commit [`3b38ae8`](https://github.com/apache/spark/commit/3b38ae861b0bf8127a2f8c621630932affb49b03).





[GitHub] [spark] SparkQA commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


SparkQA commented on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859138782


   **[Test build #139660 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139660/testReport)** for PR 32868 at commit [`3b38ae8`](https://github.com/apache/spark/commit/3b38ae861b0bf8127a2f8c621630932affb49b03).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.





[GitHub] [spark] dongjoon-hyun edited a comment on pull request #32826: [SPARK-35670][BUILD] Upgrade ZSTD-JNI to 1.5.0-1

2021-06-10 Thread GitBox


dongjoon-hyun edited a comment on pull request #32826:
URL: https://github.com/apache/spark/pull/32826#issuecomment-859136503


   It's just a historical fact. IMO, we need to replace it with `zstd-jni`.
   > Does it make sense to use aircompressor for ZSTD in ORC, rather than the zstd-jni?
   
   Yes, `aircompressor` is behind and also has a ZSTD bug. That's the reason why the community (not only Apache ORC, but also Presto) complains about the new version of aircompressor.
   - https://github.com/airlift/aircompressor/issues/122
   
   BTW, please note that your PR is merged to Apache ORC 1.7, which has no release plan yet. The situation is the same for the other communities. Apache Kafka with ZSTD 1.5? Apache Avro with ZSTD 1.5? Apache Parquet with ZSTD 1.5? Apache Spark should embrace those Apache projects together because our customers are able to use them together in a single app.





[GitHub] [spark] Tonix517 commented on a change in pull request #29000: [SPARK-27194][SPARK-29302][SQL] Fix commit collision in dynamic partition overwrite mode

2021-06-10 Thread GitBox


Tonix517 commented on a change in pull request #29000:
URL: https://github.com/apache/spark/pull/29000#discussion_r649580767



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
##########
@@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol(
       // The specified output committer is a FileOutputCommitter.
       // So, we will use the FileOutputCommitter-specified constructor.
       val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-      committer = ctor.newInstance(new Path(path), context)
+      val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path)
+      committer = ctor.newInstance(committerOutputPath, context)
Review comment:
   Hey @WinkerDu - thank you for the PR. One question: when dynamicPartitionOverwrite is on, this code block will only execute when `clazz` is non-null, which means SQLConf.OUTPUT_COMMITTER_CLASS is set. It works for Parquet files since that SQL property is set at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L97. So what about other file formats, like ORC? There seems to be no such property-setting logic for other file formats, at least in the Spark repo. So is dynamicPartitionOverwrite supposed to work for Parquet only? Am I missing something here? Thanks. A sketch of the construction in question is below.
   
   @cloud-fan @Ngone51 @agrawaldevesh
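
   To make the question concrete, a hedged sketch of the reflective committer construction this diff touches (names like `clazz`, `path`, `stagingDir`, and `dynamicPartitionOverwrite` follow the quoted diff; the wrapper function itself is illustrative, not the exact Spark source):

   ```scala
   import org.apache.hadoop.fs.Path
   import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}

   def newCommitter(
       clazz: Class[_ <: OutputCommitter],
       path: String,
       stagingDir: Path,
       dynamicPartitionOverwrite: Boolean,
       context: TaskAttemptContext): OutputCommitter = {
     // FileOutputCommitter subclasses expose a (Path, TaskAttemptContext) constructor.
     val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
     // Under dynamic partition overwrite, point the committer at the staging
     // directory so concurrent jobs do not collide on the final output path.
     val outputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path)
     ctor.newInstance(outputPath, context)
   }
   ```

   Note this path is only reached when `clazz` is non-null, i.e. when the committer-class SQL property is set, which is the crux of the question above.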







[GitHub] [spark] dongjoon-hyun commented on a change in pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

2021-06-10 Thread GitBox


dongjoon-hyun commented on a change in pull request #32730:
URL: https://github.com/apache/spark/pull/32730#discussion_r649578467



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
+
+import org.apache.spark.{LocalSparkContext, MapOutputTrackerMaster, SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+
+class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with LocalSparkContext {
+
+  val conf = new SparkConf()
+    .setAppName("ShuffleExecutorComponentsSuite")
+    .setMaster("local-cluster[1,1,1024]")
+    .set(UI.UI_ENABLED, false)
+    .set(DYN_ALLOCATION_ENABLED, true)
+    .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 1)
+    .set(DYN_ALLOCATION_MIN_EXECUTORS, 1)
+    .set(IO_ENCRYPTION_ENABLED, false)
+    .set(KUBERNETES_DRIVER_REUSE_PVC, true)
+    .set(SHUFFLE_IO_PLUGIN_CLASS, classOf[KubernetesLocalDiskShuffleDataIO].getName)
+
+  test("recompute is not blocked by the recovery") {
+    sc = new SparkContext(conf)
+    withSpark(sc) { sc =>
+      val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      assert(master.shuffleStatuses.isEmpty)
+
+      val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3)
+        .groupByKey()
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      val loc1 = master.shuffleStatuses(0).mapStatuses(0).location
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2))
+
+      // Reuse the existing shuffle data
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2))
+
+      // Decommission all executors
+      val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+      sc.getExecutorIds().foreach { id =>
+        sched.killExecutor(id)
+      }
+      TestUtils.waitUntilExecutorsUp(sc, 1, 6)
+      // Shuffle status are removed
+      eventually(timeout(60.second), interval(1.seconds)) {
+        assert(master.shuffleStatuses.keys.toSet == Set(0))
+        assert(master.shuffleStatuses(0).mapStatuses.forall(_ == null))
+      }
+
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(9, 10, 11))
+    }
+  }
+
+  test("Partial recompute shuffle data") {
+    sc = new SparkContext(conf)
+    withSpark(sc) { sc =>
+      val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      assert(master.shuffleStatuses.isEmpty)
+
+      val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3).groupByKey()
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      val loc1 = master.shuffleStatuses(0).mapStatuses(0).location
+      assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1))
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2))
+
+      // Reuse the existing shuffle data
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1))
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2))
+
+      val rdd2 = sc.parallelize(Seq((4, "four"), (5, "five"), (6, "six"), (7, "seven")), 4)
+        .groupByKey()
+      rdd2.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0, 1))
+      assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1))

[GitHub] [spark] dongjoon-hyun commented on a change in pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

2021-06-10 Thread GitBox


dongjoon-hyun commented on a change in pull request #32730:
URL: https://github.com/apache/spark/pull/32730#discussion_r649577655



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
+
+import org.apache.spark.{LocalSparkContext, MapOutputTrackerMaster, SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+
+class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with LocalSparkContext {
+
+  val conf = new SparkConf()
+    .setAppName("ShuffleExecutorComponentsSuite")
+    .setMaster("local-cluster[1,1,1024]")
+    .set(UI.UI_ENABLED, false)
+    .set(DYN_ALLOCATION_ENABLED, true)
+    .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 1)
+    .set(DYN_ALLOCATION_MIN_EXECUTORS, 1)
+    .set(IO_ENCRYPTION_ENABLED, false)
+    .set(KUBERNETES_DRIVER_REUSE_PVC, true)
+    .set(SHUFFLE_IO_PLUGIN_CLASS, classOf[KubernetesLocalDiskShuffleDataIO].getName)
+
+  test("recompute is not blocked by the recovery") {
+    sc = new SparkContext(conf)
+    withSpark(sc) { sc =>
+      val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      assert(master.shuffleStatuses.isEmpty)
+
+      val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3)
+        .groupByKey()
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      val loc1 = master.shuffleStatuses(0).mapStatuses(0).location

Review comment:
   Yes, `loc1` is not used in this test case. I'll remove it.







[GitHub] [spark] viirya commented on a change in pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

2021-06-10 Thread GitBox


viirya commented on a change in pull request #32730:
URL: https://github.com/apache/spark/pull/32730#discussion_r649576592



##########
File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
+
+import org.apache.spark.{LocalSparkContext, MapOutputTrackerMaster, SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+
+class KubernetesLocalDiskShuffleDataIOSuite extends SparkFunSuite with LocalSparkContext {
+
+  val conf = new SparkConf()
+    .setAppName("ShuffleExecutorComponentsSuite")
+    .setMaster("local-cluster[1,1,1024]")
+    .set(UI.UI_ENABLED, false)
+    .set(DYN_ALLOCATION_ENABLED, true)
+    .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 1)
+    .set(DYN_ALLOCATION_MIN_EXECUTORS, 1)
+    .set(IO_ENCRYPTION_ENABLED, false)
+    .set(KUBERNETES_DRIVER_REUSE_PVC, true)
+    .set(SHUFFLE_IO_PLUGIN_CLASS, classOf[KubernetesLocalDiskShuffleDataIO].getName)
+
+  test("recompute is not blocked by the recovery") {
+    sc = new SparkContext(conf)
+    withSpark(sc) { sc =>
+      val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      assert(master.shuffleStatuses.isEmpty)
+
+      val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3)
+        .groupByKey()
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      val loc1 = master.shuffleStatuses(0).mapStatuses(0).location
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2))
+
+      // Reuse the existing shuffle data
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2))
+
+      // Decommission all executors
+      val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+      sc.getExecutorIds().foreach { id =>
+        sched.killExecutor(id)
+      }
+      TestUtils.waitUntilExecutorsUp(sc, 1, 6)
+      // Shuffle status are removed
+      eventually(timeout(60.second), interval(1.seconds)) {
+        assert(master.shuffleStatuses.keys.toSet == Set(0))
+        assert(master.shuffleStatuses(0).mapStatuses.forall(_ == null))
+      }
+
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(9, 10, 11))
+    }
+  }
+
+  test("Partial recompute shuffle data") {
+    sc = new SparkContext(conf)
+    withSpark(sc) { sc =>
+      val master = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      assert(master.shuffleStatuses.isEmpty)
+
+      val rdd = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3).groupByKey()
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      val loc1 = master.shuffleStatuses(0).mapStatuses(0).location
+      assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1))
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2))
+
+      // Reuse the existing shuffle data
+      rdd.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0))
+      assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1))
+      assert(master.shuffleStatuses(0).mapStatuses.map(_.mapId).toSet == Set(0, 1, 2))
+
+      val rdd2 = sc.parallelize(Seq((4, "four"), (5, "five"), (6, "six"), (7, "seven")), 4)
+        .groupByKey()
+      rdd2.collect()
+      assert(master.shuffleStatuses.keys.toSet == Set(0, 1))
+      assert(master.shuffleStatuses(0).mapStatuses.forall(_.location == loc1))

[GitHub] [spark] AmplabJenkins removed a comment on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859126609


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44188/
   





[GitHub] [spark] AmplabJenkins commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859126609


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44188/
   





[GitHub] [spark] SparkQA commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


SparkQA commented on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859126580


   Kubernetes integration test status success
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44188/
   





[GitHub] [spark] AmplabJenkins removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859124767


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44187/
   





[GitHub] [spark] AmplabJenkins removed a comment on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32869:
URL: https://github.com/apache/spark/pull/32869#issuecomment-859124766


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139653/
   





[GitHub] [spark] AmplabJenkins removed a comment on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32862:
URL: https://github.com/apache/spark/pull/32862#issuecomment-859124769


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139654/
   





[GitHub] [spark] AmplabJenkins commented on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32862:
URL: https://github.com/apache/spark/pull/32862#issuecomment-859124769


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139654/
   





[GitHub] [spark] AmplabJenkins commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859124767


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44187/
   





[GitHub] [spark] AmplabJenkins commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32869:
URL: https://github.com/apache/spark/pull/32869#issuecomment-859124766


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139653/
   





[GitHub] [spark] SparkQA removed a comment on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


SparkQA removed a comment on pull request #32869:
URL: https://github.com/apache/spark/pull/32869#issuecomment-858818300


   **[Test build #139653 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139653/testReport)** for PR 32869 at commit [`dde4cd8`](https://github.com/apache/spark/commit/dde4cd800a3d2e9aac55d05b5d99cc00fba0a2e6).





[GitHub] [spark] SparkQA commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


SparkQA commented on pull request #32869:
URL: https://github.com/apache/spark/pull/32869#issuecomment-859119484


   **[Test build #139653 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139653/testReport)** for PR 32869 at commit [`dde4cd8`](https://github.com/apache/spark/commit/dde4cd800a3d2e9aac55d05b5d99cc00fba0a2e6).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.





[GitHub] [spark] dchristle commented on pull request #32826: [SPARK-35670][BUILD] Upgrade ZSTD-JNI to 1.5.0-1

2021-06-10 Thread GitBox


dchristle commented on pull request #32826:
URL: https://github.com/apache/spark/pull/32826#issuecomment-859115935


   > Thank you for your efforts. BTW, @dchristle. Please note that your ORC PR is not about ZSTD-JNI. It's about the native ZSTD library only. I commented on your ORC PR about the difference.
   >
   > For the following, I saw Kafka failures.
   >
   > > They appear to pass their respective CIs.
   >
   > https://user-images.githubusercontent.com/9700541/121303167-9afc7380-c8af-11eb-9e2e-7500a3467325.png
   >
   > No worry~ For Apache Avro, they have a dependency bot. I guess they will catch up soon. Let's wait and see their activity.
   >
   > > I have less familiarity with Avro's build chains/codebase, so I did not attempt to test it yet.
   >
   > In addition, all libraries should be synced inside Apache Spark because Apache Spark is using everything.
   
   Yes, for ORC it's the native C library and not Java. I have a tangential question for you: does it make sense to use `aircompressor` for ZSTD in ORC, rather than `zstd-jni`? It does not seem to keep up with the latest `zstd`, and its implementation seems to lack support for many of the strategies employed at different compression levels, if I understand the code at https://github.com/airlift/aircompressor/blob/495bae80ac7487d2efa1bba437d04e8a2a42bb7b/src/main/java/io/airlift/compress/zstd/CompressionParameters.java#L143 correctly.
   
   The reason I ask is that `zstd` could conceivably make an incompatible change in the future that propagates to `zstd-jni` but not to `aircompressor`.





[GitHub] [spark] SparkQA removed a comment on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree

2021-06-10 Thread GitBox


SparkQA removed a comment on pull request #32862:
URL: https://github.com/apache/spark/pull/32862#issuecomment-858818317


   **[Test build #139654 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139654/testReport)** for PR 32862 at commit [`9902e03`](https://github.com/apache/spark/commit/9902e03b2cce528f91b1e4423252d77f689e5767).





[GitHub] [spark] SparkQA commented on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree

2021-06-10 Thread GitBox


SparkQA commented on pull request #32862:
URL: https://github.com/apache/spark/pull/32862#issuecomment-859115485


   **[Test build #139654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139654/testReport)** for PR 32862 at commit [`9902e03`](https://github.com/apache/spark/commit/9902e03b2cce528f91b1e4423252d77f689e5767).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
 * `class BooleanExtensionOps(BooleanOps):`
 * `class NullOps(DataTypeOps):`
 * `class UDTOps(DataTypeOps):`
 * `public final class CompositeReadLimit implements ReadLimit `
 * `public final class ReadMinRows implements ReadLimit `
 * `trait InvokeLike extends Expression with NonSQLExpression with ImplicitCastInputTypes `
 * `case class LateralSubquery(`
 * `case class LateralJoin(`
 * `case class CommandResultExec(`
 * `class RocksDBFileManager(`
 * `  sealed trait SchemaReader `
 * `  class SchemaV1Reader extends SchemaReader `
 * `  class SchemaV2Reader extends SchemaReader `
 * `  trait SchemaWriter `
 * `  class SchemaV1Writer extends SchemaWriter `
 * `  class SchemaV2Writer extends SchemaWriter `
 * `case class CommandResult(`





[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


SparkQA commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859115229


   Kubernetes integration test status success
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44187/
   





[GitHub] [spark] mridulm commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


mridulm commented on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-859108418


   Thanks for working on this, @zhouyejoe!
   Thanks for all the reviews, @Ngone51, @Victsm, @otterc!





[GitHub] [spark] asfgit closed pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


asfgit closed pull request #32007:
URL: https://github.com/apache/spark/pull/32007


   





[GitHub] [spark] mridulm commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


mridulm commented on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-859105499


   The Spark UI test that failed in Jenkins is unrelated to this PR.
   Merging to master.





[GitHub] [spark] SparkQA commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


SparkQA commented on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859104452


   Kubernetes integration test starting
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44188/
   





[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data

2021-06-10 Thread GitBox


otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649548771



##########
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##########
@@ -1124,4 +1394,54 @@ object ShuffleBlockFetcherIterator {
    */
   private[storage]
   case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends FetchResult
+
+  /**
+   * Result of an unsuccessful fetch of either of these:
+   * 1) Remote shuffle block chunk.
+   * 2) Local merged block data.
+   *
+   * Instead of treating this as a FailureFetchResult, we ignore this failure
+   * and fall back to fetching the original unmerged blocks.
+   * @param blockId block id
+   * @param address BlockManager that the merged block was attempted to be fetched from
+   * @param size size of the block, used to update bytesInFlight.
+   * @param isNetworkReqDone Is this the last network request for this host in this fetch
+   *                         request. Used to update reqsInFlight.
+   */
+  private[storage] case class IgnoreFetchResult(blockId: BlockId,

Review comment:
   Would `Retriable` be confusing, suggesting that this request itself is retried? We could call it `FallbackOnFailureFetchResult` instead. Let me know what you think.







[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


SparkQA commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859091608


   Kubernetes integration test starting
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44187/
   





[GitHub] [spark] AmplabJenkins removed a comment on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32854:
URL: https://github.com/apache/spark/pull/32854#issuecomment-859087811


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139651/
   





[GitHub] [spark] AmplabJenkins commented on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32854:
URL: https://github.com/apache/spark/pull/32854#issuecomment-859087811


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139651/
   





[GitHub] [spark] SparkQA removed a comment on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier

2021-06-10 Thread GitBox


SparkQA removed a comment on pull request #32854:
URL: https://github.com/apache/spark/pull/32854#issuecomment-858769118


   **[Test build #139651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139651/testReport)** for PR 32854 at commit [`bd96508`](https://github.com/apache/spark/commit/bd9650841fff48c7456e226d564ac04c4ed5e217).





[GitHub] [spark] SparkQA commented on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier

2021-06-10 Thread GitBox


SparkQA commented on pull request #32854:
URL: https://github.com/apache/spark/pull/32854#issuecomment-859075631


   **[Test build #139651 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139651/testReport)** for PR 32854 at commit [`bd96508`](https://github.com/apache/spark/commit/bd9650841fff48c7456e226d564ac04c4ed5e217).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.





[GitHub] [spark] sunchao commented on pull request #30135: [SPARK-29250][BUILD] Upgrade to Hadoop 3.3.1

2021-06-10 Thread GitBox


sunchao commented on pull request #30135:
URL: https://github.com/apache/spark/pull/30135#issuecomment-859061487


   > maybe the build is picking up the old RC
   
   Yes, that's exactly what happened :)
   
   For the regression, I don't know the full context behind the original change, but it seems like a good thing to do, although returning a boolean flag might be less disruptive IMO :). Either way, it is a simple fix on the Spark side.





[GitHub] [spark] dongjoon-hyun commented on pull request #32750: [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9

2021-06-10 Thread GitBox


dongjoon-hyun commented on pull request #32750:
URL: https://github.com/apache/spark/pull/32750#issuecomment-859055684


   Hi, @wangyum.
   
   @sunchao uploaded Apache Hive 2.3.9 to Maven Central. Could you revise this PR?
   - https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/





[GitHub] [spark] dongjoon-hyun commented on pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

2021-06-10 Thread GitBox


dongjoon-hyun commented on pull request #32730:
URL: https://github.com/apache/spark/pull/32730#issuecomment-859054062


   Please let me know if there are any other comments, @viirya and @mridulm.





[GitHub] [spark] SparkQA commented on pull request #32868: [SPARK-35714][CORE] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


SparkQA commented on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859052129


   **[Test build #139660 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139660/testReport)** for PR 32868 at commit [`3b38ae8`](https://github.com/apache/spark/commit/3b38ae861b0bf8127a2f8c621630932affb49b03).





[GitHub] [spark] dongjoon-hyun commented on pull request #32868: [SPARK-35714] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


dongjoon-hyun commented on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859051087


   cc @mridulm 





[GitHub] [spark] dongjoon-hyun commented on pull request #32868: [SPARK-35714] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


dongjoon-hyun commented on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-859050639


   ok to test





[GitHub] [spark] AmplabJenkins removed a comment on pull request #32868: [SPARK-35714] Bug fix for deadlock during the executor shutdown

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32868:
URL: https://github.com/apache/spark/pull/32868#issuecomment-858682923


   Can one of the admins verify this patch?





[GitHub] [spark] dongjoon-hyun commented on a change in pull request #32730: [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

2021-06-10 Thread GitBox


dongjoon-hyun commented on a change in pull request #32730:
URL: https://github.com/apache/spark/pull/32730#discussion_r649519438



##
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.util.Optional
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, UnrecognizedBlockId}
+import org.apache.spark.util.Utils
+
+class KubernetesLocalDiskShuffleExecutorComponents(sparkConf: SparkConf)
+  extends ShuffleExecutorComponents with Logging {
+
+  private val delegate = new LocalDiskShuffleExecutorComponents(sparkConf)
+  private var blockManager: BlockManager = _
+
+  override def initializeExecutor(
+      appId: String, execId: String, extraConfigs: java.util.Map[String, String]): Unit = {
+    delegate.initializeExecutor(appId, execId, extraConfigs)
+    blockManager = SparkEnv.get.blockManager
+    if (sparkConf.getBoolean("spark.kubernetes.driver.reusePersistentVolumeClaim", false)) {
+      // Turn off the deletion of the shuffle data in order to reuse
+      blockManager.diskBlockManager.deleteFilesOnStop = false
+      Utils.tryLogNonFatalError {
+        KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, blockManager)
+      }
+    }
+  }
+
+  override def createMapOutputWriter(shuffleId: Int, mapTaskId: Long, numPartitions: Int)
+    : ShuffleMapOutputWriter = {
+    delegate.createMapOutputWriter(shuffleId, mapTaskId, numPartitions)
+  }
+
+  override def createSingleFileMapOutputWriter(shuffleId: Int, mapId: Long)
+    : Optional[SingleSpillShuffleMapOutputWriter] = {
+    delegate.createSingleFileMapOutputWriter(shuffleId, mapId)
+  }
+}
+
+object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
+  /**
+   * This tries to recover shuffle data of dead executors' local dirs if exists.
+   * Since the executors are already dead, we cannot use `getHostLocalDirs`.
+   * This is enabled only when spark.kubernetes.driver.reusePersistentVolumeClaim is true.
+   */
+  def recoverDiskStore(conf: SparkConf, bm: BlockManager): Unit = {
+    // Find All files
+    val files = Utils.getConfiguredLocalDirs(conf)
+      .filter(_ != null)
+      .map(s => new File(new File(new File(s).getParent).getParent))
+      .flatMap { dir =>
+        val oldDirs = dir.listFiles().filter { f =>
+          f.isDirectory && f.getName.startsWith("spark-")
+        }
+        val files = oldDirs
+          .flatMap(_.listFiles).filter(_.isDirectory) // executor-xxx
+          .flatMap(_.listFiles).filter(_.isDirectory) // blockmgr-xxx
+          .flatMap(_.listFiles).filter(_.isDirectory) // 00

Review comment:
   Sure, we can check the pattern. I will do it later.
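   
   For reference, a minimal sketch of the kind of name-pattern check being discussed, assuming the directory layout shown in the diff (spark-*/executor-*/blockmgr-*); the regexes and helper names are illustrative assumptions, not the code that was merged:
   
   import java.io.File
   
   // Hypothetical helpers: only descend into directories whose names match
   // the layouts the disk block manager actually creates.
   val sparkDirPattern = "spark-[0-9a-fA-F-]+".r
   val blockMgrPattern = "blockmgr-[0-9a-fA-F-]+".r
   
   def looksLikeSparkLocalDir(f: File): Boolean =
     f.isDirectory && sparkDirPattern.pattern.matcher(f.getName).matches()
   
   def looksLikeBlockMgrDir(f: File): Boolean =
     f.isDirectory && blockMgrPattern.pattern.matcher(f.getName).matches()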




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] steveloughran commented on pull request #30135: [SPARK-29250][BUILD] Upgrade to Hadoop 3.3.1

2021-06-10 Thread GitBox


steveloughran commented on pull request #30135:
URL: https://github.com/apache/spark/pull/30135#issuecomment-859048545


   > reverted from RC3 so I reverted my earlier change of handling it in Spark code. Let me check later.
   
   Maybe the build is picking up the old RC.
   
   FWIW, I'm surprised by this regression. Add as much as you can to the test, as in "what is the test trying to do?". Gabor's patch is trying to stop the "hadoop fs" copy command from copying a file onto itself, which can be lossy depending on the OS.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun closed pull request #32837: [SPARK-35692][K8S] Use AtomicInteger for executor id generating

2021-06-10 Thread GitBox


dongjoon-hyun closed pull request #32837:
URL: https://github.com/apache/spark/pull/32837


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-859045755


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139658/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-859045755


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139658/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] steveloughran commented on a change in pull request #30135: [SPARK-29250][BUILD] Upgrade to Hadoop 3.3.1

2021-06-10 Thread GitBox


steveloughran commented on a change in pull request #30135:
URL: https://github.com/apache/spark/pull/30135#discussion_r649517273



##
File path: pom.xml
##
@@ -197,6 +197,7 @@
 1.8
 1.1.0
 1.2
+1.52

Review comment:
   We rolled back the latest Bouncy Castle update because some internal Spark builds with extra modules were failing: the ASM JAR couldn't handle the changes. We tried fiddling with the Spark settings to use an updated ASM JAR and gave up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


SparkQA removed a comment on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-858883870


   **[Test build #139658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139658/testReport)** for PR 32007 at commit [`e630725`](https://github.com/apache/spark/commit/e630725ca5c161cea62a2afcc7668a67a3e6d72e).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun closed pull request #32830: [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

2021-06-10 Thread GitBox


dongjoon-hyun closed pull request #32830:
URL: https://github.com/apache/spark/pull/32830


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


SparkQA commented on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-859044030


   **[Test build #139658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139658/testReport)** for PR 32007 at commit [`e630725`](https://github.com/apache/spark/commit/e630725ca5c161cea62a2afcc7668a67a3e6d72e).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk closed pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


MaxGekk closed pull request #32869:
URL: https://github.com/apache/spark/pull/32869


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data

2021-06-10 Thread GitBox


otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649514612



##
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##
@@ -436,24 +485,48 @@ final class ShuffleBlockFetcherIterator(
     val iterator = blockInfos.iterator
     var curRequestSize = 0L
     var curBlocks = Seq.empty[FetchBlockInfo]
-
     while (iterator.hasNext) {
       val (blockId, size, mapIndex) = iterator.next()
-      assertPositiveBlockSize(blockId, size)
       curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex))
       curRequestSize += size
-      // For batch fetch, the actual block in flight should count for merged block.
-      val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
-      if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-        curBlocks = createFetchRequests(curBlocks, address, isLast = false,
-          collectedRemoteRequests)
-        curRequestSize = curBlocks.map(_.size).sum
+      blockId match {
+        // Either all blocks are merged blocks, merged block chunks, or original non-merged blocks.
+        // Based on these types, we decide to do batch fetch and create FetchRequests with
+        // forMergedMetas set.
+        case ShuffleBlockChunkId(_, _, _) =>
+          if (curRequestSize >= targetRemoteRequestSize ||
+            curBlocks.size >= maxBlocksInFlightPerAddress) {
+            curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+              collectedRemoteRequests, enableBatchFetch = false)
+            curRequestSize = curBlocks.map(_.size).sum
+          }
+        case ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _) =>
+          if (curBlocks.size >= maxBlocksInFlightPerAddress) {
+            curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+              collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
+          }
+        case _ =>
+          // For batch fetch, the actual block in flight should count for merged block.
+          val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
+          if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
+            curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+              collectedRemoteRequests, enableBatchFetch = doBatchFetch)
+            curRequestSize = curBlocks.map(_.size).sum
+          }
       }
     }
     // Add in the final request
     if (curBlocks.nonEmpty) {
+      val (enableBatchFetch, areMergedBlocks) = {
+        curBlocks.head.blockId match {
+          case ShuffleBlockChunkId(_, _, _) => (false, false)
+          case ShuffleBlockId(_, SHUFFLE_PUSH_MAP_ID, _) => (false, true)
+          case _ => (doBatchFetch, false)
+        }
+      }
       curBlocks = createFetchRequests(curBlocks, address, isLast = true,
-        collectedRemoteRequests)
+        collectedRemoteRequests, enableBatchFetch = enableBatchFetch,
+        forMergedMetas = areMergedBlocks)
       curRequestSize = curBlocks.map(_.size).sum
Review comment:
   We do want the sum of the sizes of all the blocks in `curBlocks`, so I think the `sum` is needed.
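   
   A toy illustration of the invariant (names simplified from the real code; `createFetchRequests` here is a stand-in): the call returns only the leftover blocks, so the running size must be re-derived from them rather than adjusted incrementally.
   
   case class FetchBlockInfo(size: Long)
   
   // Stand-in: pretend the first two blocks were packaged into a request,
   // so only the remainder comes back.
   def createFetchRequests(blocks: Seq[FetchBlockInfo]): Seq[FetchBlockInfo] =
     blocks.drop(2)
   
   var curBlocks = Seq(FetchBlockInfo(10), FetchBlockInfo(20), FetchBlockInfo(5))
   curBlocks = createFetchRequests(curBlocks)
   val curRequestSize = curBlocks.map(_.size).sum  // 5: only the leftover block counts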




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


MaxGekk commented on pull request #32869:
URL: https://github.com/apache/spark/pull/32869#issuecomment-859039767


   +1, LGTM. Merging to master.
   Thank you @gengliangwang and @cloud-fan for your reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


SparkQA commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859038318


   **[Test build #139659 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139659/testReport)** for PR 32849 at commit [`32fae30`](https://github.com/apache/spark/commit/32fae304389bbf5f3d9ae7cfb1a2496ae9e84a35).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-859033409


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44186/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859033402


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44184/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #32847: [WIP] [SPARK-35616][PYTHON] Make astype data-type-based

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32847:
URL: https://github.com/apache/spark/pull/32847#issuecomment-859033406


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44185/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #32470: [SPARK-35712][SQL] Simplify ResolveAggregateFunctions

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32470:
URL: https://github.com/apache/spark/pull/32470#issuecomment-859033403


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139652/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32849: [WIP][SPARK-35704][SQL] Add fields to `DayTimeIntervalType`

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859033402


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44184/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-859033409


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44186/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32847: [WIP] [SPARK-35616][PYTHON] Make astype data-type-based

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32847:
URL: https://github.com/apache/spark/pull/32847#issuecomment-859033406


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44185/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32470: [SPARK-35712][SQL] Simplify ResolveAggregateFunctions

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32470:
URL: https://github.com/apache/spark/pull/32470#issuecomment-859033403


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139652/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data

2021-06-10 Thread GitBox


otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649505327



##
File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
##
@@ -347,77 +355,118 @@ final class ShuffleBlockFetcherIterator(
     }
   }
 
-  private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+  /**
+   * This is called from initialize and also from the fallback which is triggered from
+   * [[PushBasedFetchHelper]].
+   */
+  private[this] def partitionBlocksByFetchMode(
+      blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
+      localBlocks: mutable.LinkedHashSet[(BlockId, Int)],
+      hostLocalBlocksByExecutor: mutable.LinkedHashMap[BlockManagerId, Seq[(BlockId, Long, Int)]],
+      mergedLocalBlocks: mutable.LinkedHashSet[BlockId]): ArrayBuffer[FetchRequest] = {
     logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: "
       + s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress: $maxBlocksInFlightPerAddress")
 
-    // Partition to local, host-local and remote blocks. Remote blocks are further split into
-    // FetchRequests of size at most maxBytesInFlight in order to limit the amount of data in flight
+    // Partition to local, host-local, merged-local, remote (includes merged-remote) blocks.
+    // Remote blocks are further split into FetchRequests of size at most maxBytesInFlight in order
+    // to limit the amount of data in flight
     val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
+    val hostLocalBlocksCurrentIteration = mutable.LinkedHashSet[(BlockId, Int)]()
     var localBlockBytes = 0L
     var hostLocalBlockBytes = 0L
+    var mergedLocalBlockBytes = 0L
     var remoteBlockBytes = 0L
+    val prevNumBlocksToFetch = numBlocksToFetch
 
     val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
     for ((address, blockInfos) <- blocksByAddress) {
-      if (Seq(blockManager.blockManagerId.executorId, fallback).contains(address.executorId)) {
-        checkBlockSizes(blockInfos)
+      checkBlockSizes(blockInfos)
+      if (pushBasedFetchHelper.isMergedShuffleBlockAddress(address)) {
+        // These are push-based merged blocks or chunks of these merged blocks.
+        if (address.host == blockManager.blockManagerId.host) {
+          val pushMergedBlockInfos = blockInfos.map(
+            info => FetchBlockInfo(info._1, info._2, info._3))
+          numBlocksToFetch += pushMergedBlockInfos.size
+          mergedLocalBlocks ++= pushMergedBlockInfos.map(info => info.blockId)
+          val size = pushMergedBlockInfos.map(_.size).sum
+          logInfo(s"Got ${pushMergedBlockInfos.size} local merged blocks " +
+            s"of size $size")
+          mergedLocalBlockBytes += size
+        } else {
+          remoteBlockBytes += blockInfos.map(_._2).sum
+          collectFetchRequests(address, blockInfos, collectedRemoteRequests)
+        }
+      } else if (
+        Seq(blockManager.blockManagerId.executorId, fallback).contains(address.executorId)) {
         val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
           blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch)
         numBlocksToFetch += mergedBlockInfos.size
         localBlocks ++= mergedBlockInfos.map(info => (info.blockId, info.mapIndex))
         localBlockBytes += mergedBlockInfos.map(_.size).sum
       } else if (blockManager.hostLocalDirManager.isDefined &&
         address.host == blockManager.blockManagerId.host) {
-        checkBlockSizes(blockInfos)
         val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
           blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)), doBatchFetch)
         numBlocksToFetch += mergedBlockInfos.size
         val blocksForAddress =
           mergedBlockInfos.map(info => (info.blockId, info.size, info.mapIndex))
         hostLocalBlocksByExecutor += address -> blocksForAddress
-        hostLocalBlocks ++= blocksForAddress.map(info => (info._1, info._3))
+        hostLocalBlocksCurrentIteration ++= blocksForAddress.map(info => (info._1, info._3))
         hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum
       } else {
         remoteBlockBytes += blockInfos.map(_._2).sum
         collectFetchRequests(address, blockInfos, collectedRemoteRequests)
       }
     }
     val numRemoteBlocks = collectedRemoteRequests.map(_.blocks.size).sum
-    val totalBytes = localBlockBytes + remoteBlockBytes + hostLocalBlockBytes
-    assert(numBlocksToFetch == localBlocks.size + hostLocalBlocks.size + numRemoteBlocks,
-      s"The number of non-empty blocks $numBlocksToFetch doesn't equal to the number of local " +
-        s"blocks ${localBlocks.size} + the number of host-local blocks ${hostLocalBlocks.size} " +
-        s"+ the number of remote blocks ${numRemoteBlocks}.")
-    logInfo(s"Getting $numBlocksToFetch

[GitHub] [spark] MaxGekk commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


MaxGekk commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859028904


   @cloud-fan I have addressed all comments except opening the JIRA sub-task. I will do that as soon as all tests pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a change in pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


MaxGekk commented on a change in pull request #32849:
URL: https://github.com/apache/spark/pull/32849#discussion_r649502078



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
##
@@ -405,8 +405,9 @@ case class MultiplyDTInterval(
   override def left: Expression = interval
   override def right: Expression = num
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(DayTimeIntervalType, NumericType)
-  override def dataType: DataType = DayTimeIntervalType
+  // TODO(SPARK-X): Multiply day-time intervals with any fields
+  override def inputTypes: Seq[AbstractDataType] = Seq(DayTimeIntervalType(), NumericType)

Review comment:
   done
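   
   For context, a small sketch of how the parameterized type reads once fields land, assuming the start/end field constants this PR introduces (DAY, HOUR, MINUTE, SECOND); exact names may still change while the PR is WIP:
   
   import org.apache.spark.sql.types.DayTimeIntervalType
   import org.apache.spark.sql.types.DayTimeIntervalType._
   
   // The no-arg apply() keeps the old behavior: the full DAY TO SECOND range.
   val full = DayTimeIntervalType()
   // Explicit fields narrow the interval, e.g. INTERVAL HOUR TO SECOND.
   val hourToSecond = DayTimeIntervalType(HOUR, SECOND)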




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a change in pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


MaxGekk commented on a change in pull request #32849:
URL: https://github.com/apache/spark/pull/32849#discussion_r649497221



##
File path: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
##
@@ -21,10 +21,7 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;

Review comment:
   done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #32470: [SPARK-35712][SQL] Simplify ResolveAggregateFunctions

2021-06-10 Thread GitBox


SparkQA removed a comment on pull request #32470:
URL: https://github.com/apache/spark/pull/32470#issuecomment-858777838


   **[Test build #139652 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139652/testReport)** for PR 32470 at commit [`1e973ef`](https://github.com/apache/spark/commit/1e973ef66b445e3d70df92a52b39e99f7c491151).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


SparkQA commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-859006068


   Kubernetes integration test status success
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44184/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #32470: [SPARK-35712][SQL] Simplify ResolveAggregateFunctions

2021-06-10 Thread GitBox


SparkQA commented on pull request #32470:
URL: https://github.com/apache/spark/pull/32470#issuecomment-859005261


   **[Test build #139652 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139652/testReport)** for PR 32470 at commit [`1e973ef`](https://github.com/apache/spark/commit/1e973ef66b445e3d70df92a52b39e99f7c491151).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #32847: [WIP] [SPARK-35616][PYTHON] Make astype data-type-based

2021-06-10 Thread GitBox


SparkQA commented on pull request #32847:
URL: https://github.com/apache/spark/pull/32847#issuecomment-859003646


   Kubernetes integration test status success
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44185/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data

2021-06-10 Thread GitBox


otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649482176



##
File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.MapOutputTracker
+import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener}
+import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER
+import org.apache.spark.storage.ShuffleBlockFetcherIterator._
+
+/**
+ * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
+ * functionality to fetch merged block meta and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+   private val iterator: ShuffleBlockFetcherIterator,
+   private val shuffleClient: BlockStoreClient,
+   private val blockManager: BlockManager,
+   private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+  private[this] val startTimeNs = System.nanoTime()
+
+  private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+    SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+    blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+  /** A map for storing merged block shuffle chunk bitmap */
+  private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]()
+
+  /**
+   * Returns true if the address is for a push-merged block.
+   */
+  def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+    SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+  }
+
+  /**
+   * Returns true if the address is not of executor local or merged local block. false otherwise.
+   */
+  def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+    (isMergedShuffleBlockAddress(address) && address.host != blockManager.blockManagerId.host) ||
+      (!isMergedShuffleBlockAddress(address) && address != blockManager.blockManagerId)
+  }
+
+  /**
+   * Returns true if the address if of merged local block. false otherwise.
+   */
+  def isMergedLocal(address: BlockManagerId): Boolean = {
+    isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host
+  }
+
+  def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+    chunksMetaMap(blockId).getCardinality
+  }
+
+  def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+    chunksMetaMap.remove(blockId)
+  }
+
+  def createChunkBlockInfosFromMetaResponse(
+      shuffleId: Int,
+      reduceId: Int,
+      blockSize: Long,
+      numChunks: Int,
+      bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+    val approxChunkSize = blockSize / numChunks
+    val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]()
+    for (i <- 0 until numChunks) {
+      val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i)
+      chunksMetaMap.put(blockChunkId, bitmaps(i))
+      logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
+      blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
+    }
+    blocksToFetch
+  }
+
+  def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
+    val sizeMap = req.blocks.map {
+      case FetchBlockInfo(blockId, size, _) =>
+        val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+        ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap
+    val address = req.address
+    val mergedBlocksMetaListener = new MergedBlocksMetaListener {
+      override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = {
+        logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId)  " +
+          s"from ${req.address.host}:${req.address.port}")

[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data

2021-06-10 Thread GitBox


otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649479840



##
File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.MapOutputTracker
+import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener}
+import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER
+import org.apache.spark.storage.ShuffleBlockFetcherIterator._
+
+/**
+ * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
+ * functionality to fetch merged block meta and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+   private val iterator: ShuffleBlockFetcherIterator,
+   private val shuffleClient: BlockStoreClient,
+   private val blockManager: BlockManager,
+   private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+  private[this] val startTimeNs = System.nanoTime()
+
+  private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+    SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+    blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+  /** A map for storing merged block shuffle chunk bitmap */
+  private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]()

Review comment:
   Same as above. It is always accessed by the task thread.
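   
   A minimal sketch of the confinement argument (illustrative, not the PR's code): the map is created, read, and mutated only by the single task thread driving the iterator, so a plain mutable map needs no synchronization.
   
   import scala.collection.mutable
   
   // Thread-confined state: never shared across threads, so no locking.
   final class ChunkTracker {
     private val chunksMetaMap = mutable.HashMap.empty[Int, String]
     def put(id: Int, meta: String): Unit = chunksMetaMap.put(id, meta)
     def remove(id: Int): Unit = chunksMetaMap.remove(id)
   }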




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data

2021-06-10 Thread GitBox


otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649479384



##
File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.MapOutputTracker
+import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener}
+import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER
+import org.apache.spark.storage.ShuffleBlockFetcherIterator._
+
+/**
+ * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
+ * functionality to fetch merged block meta and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+   private val iterator: ShuffleBlockFetcherIterator,
+   private val shuffleClient: BlockStoreClient,
+   private val blockManager: BlockManager,
+   private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+  private[this] val startTimeNs = System.nanoTime()
+
+  private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+    SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+    blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+  /** A map for storing merged block shuffle chunk bitmap */
+  private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]()
+
+  /**
+   * Returns true if the address is for a push-merged block.
+   */
+  def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+    SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+  }
+
+  /**
+   * Returns true if the address is not of executor local or merged local block. false otherwise.
+   */
+  def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+    (isMergedShuffleBlockAddress(address) && address.host != blockManager.blockManagerId.host) ||
+      (!isMergedShuffleBlockAddress(address) && address != blockManager.blockManagerId)
+  }
+
+  /**
+   * Returns true if the address if of merged local block. false otherwise.
+   */
+  def isMergedLocal(address: BlockManagerId): Boolean = {
+    isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host
+  }
+
+  def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+    chunksMetaMap(blockId).getCardinality
+  }
+
+  def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+    chunksMetaMap.remove(blockId)
+  }
+
+  def createChunkBlockInfosFromMetaResponse(
+      shuffleId: Int,
+      reduceId: Int,
+      blockSize: Long,
+      numChunks: Int,
+      bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+    val approxChunkSize = blockSize / numChunks
+    val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]()
+    for (i <- 0 until numChunks) {
+      val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i)
+      chunksMetaMap.put(blockChunkId, bitmaps(i))
+      logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
+      blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
+    }
+    blocksToFetch
+  }
+
+  def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
+    val sizeMap = req.blocks.map {
+      case FetchBlockInfo(blockId, size, _) =>
+        val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+        ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap
+    val address = req.address
+    val mergedBlocksMetaListener = new MergedBlocksMetaListener {
+      override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = {
+        logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId)  " +
+          s"from ${req.address.host}:${req.address.port}")

[GitHub] [spark] SparkQA commented on pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread GitBox


SparkQA commented on pull request #32007:
URL: https://github.com/apache/spark/pull/32007#issuecomment-858977861


   Kubernetes integration test starting
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44186/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data

2021-06-10 Thread GitBox


otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649472914



##
File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.MapOutputTracker
+import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener}
+import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER
+import org.apache.spark.storage.ShuffleBlockFetcherIterator._
+
+/**
+ * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
+ * functionality to fetch merged block meta and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+   private val iterator: ShuffleBlockFetcherIterator,
+   private val shuffleClient: BlockStoreClient,
+   private val blockManager: BlockManager,
+   private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+  private[this] val startTimeNs = System.nanoTime()
+
+  private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+    SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+    blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+  /** A map for storing merged block shuffle chunk bitmap */
+  private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]()
+
+  /**
+   * Returns true if the address is for a push-merged block.
+   */
+  def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+    SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+  }
+
+  /**
+   * Returns true if the address is not of executor local or merged local block. false otherwise.
+   */
+  def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+    (isMergedShuffleBlockAddress(address) && address.host != blockManager.blockManagerId.host) ||
+      (!isMergedShuffleBlockAddress(address) && address != blockManager.blockManagerId)
+  }
+
+  /**
+   * Returns true if the address if of merged local block. false otherwise.
+   */
+  def isMergedLocal(address: BlockManagerId): Boolean = {
+    isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host
+  }
+
+  def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+    chunksMetaMap(blockId).getCardinality
+  }
+
+  def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+    chunksMetaMap.remove(blockId)
+  }
+
+  def createChunkBlockInfosFromMetaResponse(
+      shuffleId: Int,
+      reduceId: Int,
+      blockSize: Long,
+      numChunks: Int,
+      bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+    val approxChunkSize = blockSize / numChunks
+    val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]()
+    for (i <- 0 until numChunks) {
+      val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i)
+      chunksMetaMap.put(blockChunkId, bitmaps(i))
+      logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
+      blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
+    }
+    blocksToFetch
+  }
+
+  def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
+    val sizeMap = req.blocks.map {
+      case FetchBlockInfo(blockId, size, _) =>
+        val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+        ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap
+    val address = req.address
+    val mergedBlocksMetaListener = new MergedBlocksMetaListener {
+      override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = {
+        logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId)  " +
+          s"from ${req.address.host}:${req.address.port}")

[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


SparkQA commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-858965222


   Kubernetes integration test starting
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44184/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #32473: [SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark

2021-06-10 Thread GitBox


viirya commented on a change in pull request #32473:
URL: https://github.com/apache/spark/pull/32473#discussion_r649471892



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BloomFilterBenchmark.scala
##
@@ -81,8 +85,113 @@ object BloomFilterBenchmark extends SqlBasedBenchmark {
 }
   }
 
+  private def readORCBenchmarkForInSet(): Unit = {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val samples = df2.sample(0.03, 128).select("value").as[String].collect()
+      val filter = "value IN (" + samples.map ( x => s"'$x'").mkString(", ") + ")"
+
+      df2.repartition(col("value")).sort(col("value")).write.orc(path + "/withoutBF")
+      df2.repartition(col("value")).sort(col("value"))
+        .write.option("orc.bloom.filter.columns", "value").orc(path + "/withBF")
+
+      runBenchmark(s"ORC Read for IN set") {
+        val benchmark = new Benchmark(s"Read a row from 1M rows", 1000 * 1000, output = output)

Review comment:
   ditto.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #32473: [SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark

2021-06-10 Thread GitBox


viirya commented on a change in pull request #32473:
URL: https://github.com/apache/spark/pull/32473#discussion_r649471780



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BloomFilterBenchmark.scala
##
@@ -81,8 +85,113 @@ object BloomFilterBenchmark extends SqlBasedBenchmark {
 }
   }
 
+  private def readORCBenchmarkForInSet(): Unit = {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val samples = df2.sample(0.03, 128).select("value").as[String].collect()
+      val filter = "value IN (" + samples.map ( x => s"'$x'").mkString(", ") + ")"
+
+      df2.repartition(col("value")).sort(col("value")).write.orc(path + "/withoutBF")
+      df2.repartition(col("value")).sort(col("value"))
+        .write.option("orc.bloom.filter.columns", "value").orc(path + "/withBF")
+
+      runBenchmark(s"ORC Read for IN set") {

Review comment:
   nit: s"" is not needed.
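
   A sketch of the suggested change (the final patch is not quoted here); with no `$` placeholders, `s"..."` evaluates to exactly the same string as the plain literal:

       runBenchmark(s"ORC Read for IN set") { /* ... */ }  // before
       runBenchmark("ORC Read for IN set") { /* ... */ }   // after: redundant interpolator dropped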




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-858958878


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139643/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #32473: [SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark

2021-06-10 Thread GitBox


viirya commented on a change in pull request #32473:
URL: https://github.com/apache/spark/pull/32473#discussion_r649470905



##
File path: sql/core/benchmarks/BloomFilterBenchmark-jdk11-results.txt
##
@@ -2,23 +2,179 @@
 ORC Write

-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 Write 100M rows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-Without bloom filter                              19503           19621         166         5.1         195.0       1.0X
-With bloom filter                                 22472           22710         335         4.4         224.7       0.9X
+Without bloom filter                              13568           13645         109         7.4         135.7       1.0X
+With bloom filter                                 16116           16238         172         6.2         161.2       0.8X

 ORC Read

-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 Read a row from 100M rows:                Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-Without bloom filter                               1981            2040          82        50.5          19.8       1.0X
-With bloom filter                                  1428            1467          54        70.0          14.3       1.4X
+Without bloom filter                               1572            1605          47        63.6          15.7       1.0X
+With bloom filter                                  1343            1359          23        74.5          13.4       1.2X
+
+ORC Read for IN set
+
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Read a row from 1M rows:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+Without bloom filter                                 51              63          15        19.6          51.1       1.0X
+With bloom filter                                    54              88          23        18.5          54.0       0.9X
+
+Parquet Write
+
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Write 100M rows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+Without bloom filter                              13679           13954         389         7.3         136.8       1.0X
+With bloom filter                                 18260           18284          33         5.5         182.6       0.7X
+
+Parquet Read
+
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Read a row from 100M rows:                Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+Without bloom filter, blocksize: 2097152            954             984          49       104.8           9.5       1.0X
+With bloom filter, blocksize: 2097152               285             307          21       350.4           2.9       3.3X
+
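
   A quick sanity check on how the derived columns relate, using the first "Write 100M rows" row above (plain arithmetic, not part of the patch):

       val rows   = 100L * 1000 * 1000               // 100M rows per iteration
       val bestMs = 19503.0                          // Best Time(ms), "Without bloom filter"
       val perRowNs = bestMs * 1e6 / rows            // ≈ 195.0, matches Per Row(ns)
       val rateMps  = (rows / 1e6) / (bestMs / 1e3)  // ≈ 5.1, matches Rate(M/s)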

[GitHub] [spark] AmplabJenkins removed a comment on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32854:
URL: https://github.com/apache/spark/pull/32854#issuecomment-858958876


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44179/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32869:
URL: https://github.com/apache/spark/pull/32869#issuecomment-858958880


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44181/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree

2021-06-10 Thread GitBox


AmplabJenkins removed a comment on pull request #32862:
URL: https://github.com/apache/spark/pull/32862#issuecomment-858958887


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44182/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-858958878


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139643/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32854: [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32854:
URL: https://github.com/apache/spark/pull/32854#issuecomment-858958876


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44179/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32862:
URL: https://github.com/apache/spark/pull/32862#issuecomment-858958887


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44182/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


AmplabJenkins commented on pull request #32869:
URL: https://github.com/apache/spark/pull/32869#issuecomment-858958880


   
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/44181/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #32473: [SPARK-35345][SQL] Add Parquet tests to BloomFilterBenchmark

2021-06-10 Thread GitBox


viirya commented on a change in pull request #32473:
URL: https://github.com/apache/spark/pull/32473#discussion_r649470402



##
File path: sql/core/benchmarks/BloomFilterBenchmark-jdk11-results.txt
##
@@ -2,23 +2,179 @@
 ORC Write

-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 Write 100M rows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-Without bloom filter                              19503           19621         166         5.1         195.0       1.0X
-With bloom filter                                 22472           22710         335         4.4         224.7       0.9X
+Without bloom filter                              13568           13645         109         7.4         135.7       1.0X
+With bloom filter                                 16116           16238         172         6.2         161.2       0.8X

 ORC Read

-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 Read a row from 100M rows:                Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
-Without bloom filter                               1981            2040          82        50.5          19.8       1.0X
-With bloom filter                                  1428            1467          54        70.0          14.3       1.4X
+Without bloom filter                               1572            1605          47        63.6          15.7       1.0X
+With bloom filter                                  1343            1359          23        74.5          13.4       1.2X
+
+ORC Read for IN set
+
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Read a row from 1M rows:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+Without bloom filter                                 51              63          15        19.6          51.1       1.0X
+With bloom filter                                    54              88          23        18.5          54.0       0.9X
+
+Parquet Write
+
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Write 100M rows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+Without bloom filter                              13679           13954         389         7.3         136.8       1.0X
+With bloom filter                                 18260           18284          33         5.5         182.6       0.7X
+
+Parquet Read
+
+OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1047-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+Read a row from 100M rows:                Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+Without bloom filter, blocksize: 2097152            954             984          49       104.8           9.5       1.0X
+With bloom filter, blocksize: 2097152               285             307          21       350.4           2.9       3.3X
+

[GitHub] [spark] SparkQA commented on pull request #32847: [WIP] [SPARK-35616][PYTHON] Make astype data-type-based

2021-06-10 Thread GitBox


SparkQA commented on pull request #32847:
URL: https://github.com/apache/spark/pull/32847#issuecomment-858957221


   Kubernetes integration test starting
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44185/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on pull request #32702: [SPARK-35565][SS] Add config for ignoring metadata directory of FileStreamSink

2021-06-10 Thread GitBox


viirya commented on pull request #32702:
URL: https://github.com/apache/spark/pull/32702#issuecomment-858953384


   @HeartSaVioR @xuanyuanking Can we move forward with this? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #32869: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread GitBox


SparkQA commented on pull request #32869:
URL: https://github.com/apache/spark/pull/32869#issuecomment-858946087


   Kubernetes integration test status success
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44181/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


SparkQA removed a comment on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-858674648


   **[Test build #139643 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139643/testReport)** for PR 32849 at commit [`7e502b5`](https://github.com/apache/spark/commit/7e502b56afcf6bc3238fae037323e0f0ab78edbf).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a change in pull request #32140: [WIP][SPARK-32922][SHUFFLE][CORE] Adds support for executors to fetch local and remote merged shuffle data

2021-06-10 Thread GitBox


otterc commented on a change in pull request #32140:
URL: https://github.com/apache/spark/pull/32140#discussion_r649457152



##
File path: core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success}
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.MapOutputTracker
+import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener}
+import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER
+import org.apache.spark.storage.ShuffleBlockFetcherIterator._
+
+/**
+ * Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
+ * functionality to fetch merged block meta and merged shuffle block chunks.
+ */
+private class PushBasedFetchHelper(
+    private val iterator: ShuffleBlockFetcherIterator,
+    private val shuffleClient: BlockStoreClient,
+    private val blockManager: BlockManager,
+    private val mapOutputTracker: MapOutputTracker) extends Logging {
+
+  private[this] val startTimeNs = System.nanoTime()
+
+  private[this] val localShuffleMergerBlockMgrId = BlockManagerId(
+    SHUFFLE_MERGER_IDENTIFIER, blockManager.blockManagerId.host,
+    blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
+
+  /** A map for storing merged block shuffle chunk bitmap */
+  private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]()
+
+  /**
+   * Returns true if the address is for a push-merged block.
+   */
+  def isMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
+    SHUFFLE_MERGER_IDENTIFIER.equals(address.executorId)
+  }
+
+  /**
+   * Returns true if the address is not of executor local or merged local block. false otherwise.
+   */
+  def isNotExecutorOrMergedLocal(address: BlockManagerId): Boolean = {
+    (isMergedShuffleBlockAddress(address) && address.host != blockManager.blockManagerId.host) ||
+      (!isMergedShuffleBlockAddress(address) && address != blockManager.blockManagerId)
+  }
+
+  /**
+   * Returns true if the address is of merged local block. false otherwise.
+   */
+  def isMergedLocal(address: BlockManagerId): Boolean = {
+    isMergedShuffleBlockAddress(address) && address.host == blockManager.blockManagerId.host
+  }
+
+  def getNumberOfBlocksInChunk(blockId : ShuffleBlockChunkId): Int = {
+    chunksMetaMap(blockId).getCardinality
+  }
+
+  def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
+    chunksMetaMap.remove(blockId)
+  }
+
+  def createChunkBlockInfosFromMetaResponse(
+      shuffleId: Int,
+      reduceId: Int,
+      blockSize: Long,
+      numChunks: Int,
+      bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
+    val approxChunkSize = blockSize / numChunks
+    val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]()
+    for (i <- 0 until numChunks) {
+      val blockChunkId = ShuffleBlockChunkId(shuffleId, reduceId, i)
+      chunksMetaMap.put(blockChunkId, bitmaps(i))
+      logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
+      blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
+    }
+    blocksToFetch
+  }
+
+  def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
+    val sizeMap = req.blocks.map {
+      case FetchBlockInfo(blockId, size, _) =>
+        val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+        ((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)}.toMap
+    val address = req.address
+    val mergedBlocksMetaListener = new MergedBlocksMetaListener {
+      override def onSuccess(shuffleId: Int, reduceId: Int, meta: MergedBlockMeta): Unit = {
+        logInfo(s"Received the meta of merged block for ($shuffleId, $reduceId) " +
+          s"from ${req.address.host}:${req.address.port}")
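
   For readers skimming the diff, a minimal self-contained sketch (not from the PR) of the address classification that `isMergedShuffleBlockAddress`, `isNotExecutorOrMergedLocal`, and `isMergedLocal` implement; the ids, hosts, and ports are hypothetical, and `Addr` is a stand-in for `BlockManagerId`:

       case class Addr(executorId: String, host: String, port: Int)
       val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"  // stand-in for BlockManagerId.SHUFFLE_MERGER_IDENTIFIER

       val local = Addr("exec-1", "host-a", 7337)  // this executor's own block manager
       def isMerged(addr: Addr): Boolean = SHUFFLE_MERGER_IDENTIFIER == addr.executorId

       // isMergedLocal: a merged block served from this host -> read from local disk.
       val mergedLocal = Addr(SHUFFLE_MERGER_IDENTIFIER, "host-a", 7337)
       assert(isMerged(mergedLocal) && mergedLocal.host == local.host)

       // isNotExecutorOrMergedLocal: a merged block on another host, or a regular
       // block on another executor -> must be fetched over the network.
       val mergedRemote = Addr(SHUFFLE_MERGER_IDENTIFIER, "host-b", 7337)
       assert(isMerged(mergedRemote) && mergedRemote.host != local.host)
       val otherExec = Addr("exec-2", "host-b", 7337)
       assert(!isMerged(otherExec) && otherExec != local)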

[GitHub] [spark] SparkQA commented on pull request #32849: [WIP][SPARK-35704][SQL] Support fields by the day-time interval type

2021-06-10 Thread GitBox


SparkQA commented on pull request #32849:
URL: https://github.com/apache/spark/pull/32849#issuecomment-858940273


   **[Test build #139643 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139643/testReport)** for PR 32849 at commit [`7e502b5`](https://github.com/apache/spark/commit/7e502b56afcf6bc3238fae037323e0f0ab78edbf).
* This patch **fails SparkR unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #32862: [SPARK-35695][SQL] Collect observed metrics from cached sub-tree

2021-06-10 Thread GitBox


SparkQA commented on pull request #32862:
URL: https://github.com/apache/spark/pull/32862#issuecomment-858936148


   Kubernetes integration test status success
   URL: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44182/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


