[spark] branch branch-2.4 updated: [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 710d81e  [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.
710d81e is described below

commit 710d81ea87b5ba0c3d49b3dfbc591129685c2a13
Author: Marcelo Vanzin
AuthorDate: Thu Jan 31 00:10:23 2019 +0900

    [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.

    Otherwise the RDD data may be out of date by the time the test tries to check it.

    Tested with an artificial delay inserted in AppStatusListener.

    Closes #23654 from vanzin/SPARK-26732.

    Authored-by: Marcelo Vanzin
    Signed-off-by: Takeshi Yamamuro
    (cherry picked from commit 6a2f3dcc2bd601fd1fe7610854bc0f5bf90300f4)
    Signed-off-by: Takeshi Yamamuro
---
 core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index 8feb3de..051a13c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -60,6 +60,7 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
     assert(sc.getRDDStorageInfo.size === 0)
     rdd.collect()
+    sc.listenerBus.waitUntilEmpty(1)
     assert(sc.getRDDStorageInfo.size === 1)
     assert(sc.getRDDStorageInfo.head.isCached)
     assert(sc.getRDDStorageInfo.head.memSize > 0)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6a2f3dc  [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.
6a2f3dc is described below

commit 6a2f3dcc2bd601fd1fe7610854bc0f5bf90300f4
Author: Marcelo Vanzin
AuthorDate: Thu Jan 31 00:10:23 2019 +0900

    [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.

    Otherwise the RDD data may be out of date by the time the test tries to check it.

    Tested with an artificial delay inserted in AppStatusListener.

    Closes #23654 from vanzin/SPARK-26732.

    Authored-by: Marcelo Vanzin
    Signed-off-by: Takeshi Yamamuro
---
 core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index 8feb3de..051a13c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -60,6 +60,7 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
     assert(sc.getRDDStorageInfo.size === 0)
     rdd.collect()
+    sc.listenerBus.waitUntilEmpty(1)
     assert(sc.getRDDStorageInfo.size === 1)
     assert(sc.getRDDStorageInfo.head.isCached)
     assert(sc.getRDDStorageInfo.head.memSize > 0)
[spark] branch branch-2.3 updated: [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new f6391e1  [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.
f6391e1 is described below

commit f6391e165b8e4dde8a9636fc220042b38eabd056
Author: Marcelo Vanzin
AuthorDate: Thu Jan 31 00:10:23 2019 +0900

    [SPARK-26732][CORE][TEST] Wait for listener bus to process events in SparkContextInfoSuite.

    Otherwise the RDD data may be out of date by the time the test tries to check it.

    Tested with an artificial delay inserted in AppStatusListener.

    Closes #23654 from vanzin/SPARK-26732.

    Authored-by: Marcelo Vanzin
    Signed-off-by: Takeshi Yamamuro
    (cherry picked from commit 6a2f3dcc2bd601fd1fe7610854bc0f5bf90300f4)
    Signed-off-by: Takeshi Yamamuro
---
 core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index 8feb3de..051a13c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -60,6 +60,7 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
     assert(sc.getRDDStorageInfo.size === 0)
     rdd.collect()
+    sc.listenerBus.waitUntilEmpty(1)
     assert(sc.getRDDStorageInfo.size === 1)
     assert(sc.getRDDStorageInfo.head.isCached)
     assert(sc.getRDDStorageInfo.head.memSize > 0)
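The race that SPARK-26732 describes can be reproduced without Spark. The following is a minimal sketch, assuming a toy event bus that handles events on a background thread; `ToyListenerBus` and its methods are hypothetical stand-ins written for illustration and are not Spark's listener bus implementation. Without the wait, a caller that checks state right after posting may observe it before the handler has run.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeoutException}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an asynchronous listener bus. This is NOT Spark's
// implementation; it only mimics "post now, handle later on another thread".
final class ToyListenerBus(handle: String => Unit) {
  private val queue = new LinkedBlockingQueue[String]()
  // Number of events posted but not yet fully handled.
  private val pending = new AtomicInteger(0)

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        val event = queue.take()
        try handle(event) finally pending.decrementAndGet()
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  /** Enqueues an event and returns immediately. */
  def post(event: String): Unit = {
    pending.incrementAndGet()
    queue.put(event)
  }

  /** Blocks until every posted event has been handled, or the timeout expires. */
  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (pending.get() > 0) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(s"Events still pending after $timeoutMillis ms")
      }
      Thread.sleep(1)
    }
  }
}
```

A test that posts an event and then asserts on state updated by the handler would call `waitUntilEmpty` between the two steps, which mirrors the one-line change in the diff.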
[spark] branch master updated: [SPARK-25689][CORE] Follow up: don't get delegation tokens when kerberos not available.
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a8da410  [SPARK-25689][CORE] Follow up: don't get delegation tokens when kerberos not available.
a8da410 is described below

commit a8da41061fb38529e90c41571b38294ab5ddc441
Author: Marcelo Vanzin
AuthorDate: Wed Jan 30 09:52:50 2019 -0800

    [SPARK-25689][CORE] Follow up: don't get delegation tokens when kerberos not available.

    This avoids trying to get delegation tokens when a TGT is not available, e.g.
    when running in yarn-cluster mode without a keytab. That would result in an
    error since that is not allowed.

    Tested with some (internal) integration tests that started failing with the
    patch for SPARK-25689.

    Closes #23689 from vanzin/SPARK-25689.followup.

    Authored-by: Marcelo Vanzin
    Signed-off-by: Marcelo Vanzin
---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6bc0bdd..0111b83 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -401,12 +401,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager = createTokenManager()
       delegationTokenManager.foreach { dtm =>
+        val ugi = UserGroupInformation.getCurrentUser()
         val tokens = if (dtm.renewalEnabled) {
           dtm.start()
-        } else {
-          val creds = UserGroupInformation.getCurrentUser().getCredentials()
+        } else if (ugi.hasKerberosCredentials()) {
+          val creds = ugi.getCredentials()
           dtm.obtainDelegationTokens(creds)
           SparkHadoopUtil.get.serialize(creds)
+        } else {
+          null
         }
         if (tokens != null) {
           updateDelegationTokens(tokens)
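The three-way branch introduced by this follow-up can be read as a small decision table. The sketch below restates it as a pure function, assuming the three boolean inputs correspond to `UserGroupInformation.isSecurityEnabled()`, `dtm.renewalEnabled` and `ugi.hasKerberosCredentials()` from the diff; the `TokenAction` type and `DelegationTokenSketch` object are hypothetical names introduced only for illustration.

```scala
// Hypothetical result type, introduced for illustration only.
sealed trait TokenAction
case object StartRenewer extends TokenAction // corresponds to dtm.start()
case object ObtainOnce extends TokenAction   // corresponds to dtm.obtainDelegationTokens(creds)
case object SkipTokens extends TokenAction   // corresponds to the new `null` branch

object DelegationTokenSketch {
  /** Mirrors the branch order in the patched code; not the actual Spark method. */
  def choose(
      securityEnabled: Boolean,
      renewalEnabled: Boolean,
      hasKerberosCredentials: Boolean): TokenAction = {
    if (!securityEnabled) SkipTokens
    else if (renewalEnabled) StartRenewer
    else if (hasKerberosCredentials) ObtainOnce
    else SkipTokens // no TGT available: do not attempt to fetch tokens
  }
}
```

The last branch is the case the commit message calls out: security is on, renewal is off, and no Kerberos credentials are present, so nothing is fetched.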
[spark] branch master updated: [SPARK-26753][CORE] Fixed custom log levels for spark-shell by using Filter instead of Threshold
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 25b97a4  [SPARK-26753][CORE] Fixed custom log levels for spark-shell by using Filter instead of Threshold
25b97a4 is described below

commit 25b97a41ceede7b99a5f835077de94b38743203a
Author: ankurgupta
AuthorDate: Wed Jan 30 10:54:09 2019 -0800

    [SPARK-26753][CORE] Fixed custom log levels for spark-shell by using Filter instead of Threshold

    This fix replaces the Threshold with a Filter for ConsoleAppender which checks
    to ensure that either the logLevel is greater than thresholdLevel (shell log
    level) or the log originated from a custom defined logger. In these cases, it
    lets a log event go through, otherwise it doesn't.

    1. Ensured that custom log level works when set by default (via log4j.properties)
    2. Ensured that logs are not printed twice when log level is changed by setLogLevel
    3. Ensured that custom logs are printed when log level is changed back by setLogLevel

    Closes #23675 from ankuriitg/ankurgupta/SPARK-26753.

    Authored-by: ankurgupta
    Signed-off-by: Marcelo Vanzin
---
 .../scala/org/apache/spark/internal/Logging.scala  | 49 --
 .../main/scala/org/apache/spark/util/Utils.scala   | 6 +--
 .../org/apache/spark/internal/LoggingSuite.scala   | 59 ++
 3 files changed, 94 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 00db9af..0987917 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -17,11 +17,10 @@

 package org.apache.spark.internal

-import java.util.concurrent.ConcurrentHashMap
-
 import scala.collection.JavaConverters._

 import org.apache.log4j._
+import org.apache.log4j.spi.{Filter, LoggingEvent}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder

@@ -154,16 +153,10 @@ trait Logging {
           System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
             "For SparkR, use setLogLevel(newLevel).")
         }
+        Logging.sparkShellThresholdLevel = replLevel
         rootLogger.getAllAppenders().asScala.foreach {
           case ca: ConsoleAppender =>
-            Option(ca.getThreshold()) match {
-              case Some(t) =>
-                Logging.consoleAppenderToThreshold.put(ca, t)
-                if (!t.isGreaterOrEqual(replLevel)) {
-                  ca.setThreshold(replLevel)
-                }
-              case None => ca.setThreshold(replLevel)
-            }
+            ca.addFilter(new SparkShellLoggingFilter())
           case _ => // no-op
         }
       }
@@ -182,7 +175,7 @@ private[spark] object Logging {
   @volatile private var initialized = false
   @volatile private var defaultRootLevel: Level = null
   @volatile private var defaultSparkLog4jConfig = false
-  private val consoleAppenderToThreshold = new ConcurrentHashMap[ConsoleAppender, Priority]()
+  @volatile private[spark] var sparkShellThresholdLevel: Level = null

   val initLock = new Object()
   try {
@@ -211,11 +204,7 @@ private[spark] object Logging {
       } else {
         val rootLogger = LogManager.getRootLogger()
         rootLogger.setLevel(defaultRootLevel)
-        rootLogger.getAllAppenders().asScala.foreach {
-          case ca: ConsoleAppender =>
-            ca.setThreshold(consoleAppenderToThreshold.get(ca))
-          case _ => // no-op
-        }
+        sparkShellThresholdLevel = null
       }
     }
     this.initialized = false
@@ -229,3 +218,31 @@ private[spark] object Logging {
     "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
   }
 }
+
+private class SparkShellLoggingFilter extends Filter {
+
+  /**
+   * If sparkShellThresholdLevel is not defined, this filter is a no-op.
+   * If log level of event is not equal to root level, the event is allowed. Otherwise,
+   * the decision is made based on whether the log came from root or some custom configuration
+   * @param loggingEvent
+   * @return decision for accept/deny log event
+   */
+  def decide(loggingEvent: LoggingEvent): Int = {
+    if (Logging.sparkShellThresholdLevel == null) {
+      return Filter.NEUTRAL
+    }
+    val rootLevel = LogManager.getRootLogger().getLevel()
+    if (!loggingEvent.getLevel().eq(rootLevel)) {
+      return Filter.NEUTRAL
+    }
+    var logger = loggingEvent.getLogger()
+    while (logger.getParent() != null) {
+      if (logger.getLevel() != null) {
+        return Filter.NEUTRAL
+      }
+      logger = logger.getParent()
+    }
+    return Filter.DENY
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/
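The decision made by `SparkShellLoggingFilter.decide` in the diff above can be traced without log4j. The following is a simplified sketch, assuming a toy logger hierarchy in which each logger has an optional explicit level and an optional parent; `ToyLogger`, `Decision` and `ShellFilterSketch` are hypothetical names and do not exist in Spark or log4j. It follows the same three checks in the same order as the patched code.

```scala
import scala.annotation.tailrec

// Hypothetical, simplified model of a logger hierarchy; not the log4j API.
// `level` is None when the logger inherits its level; `parent` is None for the root.
final case class ToyLogger(name: String, level: Option[String], parent: Option[ToyLogger])

sealed trait Decision
case object Neutral extends Decision // let the event continue through the appender
case object Deny extends Decision    // drop the event

object ShellFilterSketch {
  def decide(
      shellThreshold: Option[String],
      rootLevel: String,
      eventLevel: String,
      logger: ToyLogger): Decision = {
    // Walks from the event's logger towards the root (exclusive) looking for an
    // explicitly configured level, as the while loop in the diff does.
    @tailrec
    def hasCustomLevel(l: ToyLogger): Boolean = l.parent match {
      case None => false // reached the root without finding a custom level
      case Some(p) => if (l.level.isDefined) true else hasCustomLevel(p)
    }

    if (shellThreshold.isEmpty) Neutral        // filter is a no-op outside the shell
    else if (eventLevel != rootLevel) Neutral  // only events at exactly the root level are examined
    else if (hasCustomLevel(logger)) Neutral   // logger was configured explicitly
    else Deny
  }
}
```

With a root logger at `WARN`, an event from a logger that inherits its level is denied, while the same event from a logger with its own configured level passes through.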
[spark] branch branch-2.3 updated: [SPARK-26718][SS][BRANCH-2.3] Fixed integer overflow in SS kafka rateLimit calculation
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new ad18faa  [SPARK-26718][SS][BRANCH-2.3] Fixed integer overflow in SS kafka rateLimit calculation
ad18faa is described below

commit ad18faa65251aa35437adee49a1767d9b4977d48
Author: ryne.yang
AuthorDate: Wed Jan 30 11:13:19 2019 -0800

    [SPARK-26718][SS][BRANCH-2.3] Fixed integer overflow in SS kafka rateLimit calculation

    ## What changes were proposed in this pull request?

    Fix the integer overflow issue in rateLimit.

    ## How was this patch tested?

    Pass the Jenkins with newly added UT for the possible case where integer could be overflowed.

    Closes #23703 from linehrr/fix/2.3-integeroverflow.

    Authored-by: ryne.yang
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 10 ++-
 .../spark/sql/kafka010/KafkaSourceSuite.scala      | 35 ++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 169a5d0..f88c4d5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -188,7 +188,15 @@ private[kafka010] class KafkaSource(
             val prorate = limit * (size / total)
             logDebug(s"rateLimit $tp prorated amount is $prorate")
             // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // need to be careful of integer overflow
+            // therefore added canary checks where to see if off variable could be overflowed
+            // refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+            val off = if (prorateLong > Long.MaxValue - begin) {
+              Long.MaxValue
+            } else {
+              begin + prorateLong
+            }
             logDebug(s"rateLimit $tp new offset is $off")
             // Paranoia, make sure not to return an offset that's past end
             Math.min(end, off)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 02c8764..3a27d41 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -176,6 +176,41 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
       StopStream)
   }

+  test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " +
+    "during end offset calculation") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    // fill in 5 messages to trigger potential integer overflow
+    testUtils.sendMessages(topic, (0 until 5).map(_.toString).toArray, Some(0))
+
+    val partitionOffsets = Map(
+      new TopicPartition(topic, 0) -> 5L
+    )
+    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      // use latest to force begin to be 5
+      .option("startingOffsets", startingOffsets)
+      // use Long.Max to try to trigger overflow
+      .option("maxOffsetsPerTrigger", Long.MaxValue)
+      .option("subscribe", topic)
+      .option("kafka.metadata.max.age.ms", "1")
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 30, 31, 32, 33, 34),
+      CheckAnswer(30, 31, 32, 33, 34),
+      StopStream
+    )
+  }
+
   test("maxOffsetsPerTrigger") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
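The overflow fixed by SPARK-26718 can be shown with plain `Long` arithmetic. The sketch below lifts the offset computation from the diff into two standalone functions, one with the pre-patch addition and one with the guard; the object name `RateLimitSketch` and the function names are hypothetical, and the guard as written assumes `begin` is non-negative, as Kafka offsets are.

```scala
object RateLimitSketch {
  // Same rounding rule as the diff: round up below 1 so that small partitions
  // are not starved, round down otherwise.
  private def prorateToLong(prorate: Double): Long =
    (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong

  /** Pre-patch behaviour: the addition can wrap around to a negative offset. */
  def naiveNextOffset(begin: Long, prorate: Double, end: Long): Long = {
    val off = begin + prorateToLong(prorate)
    Math.min(end, off)
  }

  /** Patched behaviour: saturate at Long.MaxValue instead of wrapping. Assumes begin >= 0. */
  def nextOffset(begin: Long, prorate: Double, end: Long): Long = {
    val prorateLong = prorateToLong(prorate)
    val off = if (prorateLong > Long.MaxValue - begin) {
      Long.MaxValue
    } else {
      begin + prorateLong
    }
    Math.min(end, off)
  }
}
```

With `begin = 5`, `end = 10` and a prorated amount of `Long.MaxValue.toDouble`, the naive version wraps to a negative number, which then wins the `Math.min` against `end`; the guarded version saturates and returns `end`.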
[spark] branch master updated: [SPARK-26311][CORE] New feature: apply custom log URL pattern for executor log URLs in SHS
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ae5b2a6  [SPARK-26311][CORE] New feature: apply custom log URL pattern for executor log URLs in SHS
ae5b2a6 is described below

commit ae5b2a6a92be4986ef5b8062d7fb59318cff6430
Author: Jungtaek Lim (HeartSaVioR)
AuthorDate: Wed Jan 30 11:52:30 2019 -0800

    [SPARK-26311][CORE] New feature: apply custom log URL pattern for executor log URLs in SHS

    ## What changes were proposed in this pull request?

    This patch proposes adding a new configuration on SHS: custom executor log URL pattern. This will enable end users to replace executor logs to other than RM provide, like external log service, which enables to serve executor logs when NodeManager becomes unavailable in case of YARN.

    End users can build their own of custom executor log URLs with pre-defined patterns which would be vary on each resource manager. This patch adds some patterns to YARN resource manager. (For others, there's even no executor log url available so cannot define patterns as well.)

    Please refer the doc change as well as added UTs in this patch to see how to set up the feature.

    ## How was this patch tested?

    Added UT, as well as manual test with YARN cluster

    Closes #23260 from HeartSaVioR/SPARK-26311.

    Authored-by: Jungtaek Lim (HeartSaVioR)
    Signed-off-by: Marcelo Vanzin
---
 .../main/scala/org/apache/spark/SparkContext.scala | 3 +-
 .../spark/deploy/history/FsHistoryProvider.scala | 7 +-
 .../deploy/history/HistoryAppStatusStore.scala | 133 +
 .../executor/CoarseGrainedExecutorBackend.scala | 9 +-
 .../org/apache/spark/internal/config/History.scala | 18 ++
 .../apache/spark/scheduler/SchedulerBackend.scala | 7 +
 .../org/apache/spark/scheduler/SparkListener.scala | 3 +-
 .../cluster/CoarseGrainedClusterMessage.scala | 3 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 4 +-
 .../spark/scheduler/cluster/ExecutorData.scala | 5 +-
 .../spark/scheduler/cluster/ExecutorInfo.scala | 12 +-
 .../scheduler/local/LocalSchedulerBackend.scala | 3 +-
 .../apache/spark/status/AppStatusListener.scala | 2 +
 .../scala/org/apache/spark/status/LiveEntity.scala | 4 +-
 .../scala/org/apache/spark/status/api/v1/api.scala | 3 +-
 .../scala/org/apache/spark/util/JsonProtocol.scala | 16 +-
 .../executor_list_json_expectation.json | 3 +-
 ...ist_with_executor_metrics_json_expectation.json | 24 ++-
 ...utor_process_tree_metrics_json_expectation.json | 6 +-
 .../executor_memory_usage_expectation.json | 15 +-
 .../executor_node_blacklisting_expectation.json | 15 +-
 ...de_blacklisting_unblacklisting_expectation.json | 15 +-
 .../spark/ExecutorAllocationManagerSuite.scala | 16 +-
 .../org/apache/spark/HeartbeatReceiverSuite.scala | 4 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala | 4 +-
 .../deploy/history/FsHistoryProviderSuite.scala | 209 -
 .../scheduler/EventLoggingListenerSuite.scala | 3 +-
 .../spark/status/AppStatusListenerSuite.scala | 7 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala | 10 +-
 docs/monitoring.md | 24 +++
 docs/running-on-yarn.md | 32 +++-
 project/MimaExcludes.scala | 6 +
 .../mesos/MesosFineGrainedSchedulerBackend.scala | 2 +-
 .../MesosCoarseGrainedSchedulerBackendSuite.scala | 3 +-
 .../MesosFineGrainedSchedulerBackendSuite.scala | 15 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala | 31 ++-
 .../cluster/YarnClusterSchedulerBackend.scala | 36 +---
 .../spark/util/YarnContainerInfoHelper.scala | 103 ++
 .../spark/deploy/yarn/YarnClusterSuite.scala | 28 ++-
 39 files changed, 717 insertions(+), 126 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f2b79b8..6273601 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2355,7 +2355,8 @@ class SparkContext(config: SparkConf) extends Logging {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
+      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls,
+      schedulerBackend.getDriverAttributes))
     _driverLogger.foreach(_.startSync(_hadoopConfiguration))
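The commit message describes building executor log URLs from a pattern plus per-executor attributes. The sketch below shows one way such a substitution could work, assuming a `{{NAME}}` placeholder syntax and attribute names such as `CLUSTER_ID`, `CONTAINER_ID` and `FILE_NAME`; these are illustrative assumptions, since the portion of the patch quoted here does not show the placeholder syntax or attribute names, and `LogUrlPatternSketch` is a hypothetical helper rather than code from the patch.

```scala
object LogUrlPatternSketch {
  /**
   * Replaces every `{{KEY}}` placeholder in `pattern` with the matching attribute value.
   * Placeholders without a matching attribute are left untouched.
   */
  def render(pattern: String, attributes: Map[String, String]): String =
    attributes.foldLeft(pattern) { case (url, (key, value)) =>
      url.replace("{{" + key + "}}", value)
    }
}
```

For the real placeholder names and the configuration key, see the `docs/monitoring.md` and `docs/running-on-yarn.md` changes listed in the diffstat above.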
[GitHub] srowen closed pull request #172: Update documentation.md
srowen closed pull request #172: Update documentation.md
URL: https://github.com/apache/spark-website/pull/172

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] srowen commented on issue #172: Update documentation.md
srowen commented on issue #172: Update documentation.md
URL: https://github.com/apache/spark-website/pull/172#issuecomment-459161006

   Reopen if you update it

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[spark] branch master updated: [SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d8d2736  [SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method
d8d2736 is described below

commit d8d2736fd1e3cd47941153327ad50a4d36099476
Author: Wenchen Fan
AuthorDate: Thu Jan 31 11:04:33 2019 +0800

    [SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method

    ## What changes were proposed in this pull request?

    This is a follow up of https://github.com/apache/spark/pull/23644/files , to make these methods less coupled with each other.

    ## How was this patch tested?

    existing tests

    Closes #23687 from cloud-fan/cache.

    Authored-by: Wenchen Fan
    Signed-off-by: Wenchen Fan
---
 .../apache/spark/sql/execution/CacheManager.scala  | 48 +++---
 1 file changed, 23 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 00c4461..398d7b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -160,7 +160,22 @@ class CacheManager extends Logging {
     }
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
-      recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
+      recacheByCondition(spark, cd => {
+        // If the cache buffer has already been loaded, we don't need to recompile the cached plan,
+        // as it does not rely on the plan that has been uncached anymore, it will just produce
+        // data from the cache buffer.
+        // Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
+        // status test and may not return the most accurate cache buffer state. So the worse case
+        // scenario can be:
+        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
+        //    will clear the buffer and re-compiled the plan. It is inefficient but doesn't affect
+        //    correctness.
+        // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
+        //    will keep it as it is. It means the physical plan has been re-compiled already in the
+        //    other thread.
+        val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+        cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
+      })
     }
   }

@@ -168,38 +183,21 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
   def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
-    recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
+    recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
   }

+  /**
+   * Re-caches all the cache entries that satisfies the given `condition`.
+   */
   private def recacheByCondition(
       spark: SparkSession,
-      condition: LogicalPlan => Boolean,
-      clearCache: Boolean = true): Unit = {
+      condition: CachedData => Boolean): Unit = {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     writeLock {
       val it = cachedData.iterator()
       while (it.hasNext) {
         val cd = it.next()
-        // If `clearCache` is false (which means the recache request comes from a non-cascading
-        // cache invalidation) and the cache buffer has already been loaded, we do not need to
-        // re-compile a physical plan because the old plan will not be used any more by the
-        // CacheManager although it still lives in compiled `Dataset`s and it could still work.
-        // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
-        // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
-        // buffer is still empty, then we could have a more efficient new plan by removing
-        // dependency on the previously removed cache entries.
-        // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
-        // status test and may not return the most accurate cache buffer state. So the worse case
-        // scenario can be:
-        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
-        //    will clear the buffer and build a new plan. It is inefficient but doesn't affect
-        //    correctness.
-        // 2) The buffer has been cleared, but `isCachedColumnB
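The refactoring above changes the predicate type from `LogicalPlan => Boolean` to `CachedData => Boolean`, so each caller can decide whether the buffer state matters. The sketch below illustrates that shape, assuming a toy cache entry that records which plans it depends on and whether its buffer is loaded; `ToyCachedData` and `RecacheSketch` are hypothetical and are not Spark classes.

```scala
// Hypothetical, simplified cache entry; not Spark's CachedData.
final case class ToyCachedData(name: String, dependsOn: Set[String], bufferLoaded: Boolean)

object RecacheSketch {
  /** Selects the entries to re-cache; the caller supplies the whole condition. */
  def selectForRecache(entries: Seq[ToyCachedData])(
      condition: ToyCachedData => Boolean): Seq[ToyCachedData] =
    entries.filter(condition)

  /** Analogue of recacheByPlan: every entry that refers to the plan. */
  def byPlan(plan: String): ToyCachedData => Boolean =
    cd => cd.dependsOn.contains(plan)

  /** Analogue of the non-cascade uncache path: skip entries whose buffer is already loaded. */
  def nonCascade(plan: String): ToyCachedData => Boolean =
    cd => cd.dependsOn.contains(plan) && !cd.bufferLoaded
}
```

Moving the buffer check into the predicate is what lets the patch drop the `clearCache` flag: the selection helper no longer needs to know why an entry is being re-cached.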
[spark] branch master updated: [SPARK-24360][SQL] Support Hive 3.1 metastore
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new aeff69b  [SPARK-24360][SQL] Support Hive 3.1 metastore
aeff69b is described below

commit aeff69bd879661367367f39b5dfecd9a76223c0b
Author: Dongjoon Hyun
AuthorDate: Wed Jan 30 20:33:21 2019 -0800

    [SPARK-24360][SQL] Support Hive 3.1 metastore

    ## What changes were proposed in this pull request?

    Hive 3.1.1 is released. This PR aims to support Hive 3.1.x metastore.
    Please note that Hive 3.0.0 Metastore is skipped intentionally.

    ## How was this patch tested?

    Pass the Jenkins with the updated test cases including 3.1.

    Closes #23694 from dongjoon-hyun/SPARK-24360-3.1.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 docs/sql-data-sources-hive-tables.md | 2 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala | 3 +-
 .../spark/sql/hive/client/HiveClientImpl.scala | 17 ++-
 .../apache/spark/sql/hive/client/HiveShim.scala | 126 +
 .../sql/hive/client/IsolatedClientLoader.scala | 1 +
 .../org/apache/spark/sql/hive/client/package.scala | 9 +-
 .../spark/sql/hive/execution/SaveAsHiveFile.scala | 2 +-
 .../spark/sql/hive/client/HiveClientSuite.scala | 23 +++-
 .../spark/sql/hive/client/HiveClientVersions.scala | 2 +-
 .../spark/sql/hive/client/HiveVersionSuite.scala | 7 +-
 .../spark/sql/hive/client/VersionsSuite.scala | 33 +-
 11 files changed, 206 insertions(+), 19 deletions(-)

diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md
index 3b39a32..14773ca 100644
--- a/docs/sql-data-sources-hive-tables.md
+++ b/docs/sql-data-sources-hive-tables.md
@@ -115,7 +115,7 @@ The following options can be used to configure the version of Hive that is used
 1.2.1
 Version of the Hive metastore. Available
-  options are 0.12.0 through 2.3.4.
+  options are 0.12.0 through 2.3.4 and 3.1.0 through 3.1.1.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 597eef1..38bbe64 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -62,7 +62,8 @@ private[spark] object HiveUtils extends Logging {

   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
-        s"0.12.0 through 2.3.4.")
+        "0.12.0 through 2.3.4 and " +
+        "3.1.0 through 3.1.1.")
     .stringConf
     .createWithDefault(builtinHiveVersion)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5e9b324..bfe19c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -105,6 +105,7 @@ private[hive] class HiveClientImpl(
     case hive.v2_1 => new Shim_v2_1()
     case hive.v2_2 => new Shim_v2_2()
     case hive.v2_3 => new Shim_v2_3()
+    case hive.v3_1 => new Shim_v3_1()
   }

   // Create an internal session state for this HiveClientImpl.
@@ -852,11 +853,17 @@ private[hive] class HiveClientImpl(
     client.getAllTables("default").asScala.foreach { t =>
       logDebug(s"Deleting table $t")
       val table = client.getTable("default", t)
-      client.getIndexes("default", t, 255).asScala.foreach { index =>
-        shim.dropIndex(client, "default", t, index.getIndexName)
-      }
-      if (!table.isIndexTable) {
-        client.dropTable("default", t)
+      try {
+        client.getIndexes("default", t, 255).asScala.foreach { index =>
+          shim.dropIndex(client, "default", t, index.getIndexName)
+        }
+        if (!table.isIndexTable) {
+          client.dropTable("default", t)
+        }
+      } catch {
+        case _: NoSuchMethodError =>
+          // HIVE-18448 Hive 3.0 remove index APIs
+          client.dropTable("default", t)
       }
     }
     client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 4d48490..a8ebb23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde.serdeConstants

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.a
[spark] branch master updated: [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d4d6df2 [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959
d4d6df2 is described below

commit d4d6df2f7d97168f0f3073aa42608294030ece55
Author: Hyukjin Kwon
AuthorDate: Thu Jan 31 14:32:31 2019 +0800

    [SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959

    ## What changes were proposed in this pull request?

    This PR reverts the JSON count optimization part of #21909.

    We cannot distinguish the cases below without parsing:

    ```
    [{...}, {...}]
    ```

    ```
    []
    ```

    ```
    {...}
    ```

    ```bash
    # empty string
    ```

    when we `count()`. One line (input: IN) can yield 0 records, 1 record, or multiple records, depending on the input.

    See also https://github.com/apache/spark/pull/23665#discussion_r251276720.

    ## How was this patch tested?

    Manually tested.

    Closes #23667 from HyukjinKwon/revert-SPARK-24959.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
---
 .../apache/spark/sql/catalyst/csv/UnivocityParser.scala  | 16 +++-
 .../spark/sql/catalyst/expressions/csvExpressions.scala  |  3 +--
 .../spark/sql/catalyst/expressions/jsonExpressions.scala |  3 +--
 .../spark/sql/catalyst/util/FailureSafeParser.scala      | 11 ++-
 sql/core/benchmarks/JSONBenchmark-results.txt            |  1 -
 .../scala/org/apache/spark/sql/DataFrameReader.scala     |  6 ++
 .../sql/execution/datasources/json/JsonDataSource.scala  |  6 ++
 .../sql/execution/datasources/json/JsonBenchmark.scala   |  3 ---
 8 files changed, 19 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 82a5b3c..79dff6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -188,11 +188,19 @@ class UnivocityParser(
     }
   }

+  private val doParse = if (requiredSchema.nonEmpty) {
+    (input: String) => convert(tokenizer.parseLine(input))
+  } else {
+    // If `columnPruning` enabled and partition attributes scanned only,
+    // `schema` gets empty.
+    (_: String) => InternalRow.empty
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+  def parse(input: String): InternalRow = doParse(input)

   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
@@ -282,8 +290,7 @@ private[sql] object UnivocityParser {
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)

     val handleHeader: () => Unit =
       () => headerChecker.checkHeaderColumnNames(tokenizer)
@@ -336,8 +343,7 @@ private[sql] object UnivocityParser {
       input => Seq(parser.parse(input)),
       parser.options.parseMode,
       schema,
-      parser.options.columnNameOfCorruptRecord,
-      parser.options.multiLine)
+      parser.options.columnNameOfCorruptRecord)
     filteredLines.flatMap(safeParser.parse)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index 83b0299..65b10f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -117,8 +117,7 @@ case class CsvToStructs(
       input => Seq(rawParser.parse(input)),
       mode,
       nullableSchema,
-      parsedOptions.columnNameOfCorruptRecord,
-      parsedOptions.multiLine)
+      parsedOptions.columnNameOfCorruptRecord)
   }

   override def dataType: DataType = nullableSchema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 3403349..655e44e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -582,8 +582,7 @@ case class JsonToStructs(
       input =
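
The commit message above argues that the number of records in one JSON input line cannot be known without parsing it. A toy sketch of why follows; `recordsInLine` is a hypothetical helper written for illustration and is not Spark's parser. It assumes well-formed input and treats a top-level array as one record per element, and anything else non-empty as one record.

```scala
// Toy illustration only: counts top-level JSON records in one input line.
// recordsInLine is a hypothetical helper, not part of Spark.
object JsonLineCount {
  def recordsInLine(line: String): Int = {
    val s = line.trim
    if (s.isEmpty) 0                   // empty string: no record
    else if (!s.startsWith("[")) 1     // a single top-level value such as {...}
    else {
      // Top-level array: count the elements found at nesting depth 1.
      var depth = 0
      var inString = false
      var escaped = false
      var sawValue = false
      var count = 0
      for (c <- s) {
        if (inString) {
          if (escaped) escaped = false
          else if (c == '\\') escaped = true
          else if (c == '"') inString = false
        } else c match {
          case '"' =>
            inString = true
            if (depth == 1) sawValue = true
          case '[' | '{' =>
            if (depth == 1) sawValue = true
            depth += 1
          case ']' | '}' =>
            depth -= 1
            if (depth == 0 && sawValue) { count += 1; sawValue = false }
          case ',' if depth == 1 =>
            if (sawValue) { count += 1; sawValue = false }
          case ch if !ch.isWhitespace && depth == 1 =>
            sawValue = true
          case _ =>
        }
      }
      count
    }
  }
}
```

Under these assumptions an empty line and `[]` both give 0, `{"a":1}` gives 1, and `[{"a":1}, {"a":2}]` gives 2, so simply counting input lines would over- or under-count.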
[spark] branch master updated: [SPARK-26716][SPARK-26765][FOLLOWUP][SQL] Clean up schema validation methods and override toString method in Avro
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 308996b [SPARK-26716][SPARK-26765][FOLLOWUP][SQL] Clean up schema validation methods and override toString method in Avro
308996b is described below

commit 308996bc72c95582577843d22fcca5f1051d242a
Author: Gengliang Wang
AuthorDate: Thu Jan 31 15:44:44 2019 +0800

    [SPARK-26716][SPARK-26765][FOLLOWUP][SQL] Clean up schema validation methods and override toString method in Avro

    ## What changes were proposed in this pull request?

    In #23639, the API `supportDataType` was refactored. We should also remove the methods `verifyWriteSchema` and `verifyReadSchema` in `DataSourceUtils`.

    Since the error message uses `FileFormat.toString` to name the data source, this PR also overrides the `toString` method in `AvroFileFormat`.

    ## How was this patch tested?

    Unit test.

    Closes #23699 from gengliangwang/SPARK-26716-followup.

    Authored-by: Gengliang Wang
    Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala      |  2 ++
 .../scala/org/apache/spark/sql/avro/AvroSuite.scala     |  2 +-
 .../spark/sql/execution/datasources/DataSource.scala    |  2 +-
 .../sql/execution/datasources/DataSourceUtils.scala     | 17 +
 .../sql/execution/datasources/FileFormatWriter.scala    |  2 +-
 5 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 7391665..c2a7f31 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -124,6 +124,8 @@ private[avro] class AvroFileFormat extends FileFormat
   override def shortName(): String = "avro"

+  override def toString(): String = "Avro"
+
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d803537..81a5cb7 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -896,7 +896,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         sql("select testType()").write.format("avro").mode("overwrite").save(tempDir)
       }.getMessage
       assert(msg.toLowerCase(Locale.ROOT)
-        .contains(s"data source does not support calendarinterval data type."))
+        .contains(s"avro data source does not support calendarinterval data type."))
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d48261e..db81fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -412,7 +412,7 @@ case class DataSource(
           hs.partitionSchema.map(_.name),
           "in the partition schema",
           equality)
-        DataSourceUtils.verifyReadSchema(hs.fileFormat, hs.dataSchema)
+        DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema)
       case _ =>
         SchemaUtils.checkColumnNameDuplication(
           relation.schema.map(_.name),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index a32a940..74eae94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -24,26 +24,11 @@ import org.apache.spark.sql.types._

 object DataSourceUtils {
-
-  /**
-   * Verify if the schema is supported in datasource in write path.
-   */
-  def verifyWriteSchema(format: FileFormat, schema: StructType): Unit = {
-    verifySchema(format, schema, isReadPath = false)
-  }
-
-  /**
-   * Verify if the schema is supported in datasource in read path.
-   */
-  def verifyReadSchema(format: FileFormat, schema: StructType): Unit = {
-    verifySchema(format, schema, isReadPath = true)
-  }
-
   /**
    * Verify if the schema is supported in datasource. This verification should be done
    * in a driver side.
    */
-  private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
+  def verifySchema(fo
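
The `AvroSuite` change above expects the error message to start with the data source name, which comes from the format's `toString`. A minimal sketch of that interaction follows, assuming simplified stand-ins: `Format`, `AvroLikeFormat` and this `verifySchema` are hypothetical, type names are plain strings, and `IllegalArgumentException` replaces whatever exception Spark actually raises.

```scala
// Sketch only: simplified stand-ins for the FileFormat and DataSourceUtils
// discussed above; none of these definitions are Spark's own.
object SchemaCheckSketch {
  trait Format {
    def supports(typeName: String): Boolean
  }

  class AvroLikeFormat extends Format {
    override def supports(typeName: String): Boolean = typeName != "calendarinterval"
    // Without this override the message would show the default
    // Object.toString output (class name plus hash code).
    override def toString: String = "Avro"
  }

  /** Rejects the first unsupported type, naming the format via its toString. */
  def verifySchema(format: Format, fieldTypes: Seq[String]): Unit =
    fieldTypes.foreach { t =>
      if (!format.supports(t)) {
        throw new IllegalArgumentException(
          s"$format data source does not support $t data type.")
      }
    }
}
```

Because the message is built by string interpolation of the format object, one `verifySchema` can serve every format as long as each one overrides `toString` with a readable name.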
[spark] branch master updated: [MINOR][DOCS] Add a note that 'spark.executor.pyspark.memory' is dependent on 'resource'
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0d77d57 [MINOR][DOCS] Add a note that 'spark.executor.pyspark.memory' is dependent on 'resource'
0d77d57 is described below

commit 0d77d575e14e535fbe29b42e5612f3ddc64d42f4
Author: Hyukjin Kwon
AuthorDate: Thu Jan 31 15:51:40 2019 +0800

    [MINOR][DOCS] Add a note that 'spark.executor.pyspark.memory' is dependent on 'resource'

    ## What changes were proposed in this pull request?

    This PR adds a note stating explicitly that `spark.executor.pyspark.memory` depends on the behaviour of Python's `resource` module with respect to memory usage.

    For instance, I see at least some difference at https://github.com/apache/spark/pull/21977#discussion_r251220966

    ## How was this patch tested?

    Manually built the doc.

    Closes #23664 from HyukjinKwon/note-resource-dependent.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
---
 docs/configuration.md | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 806e16a..5b5891e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -190,8 +190,10 @@ of the most common options to set are:
     and it is up to the application to avoid exceeding the overhead memory space
     shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory
     is added to executor resource requests.
-
-    NOTE: Python memory usage may not be limited on platforms that do not support resource limiting, such as Windows.
+
+    Note: This feature is dependent on Python's `resource` module; therefore, the behaviors and
+    limitations are inherited. For instance, Windows does not support resource limiting and actual
+    resource is not limited on MacOS.
@@ -223,7 +225,8 @@ of the most common options to set are:
     stored on disk. This should be on a fast, local disk in your system. It can also be a
     comma-separated list of multiple directories on different disks.

-    NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone), MESOS_SANDBOX (Mesos) or
+
+    Note: This will be overridden by SPARK_LOCAL_DIRS (Standalone), MESOS_SANDBOX (Mesos) or
     LOCAL_DIRS (YARN) environment variables set by the cluster manager.