git commit: [SPARK-4107] Fix incorrect handling of read() and skip() return values
Repository: spark Updated Branches: refs/heads/master 4ceb048b3 - 46c63417c [SPARK-4107] Fix incorrect handling of read() and skip() return values `read()` may return fewer bytes than requested; when this occurred, the old code would silently return less data than requested, which might cause stream corruption errors. `skip()` faces similar issues, too. This patch fixes several cases where we mis-handle these methods' return values. Author: Josh Rosen joshro...@databricks.com Closes #2969 from JoshRosen/file-channel-read-fix and squashes the following commits: e724a9f [Josh Rosen] Fix similar issue of not checking skip() return value. cbc03ce [Josh Rosen] Update the other log message, too. 01e6015 [Josh Rosen] file.getName - file.getAbsolutePath d961d95 [Josh Rosen] Fix another issue in FileServerSuite. b9265d2 [Josh Rosen] Fix a similar (minor) issue in TestUtils. cd9d76f [Josh Rosen] Fix a similar error in Tachyon: 3db0008 [Josh Rosen] Fix a similar read() error in Utils.offsetBytes(). 
db985ed [Josh Rosen] Fix unsafe usage of FileChannel.read(): Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46c63417 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46c63417 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46c63417 Branch: refs/heads/master Commit: 46c63417c1bb1aea07baf9036cc5b8f1c3781bbe Parents: 4ceb048 Author: Josh Rosen joshro...@databricks.com Authored: Tue Oct 28 00:04:16 2014 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Oct 28 00:04:16 2014 -0700 -- .../main/scala/org/apache/spark/TestUtils.scala | 9 ++--- .../apache/spark/network/ManagedBuffer.scala| 10 -- .../shuffle/IndexShuffleBlockManager.scala | 4 +++- .../org/apache/spark/storage/DiskStore.scala| 10 -- .../org/apache/spark/storage/TachyonStore.scala | 21 +++- .../scala/org/apache/spark/util/Utils.scala | 6 +++--- .../org/apache/spark/FileServerSuite.scala | 8 ++-- 7 files changed, 33 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/TestUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index e72826d..3407814 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConversions._ +import com.google.common.io.{ByteStreams, Files} import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} -import com.google.common.io.Files import org.apache.spark.util.Utils @@ -64,12 +64,7 @@ private[spark] object TestUtils { jarStream.putNextEntry(jarEntry) val in = new FileInputStream(file) - val buffer = new Array[Byte](10240) - var nRead = 0 - while (nRead = 0) { -nRead = in.read(buffer, 0, buffer.length) -jarStream.write(buffer, 0, nRead) - } + 
ByteStreams.copy(in, jarStream) in.close() } jarStream.close() http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala index 4c9ca97..4211ba4 100644 --- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala @@ -81,7 +81,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead. if (length MIN_MEMORY_MAP_BYTES) { val buf = ByteBuffer.allocate(length.toInt) -channel.read(buf, offset) +channel.position(offset) +while (buf.remaining() != 0) { + if (channel.read(buf) == -1) { +throw new IOException(Reached EOF before filling buffer\n + + soffset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}) + } +} buf.flip() buf } else { @@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt var is: FileInputStream = null try { is = new FileInputStream(file) - is.skip(offset) + ByteStreams.skipFully(is, offset) ByteStreams.limit(is, length) } catch { case e: IOException =
git commit: [SPARK-4116][YARN]Delete the abandoned log4j-spark-container.properties
Repository: spark Updated Branches: refs/heads/master fae095bc7 - 47346cd02 [SPARK-4116][YARN]Delete the abandoned log4j-spark-container.properties Since its name reduced at https://github.com/apache/spark/pull/560, the log4j-spark-container.properties was never used again. And I have searched its name globally in code and found no cite. Author: WangTaoTheTonic barneystin...@aliyun.com Closes #2977 from WangTaoTheTonic/delLog4j and squashes the following commits: fb2729f [WangTaoTheTonic] delete the log4j file obsoleted Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47346cd0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47346cd0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47346cd0 Branch: refs/heads/master Commit: 47346cd029abc50c70582a721810a7cceb682d8a Parents: fae095b Author: WangTaoTheTonic barneystin...@aliyun.com Authored: Tue Oct 28 08:46:31 2014 -0500 Committer: Thomas Graves tgra...@apache.org Committed: Tue Oct 28 08:46:31 2014 -0500 -- .../resources/log4j-spark-container.properties | 24 1 file changed, 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/47346cd0/yarn/common/src/main/resources/log4j-spark-container.properties -- diff --git a/yarn/common/src/main/resources/log4j-spark-container.properties b/yarn/common/src/main/resources/log4j-spark-container.properties deleted file mode 100644 index a1e37a0..000 --- a/yarn/common/src/main/resources/log4j-spark-container.properties +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the License); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an AS IS BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. See accompanying LICENSE file. - -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - -# Settings to quiet third party logs that are too verbose -log4j.logger.org.eclipse.jetty=WARN -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4098][YARN]use appUIAddress instead of appUIHostPort in yarn-client mode
Repository: spark Updated Branches: refs/heads/master e8813be65 - 0ac52e305 [SPARK-4098][YARN]use appUIAddress instead of appUIHostPort in yarn-client mode https://issues.apache.org/jira/browse/SPARK-4098 Author: WangTaoTheTonic barneystin...@aliyun.com Closes #2958 from WangTaoTheTonic/useAddress and squashes the following commits: 29236e6 [WangTaoTheTonic] use appUIAddress instead of appUIHostPort in yarn-cluster mode Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ac52e30 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ac52e30 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ac52e30 Branch: refs/heads/master Commit: 0ac52e30552530b247e37a470b8503346f19605c Parents: e8813be Author: WangTaoTheTonic barneystin...@aliyun.com Authored: Tue Oct 28 09:51:44 2014 -0500 Committer: Thomas Graves tgra...@apache.org Committed: Tue Oct 28 09:51:44 2014 -0500 -- .../spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0ac52e30/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala -- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index d948a2a..59b2b47 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -48,7 +48,7 @@ private[spark] class YarnClientSchedulerBackend( val driverHost = conf.get(spark.driver.host) val driverPort = conf.get(spark.driver.port) val hostport = driverHost + : + driverPort -sc.ui.foreach { ui = conf.set(spark.driver.appUIAddress, ui.appUIHostPort) } +sc.ui.foreach { ui = conf.set(spark.driver.appUIAddress, 
ui.appUIAddress) } val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += (--arg, hostport) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4110] Wrong comments about default settings in spark-daemon.sh
Repository: spark Updated Branches: refs/heads/master 7768a800d - 44d8b45a3 [SPARK-4110] Wrong comments about default settings in spark-daemon.sh In spark-daemon.sh, there are following comments. # SPARK_CONF_DIR Alternate conf dir. Default is ${SPARK_PREFIX}/conf. # SPARK_LOG_DIR Where log files are stored. PWD by default. But, I think the default value for SPARK_CONF_DIR is `${SPARK_HOME}/conf` and for SPARK_LOG_DIR is `${SPARK_HOME}/logs`. Author: Kousuke Saruta saru...@oss.nttdata.co.jp Closes #2972 from sarutak/SPARK-4110 and squashes the following commits: 5a171a2 [Kousuke Saruta] Fixed wrong comments Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44d8b45a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44d8b45a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44d8b45a Branch: refs/heads/master Commit: 44d8b45a38c8d934628373a3b21084432516ee00 Parents: 7768a80 Author: Kousuke Saruta saru...@oss.nttdata.co.jp Authored: Tue Oct 28 12:29:01 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Tue Oct 28 12:29:01 2014 -0700 -- sbin/spark-daemon.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/44d8b45a/sbin/spark-daemon.sh -- diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index cba475e..89608bc 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -21,8 +21,8 @@ # # Environment Variables # -# SPARK_CONF_DIR Alternate conf dir. Default is ${SPARK_PREFIX}/conf. -# SPARK_LOG_DIR Where log files are stored. PWD by default. +# SPARK_CONF_DIR Alternate conf dir. Default is ${SPARK_HOME}/conf. +# SPARK_LOG_DIR Where log files are stored. ${SPARK_HOME}/logs by default. # SPARK_MASTERhost:path where spark code should be rsync'd from # SPARK_PID_DIR The pid files are stored. /tmp by default. # SPARK_IDENT_STRING A string representing this instance of spark. 
$USER by default - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4110] Wrong comments about default settings in spark-daemon.sh
Repository: spark Updated Branches: refs/heads/branch-1.1 2ef2f5a7c - dee331738 [SPARK-4110] Wrong comments about default settings in spark-daemon.sh In spark-daemon.sh, there are following comments. # SPARK_CONF_DIR Alternate conf dir. Default is ${SPARK_PREFIX}/conf. # SPARK_LOG_DIR Where log files are stored. PWD by default. But, I think the default value for SPARK_CONF_DIR is `${SPARK_HOME}/conf` and for SPARK_LOG_DIR is `${SPARK_HOME}/logs`. Author: Kousuke Saruta saru...@oss.nttdata.co.jp Closes #2972 from sarutak/SPARK-4110 and squashes the following commits: 5a171a2 [Kousuke Saruta] Fixed wrong comments (cherry picked from commit 44d8b45a38c8d934628373a3b21084432516ee00) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dee33173 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dee33173 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dee33173 Branch: refs/heads/branch-1.1 Commit: dee33173865c40e1270af581ee5f27b4931dc6d0 Parents: 2ef2f5a Author: Kousuke Saruta saru...@oss.nttdata.co.jp Authored: Tue Oct 28 12:29:01 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Tue Oct 28 12:29:12 2014 -0700 -- sbin/spark-daemon.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dee33173/sbin/spark-daemon.sh -- diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index 9032f23..1b93165 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -21,8 +21,8 @@ # # Environment Variables # -# SPARK_CONF_DIR Alternate conf dir. Default is ${SPARK_PREFIX}/conf. -# SPARK_LOG_DIR Where log files are stored. PWD by default. +# SPARK_CONF_DIR Alternate conf dir. Default is ${SPARK_HOME}/conf. +# SPARK_LOG_DIR Where log files are stored. ${SPARK_HOME}/logs by default. 
# SPARK_MASTERhost:path where spark code should be rsync'd from # SPARK_PID_DIR The pid files are stored. /tmp by default. # SPARK_IDENT_STRING A string representing this instance of spark. $USER by default - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4107] Fix incorrect handling of read() and skip() return values (branch-1.1 backport)
Repository: spark Updated Branches: refs/heads/branch-1.1 dee331738 - 286f1efb0 [SPARK-4107] Fix incorrect handling of read() and skip() return values (branch-1.1 backport) `read()` may return fewer bytes than requested; when this occurred, the old code would silently return less data than requested, which might cause stream corruption errors. `skip()` faces similar issues, too. This patch fixes several cases where we mis-handle these methods' return values. This is a backport of #2969 to `branch-1.1`. Author: Josh Rosen joshro...@databricks.com Closes #2974 from JoshRosen/spark-4107-branch-1.1-backport and squashes the following commits: d82c05b [Josh Rosen] [SPARK-4107] Fix incorrect handling of read() and skip() return values Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/286f1efb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/286f1efb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/286f1efb Branch: refs/heads/branch-1.1 Commit: 286f1efb0554f055de5dfc0b317b1dff120ce5a0 Parents: dee3317 Author: Josh Rosen joshro...@databricks.com Authored: Tue Oct 28 12:30:12 2014 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Oct 28 12:30:12 2014 -0700 -- .../main/scala/org/apache/spark/TestUtils.scala | 9 ++--- .../org/apache/spark/storage/DiskStore.scala| 10 -- .../org/apache/spark/storage/TachyonStore.scala | 21 +++- .../scala/org/apache/spark/util/Utils.scala | 6 +++--- .../org/apache/spark/FileServerSuite.scala | 8 ++-- 5 files changed, 22 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/TestUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 8ca7310..c5e7a73 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -23,8 +23,8 @@ 
import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConversions._ +import com.google.common.io.{ByteStreams, Files} import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} -import com.google.common.io.Files /** * Utilities for tests. Included in main codebase since it's used by multiple @@ -63,12 +63,7 @@ private[spark] object TestUtils { jarStream.putNextEntry(jarEntry) val in = new FileInputStream(file) - val buffer = new Array[Byte](10240) - var nRead = 0 - while (nRead = 0) { -nRead = in.read(buffer, 0, buffer.length) -jarStream.write(buffer, 0, nRead) - } + ByteStreams.copy(in, jarStream) in.close() } jarStream.close() http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/storage/DiskStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 295c706..247e240 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, RandomAccessFile} +import java.io.{IOException, FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer import java.nio.channels.FileChannel.MapMode @@ -111,7 +111,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc // For small files, directly read rather than memory map if (segment.length minMemoryMapBytes) { val buf = ByteBuffer.allocate(segment.length.toInt) -channel.read(buf, segment.offset) +channel.position(segment.offset) +while (buf.remaining() != 0) { + if (channel.read(buf) == -1) { +throw new IOException(Reached EOF before filling buffer\n + + soffset=${segment.offset}\nblockId=$blockId\nbuf.remaining=${buf.remaining}) + } +} buf.flip() Some(buf) } else { 
http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala index 932b561..6dbad5f 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala @@ -20,6 +20,7 @@
git commit: [SPARK-4096][YARN]let ApplicationMaster accept executor memory argument in same format as JVM memory strings
Repository: spark Updated Branches: refs/heads/master 44d8b45a3 - 1ea3e3dc9 [SPARK-4096][YARN]let ApplicationMaster accept executor memory argument in same format as JVM memory strings Here `ApplicationMaster` accept executor memory argument only in number format, we should let it accept JVM style memory strings as well. Author: WangTaoTheTonic barneystin...@aliyun.com Closes #2955 from WangTaoTheTonic/modifyDesc and squashes the following commits: ab98c70 [WangTaoTheTonic] append parameter passed in 3779767 [WangTaoTheTonic] Update executor memory description in the help message Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ea3e3dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ea3e3dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ea3e3dc Branch: refs/heads/master Commit: 1ea3e3dc9dd942402731751089bab2fb6ae29c7b Parents: 44d8b45 Author: WangTaoTheTonic barneystin...@aliyun.com Authored: Tue Oct 28 12:31:42 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Tue Oct 28 12:31:42 2014 -0700 -- .../apache/spark/deploy/yarn/ApplicationMasterArguments.scala| 4 ++-- .../src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1ea3e3dc/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala -- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 3e6b96f..5c54e34 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import org.apache.spark.util.IntParam +import 
org.apache.spark.util.{MemoryParam, IntParam} import collection.mutable.ArrayBuffer class ApplicationMasterArguments(val args: Array[String]) { @@ -55,7 +55,7 @@ class ApplicationMasterArguments(val args: Array[String]) { numExecutors = value args = tail -case (--worker-memory | --executor-memory) :: IntParam(value) :: tail = +case (--worker-memory | --executor-memory) :: MemoryParam(value) :: tail = executorMemory = value args = tail http://git-wip-us.apache.org/repos/asf/spark/blob/1ea3e3dc/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala -- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 0417cdd..8ea0e7c 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -379,7 +379,7 @@ private[spark] trait ClientBase extends Logging { val amArgs = Seq(amClass) ++ userClass ++ userJar ++ userArgs ++ Seq( ---executor-memory, args.executorMemory.toString, +--executor-memory, args.executorMemory.toString + m, --executor-cores, args.executorCores.toString, --num-executors , args.numExecutors.toString) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4065] Add check for IPython on Windows
Repository: spark Updated Branches: refs/heads/branch-1.1 286f1efb0 - f0c571760 [SPARK-4065] Add check for IPython on Windows This issue employs logic similar to the bash launcher (pyspark) to check if IPYTHON=1, and if so launch ipython with options in IPYTHON_OPTS. This fix assumes that ipython is available in the system Path, and can be invoked with a plain ipython command. Author: Michael Griffiths msjgriffi...@gmail.com Closes #2910 from msjgriffiths/pyspark-windows and squashes the following commits: ef34678 [Michael Griffiths] Change build message to comply with [SPARK-3775] 361e3d8 [Michael Griffiths] [SPARK-4065] Add check for IPython on Windows 9ce72d1 [Michael Griffiths] [SPARK-4065] Add check for IPython on Windows (cherry picked from commit 2f254dacf4b7ab9c59c7cef59fd364ca682162ae) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0c57176 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0c57176 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0c57176 Branch: refs/heads/branch-1.1 Commit: f0c571760040c998d83ad87d08e104b38bfc19f7 Parents: 286f1ef Author: Michael Griffiths msjgriffi...@gmail.com Authored: Tue Oct 28 12:47:21 2014 -0700 Committer: Andrew Or and...@databricks.com Committed: Tue Oct 28 12:47:33 2014 -0700 -- bin/pyspark2.cmd | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0c57176/bin/pyspark2.cmd -- diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index a0e66ab..59415e9 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -59,7 +59,12 @@ for /f %%i in ('echo %1^| findstr /R \.py') do ( ) if [%PYTHON_FILE%] == [] ( - %PYSPARK_PYTHON% + set PYSPARK_SHELL=1 + if [%IPYTHON%] == [1] ( + ipython %IPYTHON_OPTS% + ) else ( + %PYSPARK_PYTHON% + ) ) else ( echo. 
echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-3814][SQL] Support for Bitwise AND(), OR(|) , XOR(^), NOT(~) in Spark HQL and SQL
Repository: spark Updated Branches: refs/heads/master 6c1b981c3 - 5807cb40a [SPARK-3814][SQL] Support for Bitwise AND(), OR(|) ,XOR(^), NOT(~) in Spark HQL and SQL Currently there is no support of Bitwise , | in Spark HiveQl and Spark SQL as well. So this PR support the same. I am closing https://github.com/apache/spark/pull/2926 as it has conflicts to merge. And also added support for Bitwise AND(), OR(|) ,XOR(^), NOT(~) And I handled all review comments in that PR Author: ravipesala ravindra.pes...@huawei.com Closes #2961 from ravipesala/SPARK-3814-NEW4 and squashes the following commits: a391c7a [ravipesala] Rebase with master Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5807cb40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5807cb40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5807cb40 Branch: refs/heads/master Commit: 5807cb40ae178f0395c71b967f02aee853ef8bc9 Parents: 6c1b981 Author: ravipesala ravindra.pes...@huawei.com Authored: Tue Oct 28 13:36:06 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Oct 28 13:36:06 2014 -0700 -- .../spark/sql/catalyst/SparkSQLParser.scala | 2 +- .../apache/spark/sql/catalyst/SqlParser.scala | 4 + .../apache/spark/sql/catalyst/dsl/package.scala | 4 + .../sql/catalyst/expressions/arithmetic.scala | 89 .../expressions/ExpressionEvaluationSuite.scala | 32 +++ .../org/apache/spark/sql/SQLQuerySuite.scala| 16 .../org/apache/spark/sql/hive/HiveQl.scala | 4 + .../sql/hive/execution/SQLQuerySuite.scala | 24 ++ 8 files changed, 174 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5807cb40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala index 219322c..12e8346 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala @@ -61,7 +61,7 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical { delimiters += ( @, *, +, -, , =, , !=, =, =, , /, (, ), -,, ;, %, {, }, :, [, ], . +,, ;, %, {, }, :, [, ], ., , |, ^, ~ ) override lazy val token: Parser[Token] = http://git-wip-us.apache.org/repos/asf/spark/blob/5807cb40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 4e96771..0acf725 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -260,6 +260,9 @@ class SqlParser extends AbstractSparkSQLParser { ( * ^^^ { (e1: Expression, e2: Expression) = Multiply(e1, e2) } | / ^^^ { (e1: Expression, e2: Expression) = Divide(e1, e2) } | % ^^^ { (e1: Expression, e2: Expression) = Remainder(e1, e2) } + | ^^^ { (e1: Expression, e2: Expression) = BitwiseAnd(e1, e2) } + | | ^^^ { (e1: Expression, e2: Expression) = BitwiseOr(e1, e2) } + | ^ ^^^ { (e1: Expression, e2: Expression) = BitwiseXor(e1, e2) } ) protected lazy val function: Parser[Expression] = @@ -370,6 +373,7 @@ class SqlParser extends AbstractSparkSQLParser { | dotExpressionHeader | ident ^^ UnresolvedAttribute | signedPrimary +| ~ ~ expression ^^ BitwiseNot ) protected lazy val dotExpressionHeader: Parser[Expression] = http://git-wip-us.apache.org/repos/asf/spark/blob/5807cb40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 75b6e37..23cfd48 100755 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -62,12 +62,16 @@ package object dsl { def unary_- = UnaryMinus(expr) def unary_! = Not(expr) +def unary_~ = BitwiseNot(expr) def + (other: Expression) = Add(expr, other) def - (other: Expression) = Subtract(expr, other) def * (other: Expression) = Multiply(expr, other) def /
git commit: [SPARK-3988][SQL] add public API for date type
Repository: spark Updated Branches: refs/heads/master 5807cb40a - 47a40f60d [SPARK-3988][SQL] add public API for date type Add json and python api for date type. By using Pickle, `java.sql.Date` was serialized as calendar, and recognized in python as `datetime.datetime`. Author: Daoyuan Wang daoyuan.w...@intel.com Closes #2901 from adrian-wang/spark3988 and squashes the following commits: c51a24d [Daoyuan Wang] convert datetime to date 5670626 [Daoyuan Wang] minor line combine f760d8e [Daoyuan Wang] fix indent 444f100 [Daoyuan Wang] fix a typo 1d74448 [Daoyuan Wang] fix scala style 8d7dd22 [Daoyuan Wang] add json and python api for date type Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47a40f60 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47a40f60 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47a40f60 Branch: refs/heads/master Commit: 47a40f60d62ea69b659959994918d4c640f39d5b Parents: 5807cb40 Author: Daoyuan Wang daoyuan.w...@intel.com Authored: Tue Oct 28 13:43:25 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Oct 28 13:43:25 2014 -0700 -- python/pyspark/sql.py | 57 +--- .../spark/sql/catalyst/ScalaReflection.scala| 1 + .../spark/sql/catalyst/types/dataTypes.scala| 4 +- .../sql/catalyst/ScalaReflectionSuite.scala | 9 +++- .../scala/org/apache/spark/sql/SQLContext.scala | 10 ++-- .../org/apache/spark/sql/json/JsonRDD.scala | 20 --- .../apache/spark/sql/api/java/JavaRowSuite.java | 11 ++-- .../java/JavaSideDataTypeConversionSuite.java | 1 + .../java/ScalaSideDataTypeConversionSuite.scala | 1 + .../org/apache/spark/sql/json/JsonSuite.scala | 9 ++-- 10 files changed, 87 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/47a40f60/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 7daf306..93fd9d4 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -49,7 +49,7 @@ 
from pyspark.traceback_utils import SCCallSiteSync __all__ = [ -StringType, BinaryType, BooleanType, TimestampType, DecimalType, +StringType, BinaryType, BooleanType, DateType, TimestampType, DecimalType, DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType, MapType, StructField, StructType, SQLContext, HiveContext, SchemaRDD, Row] @@ -132,6 +132,14 @@ class BooleanType(PrimitiveType): +class DateType(PrimitiveType): + +Spark SQL DateType + +The data type representing datetime.date values. + + + class TimestampType(PrimitiveType): Spark SQL TimestampType @@ -438,7 +446,7 @@ def _parse_datatype_json_value(json_value): return _all_complex_types[json_value[type]].fromJson(json_value) -# Mapping Python types to Spark SQL DateType +# Mapping Python types to Spark SQL DataType _type_mappings = { bool: BooleanType, int: IntegerType, @@ -448,8 +456,8 @@ _type_mappings = { unicode: StringType, bytearray: BinaryType, decimal.Decimal: DecimalType, +datetime.date: DateType, datetime.datetime: TimestampType, -datetime.date: TimestampType, datetime.time: TimestampType, } @@ -656,10 +664,10 @@ def _infer_schema_type(obj, dataType): Fill the dataType with types infered from obj - schema = _parse_schema_abstract(a b c) - row = (1, 1.0, str) + schema = _parse_schema_abstract(a b c d) + row = (1, 1.0, str, datetime.date(2014, 10, 10)) _infer_schema_type(row, schema) -StructType...IntegerType...DoubleType...StringType... +StructType...IntegerType...DoubleType...StringType...DateType... 
row = [[1], {key: (1, 2.0)}] schema = _parse_schema_abstract(a[] b{c d}) _infer_schema_type(row, schema) @@ -703,6 +711,7 @@ _acceptable_types = { DecimalType: (decimal.Decimal,), StringType: (str, unicode), BinaryType: (bytearray,), +DateType: (datetime.date,), TimestampType: (datetime.datetime,), ArrayType: (list, tuple, array), MapType: (dict,), @@ -740,7 +749,7 @@ def _verify_type(obj, dataType): # subclass of them can not be deserialized in JVM if type(obj) not in _acceptable_types[_type]: -raise TypeError(%s can not accept abject in type %s +raise TypeError(%s can not accept object in type %s % (dataType, type(obj))) if isinstance(dataType, ArrayType): @@ -767,7 +776,7 @@ def _restore_object(dataType, obj): Restore object during unpickling. # use id(dataType) as key to speed up lookup in dict # Because of batched pickling, dataType will be the -# same object in mose
git commit: [Spark 3922] Refactor spark-core to use Utils.UTF_8
Repository: spark Updated Branches: refs/heads/master 47a40f60d - abcafcfba [Spark 3922] Refactor spark-core to use Utils.UTF_8 A global UTF8 constant is very helpful to handle encoding problems when converting between String and bytes. There are several solutions here: 1. Add `val UTF_8 = Charset.forName(UTF-8)` to Utils.scala 2. java.nio.charset.StandardCharsets.UTF_8 (require JDK7) 3. io.netty.util.CharsetUtil.UTF_8 4. com.google.common.base.Charsets.UTF_8 5. org.apache.commons.lang.CharEncoding.UTF_8 6. org.apache.commons.lang3.CharEncoding.UTF_8 IMO, I prefer option 1) because people can find it easily. This is a PR for option 1) and only fixes Spark Core. Author: zsxwing zsxw...@gmail.com Closes #2781 from zsxwing/SPARK-3922 and squashes the following commits: f974edd [zsxwing] Merge branch 'master' into SPARK-3922 2d27423 [zsxwing] Refactor spark-core to use Refactor spark-core to use Utils.UTF_8 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abcafcfb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abcafcfb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abcafcfb Branch: refs/heads/master Commit: abcafcfba38d7c8dba68a5510475c5c49ae54d92 Parents: 47a40f6 Author: zsxwing zsxw...@gmail.com Authored: Tue Oct 28 14:26:57 2014 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Oct 28 14:26:57 2014 -0700 -- .../main/scala/org/apache/spark/SparkSaslClient.scala | 7 --- .../main/scala/org/apache/spark/SparkSaslServer.scala | 10 ++ .../scala/org/apache/spark/api/python/PythonRDD.scala | 9 - .../api/python/WriteInputFormatTestDataGenerator.scala | 5 +++-- .../org/apache/spark/deploy/worker/DriverRunner.scala | 4 ++-- .../org/apache/spark/deploy/worker/ExecutorRunner.scala | 4 ++-- .../network/netty/client/BlockFetchingClient.scala | 4 ++-- .../netty/client/BlockFetchingClientHandler.scala | 5 +++-- .../apache/spark/network/netty/server/BlockServer.scala | 4 ++-- 
.../netty/server/BlockServerChannelInitializer.scala| 6 +++--- .../apache/spark/network/nio/ConnectionManager.scala| 4 +++- .../scala/org/apache/spark/network/nio/Message.scala| 4 +++- .../netty/client/BlockFetchingClientHandlerSuite.scala | 3 ++- .../network/netty/server/BlockHeaderEncoderSuite.scala | 8 .../scala/org/apache/spark/util/FileAppenderSuite.scala | 12 ++-- .../test/scala/org/apache/spark/util/UtilsSuite.scala | 12 ++-- 16 files changed, 55 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/SparkSaslClient.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala index 65003b6..a954fcc 100644 --- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala +++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala @@ -17,7 +17,6 @@ package org.apache.spark -import java.io.IOException import javax.security.auth.callback.Callback import javax.security.auth.callback.CallbackHandler import javax.security.auth.callback.NameCallback @@ -31,6 +30,8 @@ import javax.security.sasl.SaslException import scala.collection.JavaConversions.mapAsJavaMap +import com.google.common.base.Charsets.UTF_8 + /** * Implements SASL Client logic for Spark */ @@ -111,10 +112,10 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg CallbackHandler { private val userName: String = - SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(utf-8)) + SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(UTF_8)) private val secretKey = securityMgr.getSecretKey() private val userPassword: Array[Char] = SparkSaslServer.encodePassword( -if (secretKey != null) secretKey.getBytes(utf-8) else .getBytes(utf-8)) +if (secretKey != null) secretKey.getBytes(UTF_8) else .getBytes(UTF_8)) /** * Implementation used to respond to SASL request from the server. 
http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/SparkSaslServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala index f6b0a91..7c2afb3 100644 --- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala +++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala @@ -28,6 +28,8 @@ import javax.security.sasl.Sasl import javax.security.sasl.SaslException import
git commit: [SPARK-3343] [SQL] Add serde support for CTAS
Repository: spark Updated Branches: refs/heads/master abcafcfba - 4b55482ab [SPARK-3343] [SQL] Add serde support for CTAS Currently, `CTAS` (Create Table As Select) doesn't support specifying the `SerDe` in HQL. This PR will pass down the `ASTNode` into the physical operator `execution.CreateTableAsSelect`, which will extract the `CreateTableDesc` object via Hive `SemanticAnalyzer`. In the meantime, I also update the `HiveMetastoreCatalog.createTable` to optionally support the `CreateTableDesc` for table creation. Author: Cheng Hao hao.ch...@intel.com Closes #2570 from chenghao-intel/ctas_serde and squashes the following commits: e011ef5 [Cheng Hao] shim for both 0.12 0.13.1 cfb3662 [Cheng Hao] revert to hive 0.12 c8a547d [Cheng Hao] Support SerDe properties within CTAS Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b55482a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b55482a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b55482a Branch: refs/heads/master Commit: 4b55482abf899c27da3d55401ad26b4e9247b327 Parents: abcafcf Author: Cheng Hao hao.ch...@intel.com Authored: Tue Oct 28 14:36:06 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Oct 28 14:36:06 2014 -0700 -- .../catalyst/plans/logical/basicOperators.scala | 8 +- .../org/apache/spark/sql/SchemaRDDLike.scala| 4 +- .../scala/org/apache/spark/sql/QueryTest.scala | 19 ++ .../hive/execution/HiveCompatibilitySuite.scala | 6 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 197 --- .../org/apache/spark/sql/hive/HiveQl.scala | 15 +- .../apache/spark/sql/hive/HiveStrategies.scala | 17 +- .../hive/execution/CreateTableAsSelect.scala| 39 ++-- .../scala/org/apache/spark/sql/QueryTest.scala | 19 ++ .../sql/hive/execution/HiveExplainSuite.scala | 37 ++-- .../sql/hive/execution/SQLQuerySuite.scala | 59 ++ .../org/apache/spark/sql/hive/Shim12.scala | 5 +- .../org/apache/spark/sql/hive/Shim13.scala | 6 +- 
13 files changed, 337 insertions(+), 94 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 14b03c7..00bdf10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -114,11 +114,13 @@ case class InsertIntoTable( } } -case class CreateTableAsSelect( +case class CreateTableAsSelect[T]( databaseName: Option[String], tableName: String, -child: LogicalPlan) extends UnaryNode { - override def output = child.output +child: LogicalPlan, +allowExisting: Boolean, +desc: Option[T] = None) extends UnaryNode { + override def output = Seq.empty[Attribute] override lazy val resolved = (databaseName != None childrenResolved) } http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index 25ba7d8..15516af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike { @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match { // For various commands (like DDL) and queries with side effects, we force query optimization to // happen right away to let these side effects take place eagerly. 
-case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile = +case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile = LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) case _ = baseLogicalPlan @@ -123,7 +123,7 @@ private[sql] trait SchemaRDDLike { */ @Experimental def saveAsTable(tableName: String): Unit = -sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd +sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd /** Returns the schema as a string in the tree format. *
[1/2] [SPARK-4084] Reuse sort key in Sorter
Repository: spark Updated Branches: refs/heads/master 4b55482ab - 84e5da87e http://git-wip-us.apache.org/repos/asf/spark/blob/84e5da87/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 6fe1079..066d47c 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util.collection -import java.lang.{Float = JFloat} +import java.lang.{Float = JFloat, Integer = JInteger} import java.util.{Arrays, Comparator} import org.scalatest.FunSuite @@ -30,11 +30,15 @@ class SorterSuite extends FunSuite { val rand = new XORShiftRandom(123) val data0 = Array.tabulate[Int](1) { i = rand.nextInt() } val data1 = data0.clone() +val data2 = data0.clone() Arrays.sort(data0) new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int) +new Sorter(new KeyReuseIntArraySortDataFormat) + .sort(data2, 0, data2.length, Ordering[IntWrapper]) -data0.zip(data1).foreach { case (x, y) = assert(x === y) } +assert(data0.view === data1.view) +assert(data0.view === data2.view) } test(KVArraySorter) { @@ -61,10 +65,33 @@ class SorterSuite extends FunSuite { } } + /** Runs an experiment several times. */ + def runExperiment(name: String, skip: Boolean = false)(f: = Unit, prepare: () = Unit): Unit = { +if (skip) { + println(sSkipped experiment $name.) 
+ return +} + +val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare)) +System.gc() + +var i = 0 +var next10: Long = 0 +while (i 10) { + val time = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare)) + next10 += time + println(s$name: Took $time ms) + i += 1 +} + +println(s$name: ($firstTry ms first try, ${next10 / 10} ms average)) + } + /** * This provides a simple benchmark for comparing the Sorter with Java internal sorting. * Ideally these would be executed one at a time, each in their own JVM, so their listing - * here is mainly to have the code. + * here is mainly to have the code. Running multiple tests within the same JVM session would + * prevent JIT inlining overridden methods and hence hurt the performance. * * The goal of this code is to sort an array of key-value pairs, where the array physically * has the keys and values alternating. The basic Java sorts work only on the keys, so the @@ -72,96 +99,167 @@ class SorterSuite extends FunSuite { * those, while the Sorter approach can work directly on the input data format. * * Note that the Java implementation varies tremendously between Java 6 and Java 7, when - * the Java sort changed from merge sort to Timsort. + * the Java sort changed from merge sort to TimSort. */ - ignore(Sorter benchmark) { - -/** Runs an experiment several times. */ -def runExperiment(name: String)(f: = Unit): Unit = { - val firstTry = org.apache.spark.util.Utils.timeIt(1)(f) - System.gc() - - var i = 0 - var next10: Long = 0 - while (i 10) { -val time = org.apache.spark.util.Utils.timeIt(1)(f) -next10 += time -println(s$name: Took $time ms) -i += 1 - } - - println(s$name: ($firstTry ms first try, ${next10 / 10} ms average)) -} - + ignore(Sorter benchmark for key-value pairs) { val numElements = 2500 // 25 mil val rand = new XORShiftRandom(123) -val keys = Array.tabulate[JFloat](numElements) { i = - new JFloat(rand.nextFloat()) +// Test our key-value pairs where each element is a Tuple2[Float, Integer]. 
+ +val kvTuples = Array.tabulate(numElements) { i = + (new JFloat(rand.nextFloat()), new JInteger(i)) } -// Test our key-value pairs where each element is a Tuple2[Float, Integer) -val kvTupleArray = Array.tabulate[AnyRef](numElements) { i = - (keys(i / 2): Float, i / 2: Int) +val kvTupleArray = new Array[AnyRef](numElements) +val prepareKvTupleArray = () = { + System.arraycopy(kvTuples, 0, kvTupleArray, 0, numElements) } -runExperiment(Tuple-sort using Arrays.sort()) { +runExperiment(Tuple-sort using Arrays.sort())({ Arrays.sort(kvTupleArray, new Comparator[AnyRef] { override def compare(x: AnyRef, y: AnyRef): Int = - Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1) + x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1) }) -} +}, prepareKvTupleArray) // Test our Sorter where each element alternates between Float and Integer, non-primitive -
[2/2] git commit: [SPARK-4084] Reuse sort key in Sorter
[SPARK-4084] Reuse sort key in Sorter Sorter uses generic-typed key for sorting. When data is large, it creates lots of key objects, which is not efficient. We should reuse the key in Sorter for memory efficiency. This change is part of the petabyte sort implementation from rxin . The `Sorter` class was written in Java and marked package private. So it is only available to `org.apache.spark.util.collection`. I renamed it to `TimSort` and add a simple wrapper of it, still called `Sorter`, in Scala, which is `private[spark]`. The benchmark code is updated, which now resets the array before each run. Here is the result on sorting primitive Int arrays of size 25 million using Sorter: ~~~ [info] - Sorter benchmark for key-value pairs !!! IGNORED !!! Java Arrays.sort() on non-primitive int array: Took 13237 ms Java Arrays.sort() on non-primitive int array: Took 13320 ms Java Arrays.sort() on non-primitive int array: Took 15718 ms Java Arrays.sort() on non-primitive int array: Took 13283 ms Java Arrays.sort() on non-primitive int array: Took 13267 ms Java Arrays.sort() on non-primitive int array: Took 15122 ms Java Arrays.sort() on non-primitive int array: Took 15495 ms Java Arrays.sort() on non-primitive int array: Took 14877 ms Java Arrays.sort() on non-primitive int array: Took 16429 ms Java Arrays.sort() on non-primitive int array: Took 14250 ms Java Arrays.sort() on non-primitive int array: (13878 ms first try, 14499 ms average) Java Arrays.sort() on primitive int array: Took 2683 ms Java Arrays.sort() on primitive int array: Took 2683 ms Java Arrays.sort() on primitive int array: Took 2701 ms Java Arrays.sort() on primitive int array: Took 2746 ms Java Arrays.sort() on primitive int array: Took 2685 ms Java Arrays.sort() on primitive int array: Took 2735 ms Java Arrays.sort() on primitive int array: Took 2669 ms Java Arrays.sort() on primitive int array: Took 2693 ms Java Arrays.sort() on primitive int array: Took 2680 ms Java Arrays.sort() on primitive int array: 
Took 2642 ms Java Arrays.sort() on primitive int array: (2948 ms first try, 2691 ms average) Sorter without key reuse on primitive int array: Took 10732 ms Sorter without key reuse on primitive int array: Took 12482 ms Sorter without key reuse on primitive int array: Took 10718 ms Sorter without key reuse on primitive int array: Took 12650 ms Sorter without key reuse on primitive int array: Took 10747 ms Sorter without key reuse on primitive int array: Took 10783 ms Sorter without key reuse on primitive int array: Took 12721 ms Sorter without key reuse on primitive int array: Took 10604 ms Sorter without key reuse on primitive int array: Took 10622 ms Sorter without key reuse on primitive int array: Took 11843 ms Sorter without key reuse on primitive int array: (11089 ms first try, 11390 ms average) Sorter with key reuse on primitive int array: Took 5141 ms Sorter with key reuse on primitive int array: Took 5298 ms Sorter with key reuse on primitive int array: Took 5066 ms Sorter with key reuse on primitive int array: Took 5164 ms Sorter with key reuse on primitive int array: Took 5203 ms Sorter with key reuse on primitive int array: Took 5274 ms Sorter with key reuse on primitive int array: Took 5186 ms Sorter with key reuse on primitive int array: Took 5159 ms Sorter with key reuse on primitive int array: Took 5164 ms Sorter with key reuse on primitive int array: Took 5078 ms Sorter with key reuse on primitive int array: (5311 ms first try, 5173 ms average) ~~~ So with key reuse, it is faster and less likely to trigger GC. 
Author: Xiangrui Meng m...@databricks.com Author: Reynold Xin r...@apache.org Closes #2937 from mengxr/SPARK-4084 and squashes the following commits: d73c3d0 [Xiangrui Meng] address comments 0b7b682 [Xiangrui Meng] fix mima a72f53c [Xiangrui Meng] update timeIt 38ba50c [Xiangrui Meng] update timeIt 720f731 [Xiangrui Meng] add doc about JIT specialization 78f2879 [Xiangrui Meng] update tests 7de2efd [Xiangrui Meng] update the Sorter benchmark code to be correct 8626356 [Xiangrui Meng] add prepare to timeIt and update testsin SorterSuite 5f0d530 [Xiangrui Meng] update method modifiers of SortDataFormat 6ffbe66 [Xiangrui Meng] rename Sorter to TimSort and add a Scala wrapper that is private[spark] b00db4d [Xiangrui Meng] doc and tests cf94e8a [Xiangrui Meng] renaming 464ddce [Reynold Xin] cherry-pick rxin's commit Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84e5da87 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84e5da87 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84e5da87 Branch: refs/heads/master Commit: 84e5da87e32256ba4f3dee6f8bf532ce88322028 Parents: 4b55482 Author: Xiangrui Meng m...@databricks.com Authored: Tue Oct 28 15:14:41 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Oct 28 15:14:41 2014 -0700 --
git commit: [SPARK-4008] Fix kryo with fold in KryoSerializerSuite
Repository: spark Updated Branches: refs/heads/master 84e5da87e - 1536d7033 [SPARK-4008] Fix kryo with fold in KryoSerializerSuite `zeroValue` will be serialized by `spark.closure.serializer` but `spark.closure.serializer` only supports the default Java serializer. So it must not be `ClassWithoutNoArgConstructor`, which can not be serialized by the Java serializer. This PR changed `zeroValue` to null and updated the test to make it work correctly. Author: zsxwing zsxw...@gmail.com Closes #2856 from zsxwing/SPARK-4008 and squashes the following commits: 51da655 [zsxwing] [SPARK-4008] Fix kryo with fold in KryoSerializerSuite Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1536d703 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1536d703 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1536d703 Branch: refs/heads/master Commit: 1536d70331e9a4f5b5ea9dabfd72592ca1fc8e35 Parents: 84e5da8 Author: zsxwing zsxw...@gmail.com Authored: Tue Oct 28 17:59:10 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Oct 28 17:59:10 2014 -0700 -- .../apache/spark/serializer/KryoSerializerSuite.scala | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1536d703/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 64ac6d2..a70f67a 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -201,12 +201,17 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert(control.sum === result) } - // TODO: this still doesn't work - ignore(kryo with fold) { + test(kryo with fold) { val control = 1 :: 2 :: Nil +// 
zeroValue must not be a ClassWithoutNoArgConstructor instance because it will be +// serialized by spark.closure.serializer but spark.closure.serializer only supports +// the default Java serializer. val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)) -.fold(new ClassWithoutNoArgConstructor(10))((t1, t2) = new ClassWithoutNoArgConstructor(t1.x + t2.x)).x -assert(10 + control.sum === result) + .fold(null)((t1, t2) = { + val t1x = if (t1 == null) 0 else t1.x + new ClassWithoutNoArgConstructor(t1x + t2.x) +}).x +assert(control.sum === result) } test(kryo with nonexistent custom registrator should fail) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-3904] [SQL] add constant objectinspector support for udfs
Repository: spark Updated Branches: refs/heads/master 1536d7033 - b5e79bf88 [SPARK-3904] [SQL] add constant objectinspector support for udfs In HQL, we convert all of the data type into normal `ObjectInspector`s for UDFs, most of cases it works, however, some of the UDF actually requires its children `ObjectInspector` to be the `ConstantObjectInspector`, which will cause exception. e.g. select named_struct(x, str) from src limit 1; I updated the method `wrap` by adding the one more parameter `ObjectInspector`(to describe what it expects to wrap to, for example: java.lang.Integer or IntWritable). As well as the `unwrap` method by providing the input `ObjectInspector`. Author: Cheng Hao hao.ch...@intel.com Closes #2762 from chenghao-intel/udf_coi and squashes the following commits: bcacfd7 [Cheng Hao] Shim for both Hive 0.12 0.13.1 2416e5d [Cheng Hao] revert to hive 0.12 5793c01 [Cheng Hao] add space before while 4e56e1b [Cheng Hao] style issue 683d3fd [Cheng Hao] Add golden files fe591e4 [Cheng Hao] update HiveGenericUdf for set the ObjectInspector while constructing the DeferredObject f6740fe [Cheng Hao] Support Constant ObjectInspector for Map List 8814c3a [Cheng Hao] Passing ContantObjectInspector(when necessary) for UDF initializing Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b5e79bf8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b5e79bf8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b5e79bf8 Branch: refs/heads/master Commit: b5e79bf889700159d490cdac1f6322dff424b1d9 Parents: 1536d70 Author: Cheng Hao hao.ch...@intel.com Authored: Tue Oct 28 19:11:57 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Oct 28 19:11:57 2014 -0700 -- .../hive/execution/HiveCompatibilitySuite.scala | 8 +- .../apache/spark/sql/hive/HiveInspectors.scala | 185 --- .../org/apache/spark/sql/hive/TableReader.scala | 2 +- .../org/apache/spark/sql/hive/hiveUdfs.scala| 62 
--- ...tant array-0-761ef205b10ac4a10122c8b4ce10ada | 1 + ...med_struct-0-8f0ea83364b78634fbb3752c5a5c725 | 1 + ...ed_struct-1-380c9638cc6ea8ea42f187bf0cedf350 | 1 + ...ed_struct-2-22a79ac608b1249306f82f4bdc669b17 | 0 ...ed_struct-3-d7e4a555934307155784904ff9df188b | 1 + ...ort_array-0-e86d559aeb84a4cc017a103182c22bfb | 0 ...ort_array-1-976cd8b6b50a2748bbc768aa5e11cf82 | 1 + ...rt_array-10-9e047718e5fea6ea79124f1e899f1c13 | 1 + ...ort_array-2-c429ec85a6da60ebd4bc6f0f266e8b93 | 4 + ...ort_array-3-55c4cdaf8438b06675d60848d68f35de | 0 ...df_struct-0-f41043b7d9f14fa5e998c90454c7bdb1 | 1 + ...df_struct-1-8ccdb20153debdab789ea8ad0228e2eb | 1 + ...df_struct-2-4a62774a6de7571c8d2bcb77da63f8f3 | 0 ...df_struct-3-abffdaacb0c7076ab538fbeec072daa2 | 1 + .../sql/hive/execution/HiveQuerySuite.scala | 8 + .../org/apache/spark/sql/hive/Shim12.scala | 57 ++ .../org/apache/spark/sql/hive/Shim13.scala | 64 ++- 21 files changed, 307 insertions(+), 92 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b5e79bf8/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala -- diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 26d9ca0..1a3c24b 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -233,7 +233,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Sort with Limit clause causes failure. 
ctas, -ctas_hadoop20 +ctas_hadoop20, + +// timestamp in array, the output format of Hive contains double quotes, while +// Spark SQL doesn't +udf_sort_array ) ++ HiveShim.compatibilityBlackList /** @@ -861,6 +865,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { udf_minute, udf_modulo, udf_month, +udf_named_struct, udf_negative, udf_not, udf_notequal, @@ -894,6 +899,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { udf_stddev_pop, udf_stddev_samp, udf_string, +udf_struct, udf_substring, udf_subtract, udf_sum, http://git-wip-us.apache.org/repos/asf/spark/blob/b5e79bf8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala -- diff --git
git commit: [SPARK-4133] [SQL] [PySpark] type conversion for python udf
Repository: spark Updated Branches: refs/heads/master b5e79bf88 - 8c0bfd08f [SPARK-4133] [SQL] [PySpark] type conversionfor python udf Call Python UDF on ArrayType/MapType/PrimitiveType, the returnType can also be ArrayType/MapType/PrimitiveType. For StructType, it will act as tuple (without attributes). If returnType is StructType, it also should be tuple. Author: Davies Liu dav...@databricks.com Closes #2973 from davies/udf_array and squashes the following commits: 306956e [Davies Liu] Merge branch 'master' of github.com:apache/spark into udf_array 2c00e43 [Davies Liu] fix merge 11395fa [Davies Liu] Merge branch 'master' of github.com:apache/spark into udf_array 9df50a2 [Davies Liu] address comments 79afb4e [Davies Liu] type conversionfor python udf Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c0bfd08 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c0bfd08 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c0bfd08 Branch: refs/heads/master Commit: 8c0bfd08fc19fa5de7d77bf8306d19834f907ec0 Parents: b5e79bf Author: Davies Liu dav...@databricks.com Authored: Tue Oct 28 19:38:16 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Oct 28 19:38:16 2014 -0700 -- python/pyspark/tests.py | 16 +++- .../scala/org/apache/spark/sql/SQLContext.scala | 43 + .../scala/org/apache/spark/sql/SchemaRDD.scala | 42 ++--- .../apache/spark/sql/execution/pythonUdfs.scala | 91 ++-- 4 files changed, 102 insertions(+), 90 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8c0bfd08/python/pyspark/tests.py -- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 047d857..37a1289 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -49,7 +49,7 @@ from pyspark.files import SparkFiles from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \ CloudPickleSerializer from pyspark.shuffle import 
Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter -from pyspark.sql import SQLContext, IntegerType, Row +from pyspark.sql import SQLContext, IntegerType, Row, ArrayType from pyspark import shuffle _have_scipy = False @@ -690,10 +690,20 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual(row[0], 5) def test_udf2(self): -self.sqlCtx.registerFunction(strlen, lambda string: len(string)) +self.sqlCtx.registerFunction(strlen, lambda string: len(string), IntegerType()) self.sqlCtx.inferSchema(self.sc.parallelize([Row(a=test)])).registerTempTable(test) [res] = self.sqlCtx.sql(SELECT strlen(a) FROM test WHERE strlen(a) 1).collect() -self.assertEqual(u4, res[0]) +self.assertEqual(4, res[0]) + +def test_udf_with_array_type(self): +d = [Row(l=range(3), d={key: range(5)})] +rdd = self.sc.parallelize(d) +srdd = self.sqlCtx.inferSchema(rdd).registerTempTable(test) +self.sqlCtx.registerFunction(copylist, lambda l: list(l), ArrayType(IntegerType())) +self.sqlCtx.registerFunction(maplen, lambda d: len(d), IntegerType()) +[(l1, l2)] = self.sqlCtx.sql(select copylist(l), maplen(d) from test).collect() +self.assertEqual(range(3), l1) +self.assertEqual(1, l2) def test_broadcast_in_udf(self): bar = {a: aa, b: bb, c: abc} http://git-wip-us.apache.org/repos/asf/spark/blob/8c0bfd08/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ca8706e..a41a500 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -438,7 +438,6 @@ class SQLContext(@transient val sparkContext: SparkContext) private[sql] def applySchemaToPythonRDD( rdd: RDD[Array[Any]], schema: StructType): SchemaRDD = { -import scala.collection.JavaConversions._ def needsConversion(dataType: DataType): Boolean = dataType match { case ByteType = true @@ -452,49 +451,9 @@ class 
SQLContext(@transient val sparkContext: SparkContext) case other = false } -// Converts value to the type specified by the data type. -// Because Python does not have data types for DateType, TimestampType, FloatType, ShortType, -// and ByteType, we need to explicitly convert values in columns of these data types to the -// desired JVM data types. -def convert(obj: Any, dataType: DataType): Any = (obj,