git commit: [SPARK-4107] Fix incorrect handling of read() and skip() return values

2014-10-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4ceb048b3 -> 46c63417c


[SPARK-4107] Fix incorrect handling of read() and skip() return values

`read()` may return fewer bytes than requested; when this occurred, the old 
code would silently return less data than requested, which might cause stream 
corruption errors.  `skip()` faces similar issues, too.

This patch fixes several cases where we mis-handle these methods' return values.
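
For illustration, here is a minimal sketch of the pattern the patch applies (the helper names and the standalone object are illustrative, not part of the patch; Guava's `ByteStreams.skipFully` is what the patch uses for the `skip()` case):

~~~
import java.io.{FileInputStream, IOException}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

import com.google.common.io.ByteStreams

object ReadFullyExample {
  // Read exactly `length` bytes starting at `offset`; FileChannel.read() may
  // transfer fewer bytes per call, so loop until the buffer is full or EOF.
  def readFully(channel: FileChannel, offset: Long, length: Int): ByteBuffer = {
    val buf = ByteBuffer.allocate(length)
    channel.position(offset)
    while (buf.remaining() != 0) {
      if (channel.read(buf) == -1) {
        throw new IOException("Reached EOF before filling buffer")
      }
    }
    buf.flip()
    buf
  }

  // Skip exactly `offset` bytes or fail fast, instead of trusting InputStream.skip().
  def skipExactly(in: FileInputStream, offset: Long): Unit = {
    ByteStreams.skipFully(in, offset) // throws EOFException if the stream is too short
  }
}
~~~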

Author: Josh Rosen joshro...@databricks.com

Closes #2969 from JoshRosen/file-channel-read-fix and squashes the following 
commits:

e724a9f [Josh Rosen] Fix similar issue of not checking skip() return value.
cbc03ce [Josh Rosen] Update the other log message, too.
01e6015 [Josh Rosen] file.getName - file.getAbsolutePath
d961d95 [Josh Rosen] Fix another issue in FileServerSuite.
b9265d2 [Josh Rosen] Fix a similar (minor) issue in TestUtils.
cd9d76f [Josh Rosen] Fix a similar error in Tachyon:
3db0008 [Josh Rosen] Fix a similar read() error in Utils.offsetBytes().
db985ed [Josh Rosen] Fix unsafe usage of FileChannel.read():


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46c63417
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46c63417
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46c63417

Branch: refs/heads/master
Commit: 46c63417c1bb1aea07baf9036cc5b8f1c3781bbe
Parents: 4ceb048
Author: Josh Rosen joshro...@databricks.com
Authored: Tue Oct 28 00:04:16 2014 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Tue Oct 28 00:04:16 2014 -0700

--
 .../main/scala/org/apache/spark/TestUtils.scala |  9 ++---
 .../apache/spark/network/ManagedBuffer.scala| 10 --
 .../shuffle/IndexShuffleBlockManager.scala  |  4 +++-
 .../org/apache/spark/storage/DiskStore.scala| 10 --
 .../org/apache/spark/storage/TachyonStore.scala | 21 +++-
 .../scala/org/apache/spark/util/Utils.scala |  6 +++---
 .../org/apache/spark/FileServerSuite.scala  |  8 ++--
 7 files changed, 33 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/TestUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index e72826d..3407814 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
@@ -64,12 +64,7 @@ private[spark] object TestUtils {
   jarStream.putNextEntry(jarEntry)
 
   val in = new FileInputStream(file)
-  val buffer = new Array[Byte](10240)
-  var nRead = 0
-  while (nRead <= 0) {
-nRead = in.read(buffer, 0, buffer.length)
-jarStream.write(buffer, 0, nRead)
-  }
+  ByteStreams.copy(in, jarStream)
   in.close()
 }
 jarStream.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/46c63417/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala 
b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index 4c9ca97..4211ba4 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -81,7 +81,13 @@ final class FileSegmentManagedBuffer(val file: File, val 
offset: Long, val lengt
   // Just copy the buffer if it's sufficiently small, as memory mapping 
has a high overhead.
   if (length < MIN_MEMORY_MAP_BYTES) {
 val buf = ByteBuffer.allocate(length.toInt)
-channel.read(buf, offset)
+channel.position(offset)
+while (buf.remaining() != 0) {
+  if (channel.read(buf) == -1) {
+throw new IOException("Reached EOF before filling buffer\n" +
+  s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+  }
+}
 buf.flip()
 buf
   } else {
@@ -106,7 +112,7 @@ final class FileSegmentManagedBuffer(val file: File, val 
offset: Long, val lengt
 var is: FileInputStream = null
 try {
   is = new FileInputStream(file)
-  is.skip(offset)
+  ByteStreams.skipFully(is, offset)
   ByteStreams.limit(is, length)
 } catch {
   case e: IOException =


git commit: [SPARK-4116][YARN]Delete the abandoned log4j-spark-container.properties

2014-10-28 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master fae095bc7 -> 47346cd02


[SPARK-4116][YARN]Delete the abandoned log4j-spark-container.properties

Since it was renamed in https://github.com/apache/spark/pull/560, 
log4j-spark-container.properties has never been used again.
A global search of the codebase finds no references to it.

Author: WangTaoTheTonic barneystin...@aliyun.com

Closes #2977 from WangTaoTheTonic/delLog4j and squashes the following commits:

fb2729f [WangTaoTheTonic] delete the log4j file obsoleted


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47346cd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47346cd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47346cd0

Branch: refs/heads/master
Commit: 47346cd029abc50c70582a721810a7cceb682d8a
Parents: fae095b
Author: WangTaoTheTonic barneystin...@aliyun.com
Authored: Tue Oct 28 08:46:31 2014 -0500
Committer: Thomas Graves tgra...@apache.org
Committed: Tue Oct 28 08:46:31 2014 -0500

--
 .../resources/log4j-spark-container.properties  | 24 
 1 file changed, 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/47346cd0/yarn/common/src/main/resources/log4j-spark-container.properties
--
diff --git a/yarn/common/src/main/resources/log4j-spark-container.properties 
b/yarn/common/src/main/resources/log4j-spark-container.properties
deleted file mode 100644
index a1e37a0..0000000
--- a/yarn/common/src/main/resources/log4j-spark-container.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License. See accompanying LICENSE file.
-
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.eclipse.jetty=WARN
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO





git commit: [SPARK-4098][YARN]use appUIAddress instead of appUIHostPort in yarn-client mode

2014-10-28 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master e8813be65 -> 0ac52e305


[SPARK-4098][YARN]use appUIAddress instead of appUIHostPort in yarn-client mode

https://issues.apache.org/jira/browse/SPARK-4098

Author: WangTaoTheTonic barneystin...@aliyun.com

Closes #2958 from WangTaoTheTonic/useAddress and squashes the following commits:

29236e6 [WangTaoTheTonic] use appUIAddress instead of appUIHostPort in 
yarn-cluster mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ac52e30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ac52e30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ac52e30

Branch: refs/heads/master
Commit: 0ac52e30552530b247e37a470b8503346f19605c
Parents: e8813be
Author: WangTaoTheTonic barneystin...@aliyun.com
Authored: Tue Oct 28 09:51:44 2014 -0500
Committer: Thomas Graves tgra...@apache.org
Committed: Tue Oct 28 09:51:44 2014 -0500

--
 .../spark/scheduler/cluster/YarnClientSchedulerBackend.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0ac52e30/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
--
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d948a2a..59b2b47 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -48,7 +48,7 @@ private[spark] class YarnClientSchedulerBackend(
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
-    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIHostPort) }
+    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) }
 
 val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += ("--arg", hostport)





git commit: [SPARK-4110] Wrong comments about default settings in spark-daemon.sh

2014-10-28 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 7768a800d -> 44d8b45a3


[SPARK-4110] Wrong comments about default settings in spark-daemon.sh

In spark-daemon.sh, there are the following comments.

#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_PREFIX}/conf.
#   SPARK_LOG_DIR   Where log files are stored.  PWD by default.

But, I think the default value for SPARK_CONF_DIR is `${SPARK_HOME}/conf` and 
for SPARK_LOG_DIR is `${SPARK_HOME}/logs`.

Author: Kousuke Saruta saru...@oss.nttdata.co.jp

Closes #2972 from sarutak/SPARK-4110 and squashes the following commits:

5a171a2 [Kousuke Saruta] Fixed wrong comments


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44d8b45a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44d8b45a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44d8b45a

Branch: refs/heads/master
Commit: 44d8b45a38c8d934628373a3b21084432516ee00
Parents: 7768a80
Author: Kousuke Saruta saru...@oss.nttdata.co.jp
Authored: Tue Oct 28 12:29:01 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Tue Oct 28 12:29:01 2014 -0700

--
 sbin/spark-daemon.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/44d8b45a/sbin/spark-daemon.sh
--
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index cba475e..89608bc 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -21,8 +21,8 @@
 #
 # Environment Variables
 #
-#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_PREFIX}/conf.
-#   SPARK_LOG_DIR   Where log files are stored.  PWD by default.
+#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_HOME}/conf.
+#   SPARK_LOG_DIR   Where log files are stored. ${SPARK_HOME}/logs by default.
#   SPARK_MASTER    host:path where spark code should be rsync'd from
 #   SPARK_PID_DIR   The pid files are stored. /tmp by default.
 #   SPARK_IDENT_STRING   A string representing this instance of spark. $USER 
by default





git commit: [SPARK-4110] Wrong comments about default settings in spark-daemon.sh

2014-10-28 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 2ef2f5a7c -> dee331738


[SPARK-4110] Wrong comments about default settings in spark-daemon.sh

In spark-daemon.sh, there are the following comments.

#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_PREFIX}/conf.
#   SPARK_LOG_DIR   Where log files are stored.  PWD by default.

But, I think the default value for SPARK_CONF_DIR is `${SPARK_HOME}/conf` and 
for SPARK_LOG_DIR is `${SPARK_HOME}/logs`.

Author: Kousuke Saruta saru...@oss.nttdata.co.jp

Closes #2972 from sarutak/SPARK-4110 and squashes the following commits:

5a171a2 [Kousuke Saruta] Fixed wrong comments

(cherry picked from commit 44d8b45a38c8d934628373a3b21084432516ee00)
Signed-off-by: Andrew Or and...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dee33173
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dee33173
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dee33173

Branch: refs/heads/branch-1.1
Commit: dee33173865c40e1270af581ee5f27b4931dc6d0
Parents: 2ef2f5a
Author: Kousuke Saruta saru...@oss.nttdata.co.jp
Authored: Tue Oct 28 12:29:01 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Tue Oct 28 12:29:12 2014 -0700

--
 sbin/spark-daemon.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dee33173/sbin/spark-daemon.sh
--
diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh
index 9032f23..1b93165 100755
--- a/sbin/spark-daemon.sh
+++ b/sbin/spark-daemon.sh
@@ -21,8 +21,8 @@
 #
 # Environment Variables
 #
-#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_PREFIX}/conf.
-#   SPARK_LOG_DIR   Where log files are stored.  PWD by default.
+#   SPARK_CONF_DIR  Alternate conf dir. Default is ${SPARK_HOME}/conf.
+#   SPARK_LOG_DIR   Where log files are stored. ${SPARK_HOME}/logs by default.
#   SPARK_MASTER    host:path where spark code should be rsync'd from
 #   SPARK_PID_DIR   The pid files are stored. /tmp by default.
 #   SPARK_IDENT_STRING   A string representing this instance of spark. $USER 
by default





git commit: [SPARK-4107] Fix incorrect handling of read() and skip() return values (branch-1.1 backport)

2014-10-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 dee331738 -> 286f1efb0


[SPARK-4107] Fix incorrect handling of read() and skip() return values 
(branch-1.1 backport)

`read()` may return fewer bytes than requested; when this occurred, the old 
code would silently return less data than requested, which might cause stream 
corruption errors.  `skip()` faces similar issues, too.

This patch fixes several cases where we mis-handle these methods' return values.

This is a backport of #2969 to `branch-1.1`.

Author: Josh Rosen joshro...@databricks.com

Closes #2974 from JoshRosen/spark-4107-branch-1.1-backport and squashes the 
following commits:

d82c05b [Josh Rosen] [SPARK-4107] Fix incorrect handling of read() and skip() 
return values


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/286f1efb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/286f1efb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/286f1efb

Branch: refs/heads/branch-1.1
Commit: 286f1efb0554f055de5dfc0b317b1dff120ce5a0
Parents: dee3317
Author: Josh Rosen joshro...@databricks.com
Authored: Tue Oct 28 12:30:12 2014 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Tue Oct 28 12:30:12 2014 -0700

--
 .../main/scala/org/apache/spark/TestUtils.scala |  9 ++---
 .../org/apache/spark/storage/DiskStore.scala| 10 --
 .../org/apache/spark/storage/TachyonStore.scala | 21 +++-
 .../scala/org/apache/spark/util/Utils.scala |  6 +++---
 .../org/apache/spark/FileServerSuite.scala  |  8 ++--
 5 files changed, 22 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/TestUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 8ca7310..c5e7a73 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -23,8 +23,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConversions._
 
+import com.google.common.io.{ByteStreams, Files}
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
-import com.google.common.io.Files
 
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
@@ -63,12 +63,7 @@ private[spark] object TestUtils {
   jarStream.putNextEntry(jarEntry)
 
   val in = new FileInputStream(file)
-  val buffer = new Array[Byte](10240)
-  var nRead = 0
-  while (nRead <= 0) {
-nRead = in.read(buffer, 0, buffer.length)
-jarStream.write(buffer, 0, nRead)
-  }
+  ByteStreams.copy(in, jarStream)
   in.close()
 }
 jarStream.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 295c706..247e240 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, RandomAccessFile}
+import java.io.{IOException, FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
@@ -111,7 +111,13 @@ private[spark] class DiskStore(blockManager: BlockManager, 
diskManager: DiskBloc
   // For small files, directly read rather than memory map
   if (segment.length < minMemoryMapBytes) {
 val buf = ByteBuffer.allocate(segment.length.toInt)
-channel.read(buf, segment.offset)
+channel.position(segment.offset)
+while (buf.remaining() != 0) {
+  if (channel.read(buf) == -1) {
+throw new IOException("Reached EOF before filling buffer\n" +
+  s"offset=${segment.offset}\nblockId=$blockId\nbuf.remaining=${buf.remaining}")
+  }
+}
 buf.flip()
 Some(buf)
   } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/286f1efb/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala 
b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index 932b561..6dbad5f 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,6 +20,7 @@ 

git commit: [SPARK-4096][YARN]let ApplicationMaster accept executor memory argument in same format as JVM memory strings

2014-10-28 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 44d8b45a3 -> 1ea3e3dc9


[SPARK-4096][YARN]let ApplicationMaster accept executor memory argument in same 
format as JVM memory strings

Currently `ApplicationMaster` accepts the executor memory argument only as a plain 
number; we should let it accept JVM-style memory strings (e.g. 512m, 2g) as well.
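
For context, `MemoryParam` (used in the pattern match in the diff below) is an extractor. A self-contained sketch of the idea, not Spark's actual implementation and ignoring k/t suffixes, looks like this:

~~~
// Accepts JVM-style memory strings such as "512m" or "2g" and yields megabytes,
// so an argument match like  case "--executor-memory" :: MemParam(mb) :: tail  works.
object MemParam {
  def unapply(str: String): Option[Int] = {
    val lower = str.toLowerCase
    try {
      if (lower.endsWith("g")) Some(lower.dropRight(1).toInt * 1024)
      else if (lower.endsWith("m")) Some(lower.dropRight(1).toInt)
      else Some(lower.toInt)  // bare number, interpreted as MB
    } catch {
      case _: NumberFormatException => None
    }
  }
}
~~~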

Author: WangTaoTheTonic barneystin...@aliyun.com

Closes #2955 from WangTaoTheTonic/modifyDesc and squashes the following commits:

ab98c70 [WangTaoTheTonic] append parameter passed in
3779767 [WangTaoTheTonic] Update executor memory description in the help message


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ea3e3dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ea3e3dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ea3e3dc

Branch: refs/heads/master
Commit: 1ea3e3dc9dd942402731751089bab2fb6ae29c7b
Parents: 44d8b45
Author: WangTaoTheTonic barneystin...@aliyun.com
Authored: Tue Oct 28 12:31:42 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Tue Oct 28 12:31:42 2014 -0700

--
 .../apache/spark/deploy/yarn/ApplicationMasterArguments.scala| 4 ++--
 .../src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1ea3e3dc/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
--
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 3e6b96f..5c54e34 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import org.apache.spark.util.IntParam
+import org.apache.spark.util.{MemoryParam, IntParam}
 import collection.mutable.ArrayBuffer
 
 class ApplicationMasterArguments(val args: Array[String]) {
@@ -55,7 +55,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   numExecutors = value
   args = tail
 
-    case ("--worker-memory" | "--executor-memory") :: IntParam(value) :: tail =>
+    case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
   executorMemory = value
   args = tail
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1ea3e3dc/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
--
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 0417cdd..8ea0e7c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -379,7 +379,7 @@ private[spark] trait ClientBase extends Logging {
 val amArgs =
   Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
   Seq(
-        "--executor-memory", args.executorMemory.toString,
+        "--executor-memory", args.executorMemory.toString + "m",
         "--executor-cores", args.executorCores.toString,
         "--num-executors ", args.numExecutors.toString)
 





git commit: [SPARK-4065] Add check for IPython on Windows

2014-10-28 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 286f1efb0 -> f0c571760


[SPARK-4065] Add check for IPython on Windows

This issue employs logic similar to the bash launcher (pyspark) to check
if IPYTHON=1 and, if so, launch ipython with the options in IPYTHON_OPTS.
This fix assumes that ipython is available on the system PATH and can
be invoked with a plain ipython command.

Author: Michael Griffiths msjgriffi...@gmail.com

Closes #2910 from msjgriffiths/pyspark-windows and squashes the following 
commits:

ef34678 [Michael Griffiths] Change build message to comply with [SPARK-3775]
361e3d8 [Michael Griffiths] [SPARK-4065] Add check for IPython on Windows
9ce72d1 [Michael Griffiths] [SPARK-4065] Add check for IPython on Windows

(cherry picked from commit 2f254dacf4b7ab9c59c7cef59fd364ca682162ae)
Signed-off-by: Andrew Or and...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0c57176
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0c57176
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0c57176

Branch: refs/heads/branch-1.1
Commit: f0c571760040c998d83ad87d08e104b38bfc19f7
Parents: 286f1ef
Author: Michael Griffiths msjgriffi...@gmail.com
Authored: Tue Oct 28 12:47:21 2014 -0700
Committer: Andrew Or and...@databricks.com
Committed: Tue Oct 28 12:47:33 2014 -0700

--
 bin/pyspark2.cmd | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f0c57176/bin/pyspark2.cmd
--
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index a0e66ab..59415e9 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -59,7 +59,12 @@ for /f %%i in ('echo %1^| findstr /R \.py') do (
 )
 
 if [%PYTHON_FILE%] == [] (
-  %PYSPARK_PYTHON%
+  set PYSPARK_SHELL=1
+  if [%IPYTHON%] == [1] (
+   ipython %IPYTHON_OPTS%
+  ) else (
+   %PYSPARK_PYTHON%
+  ) 
 ) else (
   echo.
   echo WARNING: Running python applications through ./bin/pyspark.cmd is 
deprecated as of Spark 1.0.





git commit: [SPARK-3814][SQL] Support for Bitwise AND(&), OR(|), XOR(^), NOT(~) in Spark HQL and SQL

2014-10-28 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 6c1b981c3 -> 5807cb40a


[SPARK-3814][SQL] Support for Bitwise AND(&), OR(|), XOR(^), NOT(~) in Spark HQL and SQL

Currently there is no support for bitwise & and | in Spark HiveQL or Spark SQL, 
so this PR adds it. I am closing https://github.com/apache/spark/pull/2926 as it 
has merge conflicts. This PR also adds support for bitwise AND(&), OR(|), XOR(^) 
and NOT(~), and addresses all of the review comments from that PR.
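
After this change the operators can be used directly in SQL strings. A hedged usage sketch (assuming a SQLContext named `sqlContext` and a registered table `t` with an integer column `a`, neither of which is part of this patch):

~~~
// Bitwise operators now parse in both SqlParser and HiveQl.
val bitwise = sqlContext.sql("SELECT a & 1, a | 2, a ^ 3, ~a FROM t")
bitwise.collect().foreach(println)
~~~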

Author: ravipesala ravindra.pes...@huawei.com

Closes #2961 from ravipesala/SPARK-3814-NEW4 and squashes the following commits:

a391c7a [ravipesala] Rebase with master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5807cb40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5807cb40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5807cb40

Branch: refs/heads/master
Commit: 5807cb40ae178f0395c71b967f02aee853ef8bc9
Parents: 6c1b981
Author: ravipesala ravindra.pes...@huawei.com
Authored: Tue Oct 28 13:36:06 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Tue Oct 28 13:36:06 2014 -0700

--
 .../spark/sql/catalyst/SparkSQLParser.scala |  2 +-
 .../apache/spark/sql/catalyst/SqlParser.scala   |  4 +
 .../apache/spark/sql/catalyst/dsl/package.scala |  4 +
 .../sql/catalyst/expressions/arithmetic.scala   | 89 
 .../expressions/ExpressionEvaluationSuite.scala | 32 +++
 .../org/apache/spark/sql/SQLQuerySuite.scala| 16 
 .../org/apache/spark/sql/hive/HiveQl.scala  |  4 +
 .../sql/hive/execution/SQLQuerySuite.scala  | 24 ++
 8 files changed, 174 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5807cb40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
index 219322c..12e8346 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
@@ -61,7 +61,7 @@ class SqlLexical(val keywords: Seq[String]) extends 
StdLexical {
 
   delimiters += (
     "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
-    ",", ";", "%", "{", "}", ":", "[", "]", "."
+    ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~"
   )
 
   override lazy val token: Parser[Token] =

http://git-wip-us.apache.org/repos/asf/spark/blob/5807cb40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 4e96771..0acf725 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -260,6 +260,9 @@ class SqlParser extends AbstractSparkSQLParser {
   ( "*" ^^^ { (e1: Expression, e2: Expression) => Multiply(e1, e2) }
   | "/" ^^^ { (e1: Expression, e2: Expression) => Divide(e1, e2) }
   | "%" ^^^ { (e1: Expression, e2: Expression) => Remainder(e1, e2) }
+  | "&" ^^^ { (e1: Expression, e2: Expression) => BitwiseAnd(e1, e2) }
+  | "|" ^^^ { (e1: Expression, e2: Expression) => BitwiseOr(e1, e2) }
+  | "^" ^^^ { (e1: Expression, e2: Expression) => BitwiseXor(e1, e2) }
   )
 
   protected lazy val function: Parser[Expression] =
@@ -370,6 +373,7 @@ class SqlParser extends AbstractSparkSQLParser {
 | dotExpressionHeader
 | ident ^^ UnresolvedAttribute
 | signedPrimary
+    | "~" ~> expression ^^ BitwiseNot
 )
 
   protected lazy val dotExpressionHeader: Parser[Expression] =

http://git-wip-us.apache.org/repos/asf/spark/blob/5807cb40/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 75b6e37..23cfd48 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -62,12 +62,16 @@ package object dsl {
 
 def unary_- = UnaryMinus(expr)
 def unary_! = Not(expr)
+def unary_~ = BitwiseNot(expr)
 
 def + (other: Expression) = Add(expr, other)
 def - (other: Expression) = Subtract(expr, other)
 def * (other: Expression) = Multiply(expr, other)
 def / 

git commit: [SPARK-3988][SQL] add public API for date type

2014-10-28 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 5807cb40a -> 47a40f60d


[SPARK-3988][SQL] add public API for date type

Add json and python api for date type.
By using Pickle, `java.sql.Date` was serialized as calendar, and recognized in 
python as `datetime.datetime`.

Author: Daoyuan Wang daoyuan.w...@intel.com

Closes #2901 from adrian-wang/spark3988 and squashes the following commits:

c51a24d [Daoyuan Wang] convert datetime to date
5670626 [Daoyuan Wang] minor line combine
f760d8e [Daoyuan Wang] fix indent
444f100 [Daoyuan Wang] fix a typo
1d74448 [Daoyuan Wang] fix scala style
8d7dd22 [Daoyuan Wang] add json and python api for date type


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47a40f60
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47a40f60
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47a40f60

Branch: refs/heads/master
Commit: 47a40f60d62ea69b659959994918d4c640f39d5b
Parents: 5807cb40
Author: Daoyuan Wang daoyuan.w...@intel.com
Authored: Tue Oct 28 13:43:25 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Tue Oct 28 13:43:25 2014 -0700

--
 python/pyspark/sql.py   | 57 +---
 .../spark/sql/catalyst/ScalaReflection.scala|  1 +
 .../spark/sql/catalyst/types/dataTypes.scala|  4 +-
 .../sql/catalyst/ScalaReflectionSuite.scala |  9 +++-
 .../scala/org/apache/spark/sql/SQLContext.scala | 10 ++--
 .../org/apache/spark/sql/json/JsonRDD.scala | 20 ---
 .../apache/spark/sql/api/java/JavaRowSuite.java | 11 ++--
 .../java/JavaSideDataTypeConversionSuite.java   |  1 +
 .../java/ScalaSideDataTypeConversionSuite.scala |  1 +
 .../org/apache/spark/sql/json/JsonSuite.scala   |  9 ++--
 10 files changed, 87 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/47a40f60/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 7daf306..93fd9d4 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -49,7 +49,7 @@ from pyspark.traceback_utils import SCCallSiteSync
 
 
 __all__ = [
-    "StringType", "BinaryType", "BooleanType", "TimestampType", "DecimalType",
+    "StringType", "BinaryType", "BooleanType", "DateType", "TimestampType", "DecimalType",
     "DoubleType", "FloatType", "ByteType", "IntegerType", "LongType",
     "ShortType", "ArrayType", "MapType", "StructField", "StructType",
     "SQLContext", "HiveContext", "SchemaRDD", "Row"]
@@ -132,6 +132,14 @@ class BooleanType(PrimitiveType):
 
 
 
+class DateType(PrimitiveType):
+
+    """Spark SQL DateType
+
+    The data type representing datetime.date values.
+    """
+
+
 class TimestampType(PrimitiveType):
 
     """Spark SQL TimestampType
@@ -438,7 +446,7 @@ def _parse_datatype_json_value(json_value):
 return _all_complex_types[json_value[type]].fromJson(json_value)
 
 
-# Mapping Python types to Spark SQL DateType
+# Mapping Python types to Spark SQL DataType
 _type_mappings = {
 bool: BooleanType,
 int: IntegerType,
@@ -448,8 +456,8 @@ _type_mappings = {
 unicode: StringType,
 bytearray: BinaryType,
 decimal.Decimal: DecimalType,
+datetime.date: DateType,
 datetime.datetime: TimestampType,
-datetime.date: TimestampType,
 datetime.time: TimestampType,
 }
 
@@ -656,10 +664,10 @@ def _infer_schema_type(obj, dataType):
 
 Fill the dataType with types infered from obj
 
- >>> schema = _parse_schema_abstract("a b c")
- >>> row = (1, 1.0, "str")
+ >>> schema = _parse_schema_abstract("a b c d")
+ >>> row = (1, 1.0, "str", datetime.date(2014, 10, 10))
  >>> _infer_schema_type(row, schema)
-StructType...IntegerType...DoubleType...StringType...
+StructType...IntegerType...DoubleType...StringType...DateType...
  >>> row = [[1], {"key": (1, 2.0)}]
  >>> schema = _parse_schema_abstract("a[] b{c d}")
  >>> _infer_schema_type(row, schema)
@@ -703,6 +711,7 @@ _acceptable_types = {
 DecimalType: (decimal.Decimal,),
 StringType: (str, unicode),
 BinaryType: (bytearray,),
+DateType: (datetime.date,),
 TimestampType: (datetime.datetime,),
 ArrayType: (list, tuple, array),
 MapType: (dict,),
@@ -740,7 +749,7 @@ def _verify_type(obj, dataType):
 
 # subclass of them can not be deserialized in JVM
 if type(obj) not in _acceptable_types[_type]:
-        raise TypeError("%s can not accept abject in type %s"
+        raise TypeError("%s can not accept object in type %s"
                         % (dataType, type(obj)))
 
 if isinstance(dataType, ArrayType):
@@ -767,7 +776,7 @@ def _restore_object(dataType, obj):
  Restore object during unpickling. 
 # use id(dataType) as key to speed up lookup in dict
 # Because of batched pickling, dataType will be the
-# same object in mose 

git commit: [Spark 3922] Refactor spark-core to use Utils.UTF_8

2014-10-28 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 47a40f60d -> abcafcfba


[Spark 3922] Refactor spark-core to use Utils.UTF_8

A global UTF8 constant is very helpful to handle encoding problems when 
converting between String and bytes. There are several solutions here:

1. Add `val UTF_8 = Charset.forName("UTF-8")` to Utils.scala
2. java.nio.charset.StandardCharsets.UTF_8 (require JDK7)
3. io.netty.util.CharsetUtil.UTF_8
4. com.google.common.base.Charsets.UTF_8
5. org.apache.commons.lang.CharEncoding.UTF_8
6. org.apache.commons.lang3.CharEncoding.UTF_8

IMO, I prefer option 1) because people can find it easily.

This is a PR for option 1) and only fixes Spark Core.
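
A minimal sketch of option 1 (the object and names below are illustrative; the real constant lives in Utils.scala):

~~~
import java.nio.charset.Charset

object UtilsSketch {
  // One shared Charset constant instead of scattered "utf-8" string literals.
  val UTF_8: Charset = Charset.forName("UTF-8")
}

object Demo extends App {
  // Passing a Charset avoids both magic strings and UnsupportedEncodingException.
  val bytes = "spark".getBytes(UtilsSketch.UTF_8)
  println(new String(bytes, UtilsSketch.UTF_8))
}
~~~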

Author: zsxwing zsxw...@gmail.com

Closes #2781 from zsxwing/SPARK-3922 and squashes the following commits:

f974edd [zsxwing] Merge branch 'master' into SPARK-3922
2d27423 [zsxwing] Refactor spark-core to use Refactor spark-core to use 
Utils.UTF_8


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abcafcfb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abcafcfb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abcafcfb

Branch: refs/heads/master
Commit: abcafcfba38d7c8dba68a5510475c5c49ae54d92
Parents: 47a40f6
Author: zsxwing zsxw...@gmail.com
Authored: Tue Oct 28 14:26:57 2014 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Tue Oct 28 14:26:57 2014 -0700

--
 .../main/scala/org/apache/spark/SparkSaslClient.scala   |  7 ---
 .../main/scala/org/apache/spark/SparkSaslServer.scala   | 10 ++
 .../scala/org/apache/spark/api/python/PythonRDD.scala   |  9 -
 .../api/python/WriteInputFormatTestDataGenerator.scala  |  5 +++--
 .../org/apache/spark/deploy/worker/DriverRunner.scala   |  4 ++--
 .../org/apache/spark/deploy/worker/ExecutorRunner.scala |  4 ++--
 .../network/netty/client/BlockFetchingClient.scala  |  4 ++--
 .../netty/client/BlockFetchingClientHandler.scala   |  5 +++--
 .../apache/spark/network/netty/server/BlockServer.scala |  4 ++--
 .../netty/server/BlockServerChannelInitializer.scala|  6 +++---
 .../apache/spark/network/nio/ConnectionManager.scala|  4 +++-
 .../scala/org/apache/spark/network/nio/Message.scala|  4 +++-
 .../netty/client/BlockFetchingClientHandlerSuite.scala  |  3 ++-
 .../network/netty/server/BlockHeaderEncoderSuite.scala  |  8 
 .../scala/org/apache/spark/util/FileAppenderSuite.scala | 12 ++--
 .../test/scala/org/apache/spark/util/UtilsSuite.scala   | 12 ++--
 16 files changed, 55 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala 
b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
index 65003b6..a954fcc 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark
 
-import java.io.IOException
 import javax.security.auth.callback.Callback
 import javax.security.auth.callback.CallbackHandler
 import javax.security.auth.callback.NameCallback
@@ -31,6 +30,8 @@ import javax.security.sasl.SaslException
 
 import scala.collection.JavaConversions.mapAsJavaMap
 
+import com.google.common.base.Charsets.UTF_8
+
 /**
  * Implements SASL Client logic for Spark
  */
@@ -111,10 +112,10 @@ private[spark] class SparkSaslClient(securityMgr: 
SecurityManager)  extends Logg
 CallbackHandler {
 
 private val userName: String =
-      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8"))
+      SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes(UTF_8))
     private val secretKey = securityMgr.getSecretKey()
     private val userPassword: Array[Char] = SparkSaslServer.encodePassword(
-      if (secretKey != null) secretKey.getBytes("utf-8") else "".getBytes("utf-8"))
+      if (secretKey != null) secretKey.getBytes(UTF_8) else "".getBytes(UTF_8))
 
 /**
  * Implementation used to respond to SASL request from the server.

http://git-wip-us.apache.org/repos/asf/spark/blob/abcafcfb/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala 
b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
index f6b0a91..7c2afb3 100644
--- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
+++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala
@@ -28,6 +28,8 @@ import javax.security.sasl.Sasl
 import javax.security.sasl.SaslException
 import 

git commit: [SPARK-3343] [SQL] Add serde support for CTAS

2014-10-28 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master abcafcfba -> 4b55482ab


[SPARK-3343] [SQL] Add serde support for CTAS

Currently, `CTAS` (Create Table As Select) doesn't support specifying the 
`SerDe` in HQL. This PR will pass down the `ASTNode` into the physical operator 
`execution.CreateTableAsSelect`, which will extract the `CreateTableDesc` 
object via Hive `SemanticAnalyzer`. In the meantime, I also update the 
`HiveMetastoreCatalog.createTable` to optionally support the `CreateTableDesc` 
for table creation.
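
As a hedged example of the kind of statement this enables (the exact queries exercised in the PR may differ; `hiveContext` and the standard Hive `src` table are assumed here), a CTAS can now carry an explicit SerDe:

~~~
hiveContext.sql("""
  CREATE TABLE ctas_with_serde
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
  STORED AS RCFILE
  AS SELECT key, value FROM src
""")
~~~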

Author: Cheng Hao hao.ch...@intel.com

Closes #2570 from chenghao-intel/ctas_serde and squashes the following commits:

e011ef5 [Cheng Hao] shim for both 0.12  0.13.1
cfb3662 [Cheng Hao] revert to hive 0.12
c8a547d [Cheng Hao] Support SerDe properties within CTAS


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b55482a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b55482a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b55482a

Branch: refs/heads/master
Commit: 4b55482abf899c27da3d55401ad26b4e9247b327
Parents: abcafcf
Author: Cheng Hao hao.ch...@intel.com
Authored: Tue Oct 28 14:36:06 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Tue Oct 28 14:36:06 2014 -0700

--
 .../catalyst/plans/logical/basicOperators.scala |   8 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala|   4 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  |  19 ++
 .../hive/execution/HiveCompatibilitySuite.scala |   6 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 197 ---
 .../org/apache/spark/sql/hive/HiveQl.scala  |  15 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  17 +-
 .../hive/execution/CreateTableAsSelect.scala|  39 ++--
 .../scala/org/apache/spark/sql/QueryTest.scala  |  19 ++
 .../sql/hive/execution/HiveExplainSuite.scala   |  37 ++--
 .../sql/hive/execution/SQLQuerySuite.scala  |  59 ++
 .../org/apache/spark/sql/hive/Shim12.scala  |   5 +-
 .../org/apache/spark/sql/hive/Shim13.scala  |   6 +-
 13 files changed, 337 insertions(+), 94 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 14b03c7..00bdf10 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -114,11 +114,13 @@ case class InsertIntoTable(
   }
 }
 
-case class CreateTableAsSelect(
+case class CreateTableAsSelect[T](
 databaseName: Option[String],
 tableName: String,
-child: LogicalPlan) extends UnaryNode {
-  override def output = child.output
+child: LogicalPlan,
+allowExisting: Boolean,
+desc: Option[T] = None) extends UnaryNode {
+  override def output = Seq.empty[Attribute]
   override lazy val resolved = (databaseName != None && childrenResolved)
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 25ba7d8..15516af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
   @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan 
match {
 // For various commands (like DDL) and queries with side effects, we force 
query optimization to
 // happen right away to let these side effects take place eagerly.
-    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile =>
+    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile =>
       LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
     case _ =>
   baseLogicalPlan
@@ -123,7 +123,7 @@ private[sql] trait SchemaRDDLike {
*/
   @Experimental
   def saveAsTable(tableName: String): Unit =
-sqlContext.executePlan(CreateTableAsSelect(None, tableName, 
logicalPlan)).toRdd
+sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, 
false)).toRdd
 
   /** Returns the schema as a string in the tree format.
*


[1/2] [SPARK-4084] Reuse sort key in Sorter

2014-10-28 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master 4b55482ab -> 84e5da87e


http://git-wip-us.apache.org/repos/asf/spark/blob/84e5da87/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala 
b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
index 6fe1079..066d47c 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import java.lang.{Float = JFloat}
+import java.lang.{Float = JFloat, Integer = JInteger}
 import java.util.{Arrays, Comparator}
 
 import org.scalatest.FunSuite
@@ -30,11 +30,15 @@ class SorterSuite extends FunSuite {
 val rand = new XORShiftRandom(123)
    val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() }
 val data1 = data0.clone()
+val data2 = data0.clone()
 
 Arrays.sort(data0)
 new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, 
Ordering.Int)
+new Sorter(new KeyReuseIntArraySortDataFormat)
+  .sort(data2, 0, data2.length, Ordering[IntWrapper])
 
-    data0.zip(data1).foreach { case (x, y) => assert(x === y) }
+assert(data0.view === data1.view)
+assert(data0.view === data2.view)
   }
 
   test("KVArraySorter") {
@@ -61,10 +65,33 @@ class SorterSuite extends FunSuite {
 }
   }
 
+  /** Runs an experiment several times. */
+  def runExperiment(name: String, skip: Boolean = false)(f: => Unit, prepare: () => Unit): Unit = {
+    if (skip) {
+      println(s"Skipped experiment $name.")
+      return
+    }
+
+    val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare))
+    System.gc()
+
+    var i = 0
+    var next10: Long = 0
+    while (i < 10) {
+      val time = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare))
+      next10 += time
+      println(s"$name: Took $time ms")
+      i += 1
+    }
+
+    println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)")
+  }
+
   /**
* This provides a simple benchmark for comparing the Sorter with Java 
internal sorting.
* Ideally these would be executed one at a time, each in their own JVM, so 
their listing
-   * here is mainly to have the code.
+   * here is mainly to have the code. Running multiple tests within the same 
JVM session would
+   * prevent JIT inlining overridden methods and hence hurt the performance.
*
* The goal of this code is to sort an array of key-value pairs, where the 
array physically
* has the keys and values alternating. The basic Java sorts work only on 
the keys, so the
@@ -72,96 +99,167 @@ class SorterSuite extends FunSuite {
* those, while the Sorter approach can work directly on the input data 
format.
*
* Note that the Java implementation varies tremendously between Java 6 and 
Java 7, when
-   * the Java sort changed from merge sort to Timsort.
+   * the Java sort changed from merge sort to TimSort.
*/
-  ignore("Sorter benchmark") {
-
-    /** Runs an experiment several times. */
-    def runExperiment(name: String)(f: => Unit): Unit = {
-      val firstTry = org.apache.spark.util.Utils.timeIt(1)(f)
-      System.gc()
-
-      var i = 0
-      var next10: Long = 0
-      while (i < 10) {
-        val time = org.apache.spark.util.Utils.timeIt(1)(f)
-        next10 += time
-        println(s"$name: Took $time ms")
-        i += 1
-      }
-
-      println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)")
-    }
-
+  ignore("Sorter benchmark for key-value pairs") {
     val numElements = 25000000 // 25 mil
 val rand = new XORShiftRandom(123)
 
-    val keys = Array.tabulate[JFloat](numElements) { i =>
-      new JFloat(rand.nextFloat())
+    // Test our key-value pairs where each element is a Tuple2[Float, Integer].
+
+    val kvTuples = Array.tabulate(numElements) { i =>
+      (new JFloat(rand.nextFloat()), new JInteger(i))
     }
 
-    // Test our key-value pairs where each element is a Tuple2[Float, Integer)
-    val kvTupleArray = Array.tabulate[AnyRef](numElements) { i =>
-      (keys(i / 2): Float, i / 2: Int)
+    val kvTupleArray = new Array[AnyRef](numElements)
+    val prepareKvTupleArray = () => {
+      System.arraycopy(kvTuples, 0, kvTupleArray, 0, numElements)
     }
-    runExperiment("Tuple-sort using Arrays.sort()") {
+    runExperiment("Tuple-sort using Arrays.sort()")({
       Arrays.sort(kvTupleArray, new Comparator[AnyRef] {
         override def compare(x: AnyRef, y: AnyRef): Int =
-          Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1)
+          x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1)
       })
-    }
+    }, prepareKvTupleArray)
 
 // Test our Sorter where each element alternates between Float and 
Integer, non-primitive
-

[2/2] git commit: [SPARK-4084] Reuse sort key in Sorter

2014-10-28 Thread adav
[SPARK-4084] Reuse sort key in Sorter

Sorter uses generic-typed key for sorting. When data is large, it creates lots 
of key objects, which is not efficient. We should reuse the key in Sorter for 
memory efficiency. This change is part of the petabyte sort implementation from 
rxin .

The `Sorter` class was written in Java and marked package private. So it is 
only available to `org.apache.spark.util.collection`. I renamed it to `TimSort` 
and add a simple wrapper of it, still called `Sorter`, in Scala, which is 
`private[spark]`.
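
To make the key-reuse idea concrete, here is an illustrative sketch (not Spark's actual `SortDataFormat`/`TimSort` API; the names are ours): the caller supplies a mutable wrapper that the sorter fills in place, so a sort allocates O(1) key objects instead of one per comparison.

~~~
object KeyReuseSketch {
  // Mutable key holder: the sorter reuses one instance instead of boxing a new key each time.
  class IntWrapper(var value: Int = 0)

  // Shape of a reuse-aware key accessor: when `reuse` is non-null, no allocation happens.
  def getKey(data: Array[Int], pos: Int, reuse: IntWrapper): IntWrapper = {
    if (reuse == null) new IntWrapper(data(pos))   // fallback: allocate a fresh key
    else { reuse.value = data(pos); reuse }        // fast path: fill the caller's wrapper
  }
}
~~~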

The benchmark code is updated, which now resets the array before each run. Here 
is the result on sorting primitive Int arrays of size 25 million using Sorter:

~~~
[info] - Sorter benchmark for key-value pairs !!! IGNORED !!!
Java Arrays.sort() on non-primitive int array: Took 13237 ms
Java Arrays.sort() on non-primitive int array: Took 13320 ms
Java Arrays.sort() on non-primitive int array: Took 15718 ms
Java Arrays.sort() on non-primitive int array: Took 13283 ms
Java Arrays.sort() on non-primitive int array: Took 13267 ms
Java Arrays.sort() on non-primitive int array: Took 15122 ms
Java Arrays.sort() on non-primitive int array: Took 15495 ms
Java Arrays.sort() on non-primitive int array: Took 14877 ms
Java Arrays.sort() on non-primitive int array: Took 16429 ms
Java Arrays.sort() on non-primitive int array: Took 14250 ms
Java Arrays.sort() on non-primitive int array: (13878 ms first try, 14499 ms 
average)
Java Arrays.sort() on primitive int array: Took 2683 ms
Java Arrays.sort() on primitive int array: Took 2683 ms
Java Arrays.sort() on primitive int array: Took 2701 ms
Java Arrays.sort() on primitive int array: Took 2746 ms
Java Arrays.sort() on primitive int array: Took 2685 ms
Java Arrays.sort() on primitive int array: Took 2735 ms
Java Arrays.sort() on primitive int array: Took 2669 ms
Java Arrays.sort() on primitive int array: Took 2693 ms
Java Arrays.sort() on primitive int array: Took 2680 ms
Java Arrays.sort() on primitive int array: Took 2642 ms
Java Arrays.sort() on primitive int array: (2948 ms first try, 2691 ms average)
Sorter without key reuse on primitive int array: Took 10732 ms
Sorter without key reuse on primitive int array: Took 12482 ms
Sorter without key reuse on primitive int array: Took 10718 ms
Sorter without key reuse on primitive int array: Took 12650 ms
Sorter without key reuse on primitive int array: Took 10747 ms
Sorter without key reuse on primitive int array: Took 10783 ms
Sorter without key reuse on primitive int array: Took 12721 ms
Sorter without key reuse on primitive int array: Took 10604 ms
Sorter without key reuse on primitive int array: Took 10622 ms
Sorter without key reuse on primitive int array: Took 11843 ms
Sorter without key reuse on primitive int array: (11089 ms first try, 11390 ms 
average)
Sorter with key reuse on primitive int array: Took 5141 ms
Sorter with key reuse on primitive int array: Took 5298 ms
Sorter with key reuse on primitive int array: Took 5066 ms
Sorter with key reuse on primitive int array: Took 5164 ms
Sorter with key reuse on primitive int array: Took 5203 ms
Sorter with key reuse on primitive int array: Took 5274 ms
Sorter with key reuse on primitive int array: Took 5186 ms
Sorter with key reuse on primitive int array: Took 5159 ms
Sorter with key reuse on primitive int array: Took 5164 ms
Sorter with key reuse on primitive int array: Took 5078 ms
Sorter with key reuse on primitive int array: (5311 ms first try, 5173 ms 
average)
~~~

So with key reuse, it is faster and less likely to trigger GC.

Author: Xiangrui Meng m...@databricks.com
Author: Reynold Xin r...@apache.org

Closes #2937 from mengxr/SPARK-4084 and squashes the following commits:

d73c3d0 [Xiangrui Meng] address comments
0b7b682 [Xiangrui Meng] fix mima
a72f53c [Xiangrui Meng] update timeIt
38ba50c [Xiangrui Meng] update timeIt
720f731 [Xiangrui Meng] add doc about JIT specialization
78f2879 [Xiangrui Meng] update tests
7de2efd [Xiangrui Meng] update the Sorter benchmark code to be correct
8626356 [Xiangrui Meng] add prepare to timeIt and update tests in SorterSuite
5f0d530 [Xiangrui Meng] update method modifiers of SortDataFormat
6ffbe66 [Xiangrui Meng] rename Sorter to TimSort and add a Scala wrapper that 
is private[spark]
b00db4d [Xiangrui Meng] doc and tests
cf94e8a [Xiangrui Meng] renaming
464ddce [Reynold Xin] cherry-pick rxin's commit


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84e5da87
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84e5da87
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84e5da87

Branch: refs/heads/master
Commit: 84e5da87e32256ba4f3dee6f8bf532ce88322028
Parents: 4b55482
Author: Xiangrui Meng m...@databricks.com
Authored: Tue Oct 28 15:14:41 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Tue Oct 28 15:14:41 2014 -0700

--
 

git commit: [SPARK-4008] Fix kryo with fold in KryoSerializerSuite

2014-10-28 Thread adav
Repository: spark
Updated Branches:
  refs/heads/master 84e5da87e -> 1536d7033


[SPARK-4008] Fix kryo with fold in KryoSerializerSuite

`zeroValue` will be serialized by `spark.closure.serializer` but 
`spark.closure.serializer` only supports the default Java serializer. So it 
must not be `ClassWithoutNoArgConstructor`, which can not be serialized by the 
Java serializer.

This PR changed `zeroValue` to null and updated the test to make it work 
correctly.
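
Schematically, the fixed pattern looks like the sketch below (spark-shell style: `sc` is the usual SparkContext, and `NoDefaultCtor` is an assumed stand-in for the suite's ClassWithoutNoArgConstructor):

~~~
// The zero value travels with the closure through the Java closure serializer,
// so it must be Java-serializable; null qualifies, and the fold function treats
// it as zero. The RDD elements themselves still go through Kryo.
class NoDefaultCtor(val x: Int)

val result = sc.parallelize(Seq(1, 2), 2)
  .map(new NoDefaultCtor(_))
  .fold(null) { (t1, t2) =>
    val t1x = if (t1 == null) 0 else t1.x  // only the zero value can be null here
    new NoDefaultCtor(t1x + t2.x)
  }.x
// result == 3
~~~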

Author: zsxwing zsxw...@gmail.com

Closes #2856 from zsxwing/SPARK-4008 and squashes the following commits:

51da655 [zsxwing] [SPARK-4008] Fix kryo with fold in KryoSerializerSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1536d703
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1536d703
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1536d703

Branch: refs/heads/master
Commit: 1536d70331e9a4f5b5ea9dabfd72592ca1fc8e35
Parents: 84e5da8
Author: zsxwing zsxw...@gmail.com
Authored: Tue Oct 28 17:59:10 2014 -0700
Committer: Aaron Davidson aa...@databricks.com
Committed: Tue Oct 28 17:59:10 2014 -0700

--
 .../apache/spark/serializer/KryoSerializerSuite.scala  | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1536d703/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 64ac6d2..a70f67a 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -201,12 +201,17 @@ class KryoSerializerSuite extends FunSuite with 
SharedSparkContext {
 assert(control.sum === result)
   }
 
-  // TODO: this still doesn't work
-  ignore("kryo with fold") {
+  test("kryo with fold") {
 val control = 1 :: 2 :: Nil
+// zeroValue must not be a ClassWithoutNoArgConstructor instance because 
it will be
+// serialized by spark.closure.serializer but spark.closure.serializer 
only supports
+// the default Java serializer.
 val result = sc.parallelize(control, 2).map(new 
ClassWithoutNoArgConstructor(_))
-      .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
-assert(10 + control.sum === result)
+      .fold(null)((t1, t2) => {
+  val t1x = if (t1 == null) 0 else t1.x
+  new ClassWithoutNoArgConstructor(t1x + t2.x)
+}).x
+assert(control.sum === result)
   }
 
   test("kryo with nonexistent custom registrator should fail") {





git commit: [SPARK-3904] [SQL] add constant objectinspector support for udfs

2014-10-28 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 1536d7033 -> b5e79bf88


[SPARK-3904] [SQL] add constant objectinspector support for udfs

In HQL, we convert all of the data types into normal `ObjectInspector`s for
UDFs. In most cases this works; however, some UDFs actually require their
children's `ObjectInspector` to be a `ConstantObjectInspector`, which otherwise
causes an exception, e.g.

select named_struct("x", "str") from src limit 1;

I updated the method `wrap` by adding one more `ObjectInspector` parameter
(to describe what it expects to wrap to, for example java.lang.Integer or
IntWritable), as well as the `unwrap` method, by taking the input `ObjectInspector`.
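
For illustration, the failing case can be reproduced from spark-shell roughly as below (a hedged sketch: it assumes a HiveContext bound to `hiveContext` and Hive's standard `src` test table; `named_struct` needs its field-name arguments delivered as constant ObjectInspectors at UDF initialization):

~~~
// Before this patch, initializing named_struct fails because the literal field
// names are not passed down as ConstantObjectInspectors.
val rows = hiveContext.sql("""SELECT named_struct("x", "str") FROM src LIMIT 1""")
rows.collect().foreach(println)
~~~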

Author: Cheng Hao hao.ch...@intel.com

Closes #2762 from chenghao-intel/udf_coi and squashes the following commits:

bcacfd7 [Cheng Hao] Shim for both Hive 0.12 & 0.13.1
2416e5d [Cheng Hao] revert to hive 0.12
5793c01 [Cheng Hao] add space before while
4e56e1b [Cheng Hao] style issue
683d3fd [Cheng Hao] Add golden files
fe591e4 [Cheng Hao] update HiveGenericUdf for set the ObjectInspector while constructing the DeferredObject
f6740fe [Cheng Hao] Support Constant ObjectInspector for Map & List
8814c3a [Cheng Hao] Passing ContantObjectInspector(when necessary) for UDF initializing


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b5e79bf8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b5e79bf8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b5e79bf8

Branch: refs/heads/master
Commit: b5e79bf889700159d490cdac1f6322dff424b1d9
Parents: 1536d70
Author: Cheng Hao hao.ch...@intel.com
Authored: Tue Oct 28 19:11:57 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Tue Oct 28 19:11:57 2014 -0700

--
 .../hive/execution/HiveCompatibilitySuite.scala |   8 +-
 .../apache/spark/sql/hive/HiveInspectors.scala  | 185 ---
 .../org/apache/spark/sql/hive/TableReader.scala |   2 +-
 .../org/apache/spark/sql/hive/hiveUdfs.scala|  62 ---
 ...tant array-0-761ef205b10ac4a10122c8b4ce10ada |   1 +
 ...med_struct-0-8f0ea83364b78634fbb3752c5a5c725 |   1 +
 ...ed_struct-1-380c9638cc6ea8ea42f187bf0cedf350 |   1 +
 ...ed_struct-2-22a79ac608b1249306f82f4bdc669b17 |   0
 ...ed_struct-3-d7e4a555934307155784904ff9df188b |   1 +
 ...ort_array-0-e86d559aeb84a4cc017a103182c22bfb |   0
 ...ort_array-1-976cd8b6b50a2748bbc768aa5e11cf82 |   1 +
 ...rt_array-10-9e047718e5fea6ea79124f1e899f1c13 |   1 +
 ...ort_array-2-c429ec85a6da60ebd4bc6f0f266e8b93 |   4 +
 ...ort_array-3-55c4cdaf8438b06675d60848d68f35de |   0
 ...df_struct-0-f41043b7d9f14fa5e998c90454c7bdb1 |   1 +
 ...df_struct-1-8ccdb20153debdab789ea8ad0228e2eb |   1 +
 ...df_struct-2-4a62774a6de7571c8d2bcb77da63f8f3 |   0
 ...df_struct-3-abffdaacb0c7076ab538fbeec072daa2 |   1 +
 .../sql/hive/execution/HiveQuerySuite.scala |   8 +
 .../org/apache/spark/sql/hive/Shim12.scala  |  57 ++
 .../org/apache/spark/sql/hive/Shim13.scala  |  64 ++-
 21 files changed, 307 insertions(+), 92 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b5e79bf8/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
--
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 26d9ca0..1a3c24b 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -233,7 +233,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
 
     // Sort with Limit clause causes failure.
     "ctas",
-    "ctas_hadoop20"
+    "ctas_hadoop20",
+
+    // timestamp in array, the output format of Hive contains double quotes, while
+    // Spark SQL doesn't
+    "udf_sort_array"
   ) ++ HiveShim.compatibilityBlackList
 
   /**
@@ -861,6 +865,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_minute",
     "udf_modulo",
     "udf_month",
+    "udf_named_struct",
     "udf_negative",
     "udf_not",
     "udf_notequal",
@@ -894,6 +899,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_stddev_pop",
     "udf_stddev_samp",
     "udf_string",
+    "udf_struct",
     "udf_substring",
     "udf_subtract",
     "udf_sum",

http://git-wip-us.apache.org/repos/asf/spark/blob/b5e79bf8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
--
diff --git 

git commit: [SPARK-4133] [SQL] [PySpark] type conversion for python udf

2014-10-28 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master b5e79bf88 -> 8c0bfd08f


[SPARK-4133] [SQL] [PySpark] type conversion for python udf

A Python UDF can now be called on ArrayType/MapType/PrimitiveType values, and its
returnType can also be ArrayType/MapType/PrimitiveType.

A StructType value is passed to the UDF as a tuple (without attributes); if returnType is
StructType, the UDF should also return a tuple.
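
A minimal PySpark usage sketch, mirroring the new `test_udf_with_array_type` test rather
than adding anything beyond it; it assumes an existing `SparkContext` named `sc` on the 1.x
API, and the table name `t` is arbitrary.

```python
from pyspark.sql import SQLContext, Row, IntegerType, ArrayType

sqlCtx = SQLContext(sc)
sqlCtx.inferSchema(sc.parallelize([Row(l=[0, 1, 2])])).registerTempTable("t")

# The UDF can now take and return complex types; the return type is declared explicitly.
sqlCtx.registerFunction("copylist", lambda l: list(l), ArrayType(IntegerType()))
[row] = sqlCtx.sql("SELECT copylist(l) FROM t").collect()
print(row[0])  # [0, 1, 2]
```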

Author: Davies Liu dav...@databricks.com

Closes #2973 from davies/udf_array and squashes the following commits:

306956e [Davies Liu] Merge branch 'master' of github.com:apache/spark into udf_array
2c00e43 [Davies Liu] fix merge
11395fa [Davies Liu] Merge branch 'master' of github.com:apache/spark into udf_array
9df50a2 [Davies Liu] address comments
79afb4e [Davies Liu] type conversionfor python udf


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c0bfd08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c0bfd08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c0bfd08

Branch: refs/heads/master
Commit: 8c0bfd08fc19fa5de7d77bf8306d19834f907ec0
Parents: b5e79bf
Author: Davies Liu dav...@databricks.com
Authored: Tue Oct 28 19:38:16 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Tue Oct 28 19:38:16 2014 -0700

--
 python/pyspark/tests.py | 16 +++-
 .../scala/org/apache/spark/sql/SQLContext.scala | 43 +
 .../scala/org/apache/spark/sql/SchemaRDD.scala  | 42 ++---
 .../apache/spark/sql/execution/pythonUdfs.scala | 91 ++--
 4 files changed, 102 insertions(+), 90 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8c0bfd08/python/pyspark/tests.py
--
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 047d857..37a1289 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -49,7 +49,7 @@ from pyspark.files import SparkFiles
 from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \
     CloudPickleSerializer
 from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
-from pyspark.sql import SQLContext, IntegerType, Row
+from pyspark.sql import SQLContext, IntegerType, Row, ArrayType
 from pyspark import shuffle
 
 _have_scipy = False
@@ -690,10 +690,20 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertEqual(row[0], 5)
 
     def test_udf2(self):
-        self.sqlCtx.registerFunction("strlen", lambda string: len(string))
+        self.sqlCtx.registerFunction("strlen", lambda string: len(string), IntegerType())
         self.sqlCtx.inferSchema(self.sc.parallelize([Row(a="test")])).registerTempTable("test")
         [res] = self.sqlCtx.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").collect()
-        self.assertEqual(u"4", res[0])
+        self.assertEqual(4, res[0])
+
+    def test_udf_with_array_type(self):
+        d = [Row(l=range(3), d={"key": range(5)})]
+        rdd = self.sc.parallelize(d)
+        srdd = self.sqlCtx.inferSchema(rdd).registerTempTable("test")
+        self.sqlCtx.registerFunction("copylist", lambda l: list(l), ArrayType(IntegerType()))
+        self.sqlCtx.registerFunction("maplen", lambda d: len(d), IntegerType())
+        [(l1, l2)] = self.sqlCtx.sql("select copylist(l), maplen(d) from test").collect()
+        self.assertEqual(range(3), l1)
+        self.assertEqual(1, l2)
 
     def test_broadcast_in_udf(self):
         bar = {"a": "aa", "b": "bb", "c": "abc"}

http://git-wip-us.apache.org/repos/asf/spark/blob/8c0bfd08/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ca8706e..a41a500 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -438,7 +438,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   private[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schema: StructType): SchemaRDD = {
-    import scala.collection.JavaConversions._
 
     def needsConversion(dataType: DataType): Boolean = dataType match {
       case ByteType => true
@@ -452,49 +451,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
       case other => false
     }
 
-    // Converts value to the type specified by the data type.
-    // Because Python does not have data types for DateType, TimestampType, FloatType, ShortType,
-    // and ByteType, we need to explicitly convert values in columns of these data types to the
-    // desired JVM data types.
-    def convert(obj: Any, dataType: DataType): Any = (obj,