spark git commit: Revert "Once driver register successfully, stop it to connect to master."
Repository: spark
Updated Branches:
  refs/heads/master 5a514b61b -> cd3d937b0

Revert "Once driver register successfully, stop it to connect to master."

This reverts commit 5a514b61bbfb609c505d8d65f2483068a56f1f70.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd3d937b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd3d937b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd3d937b

Branch: refs/heads/master
Commit: cd3d937b0cc89cb5e4098f7d9f5db2712e3de71e
Parents: 5a514b6
Author: Davies Liu
Authored: Thu Dec 17 08:01:27 2015 -0800
Committer: Davies Liu
Committed: Thu Dec 17 08:01:27 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 -
 1 file changed, 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d937b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 3cf7464..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -130,7 +130,6 @@ private[spark] class AppClient(
       if (registered.get) {
         registerMasterFutures.get.foreach(_.cancel(true))
         registerMasterThreadPool.shutdownNow()
-        registrationRetryTimer.cancel(true)
       } else if (nthRetry >= REGISTRATION_RETRIES) {
         markDead("All masters are unresponsive! Giving up.")
       } else {
spark git commit: [SPARK-12395] [SQL] fix resulting columns of outer join
Repository: spark
Updated Branches:
  refs/heads/master cd3d937b0 -> a170d34a1

[SPARK-12395] [SQL] fix resulting columns of outer join

For the API DataFrame.join(right, usingColumns, joinType), if the joinType is
right_outer or full_outer, the resulting join columns could be wrong (they
would be null). The order of columns is changed to match MySQL and
PostgreSQL [1].

This PR also fixes the nullability of the output for outer joins.

[1] http://www.postgresql.org/docs/9.2/static/queries-table-expressions.html

Author: Davies Liu

Closes #10353 from davies/fix_join.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a170d34a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a170d34a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a170d34a

Branch: refs/heads/master
Commit: a170d34a1b309fecc76d1370063e0c4f44dc2142
Parents: cd3d937
Author: Davies Liu
Authored: Thu Dec 17 08:04:11 2015 -0800
Committer: Davies Liu
Committed: Thu Dec 17 08:04:11 2015 -0800

--
 .../scala/org/apache/spark/sql/DataFrame.scala    | 25
 .../apache/spark/sql/DataFrameJoinSuite.scala     | 20
 2 files changed, 36 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a170d34a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6250e95..d741312 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
@@ -455,10 +455,8 @@ class DataFrame private[sql](
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sqlContext.executePlan(
-      Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
+      Join(logicalPlan, right.logicalPlan, JoinType(joinType), None)).analyzed.asInstanceOf[Join]
 
-    // Project only one of the join columns.
-    val joinedCols = usingColumns.map(col => withPlan(joined.right).resolve(col))
     val condition = usingColumns.map { col =>
       catalyst.expressions.EqualTo(
         withPlan(joined.left).resolve(col),
@@ -467,9 +465,26 @@ class DataFrame private[sql](
       catalyst.expressions.And(cond, eqTo)
     }
 
+    // Project only one of the join columns.
+    val joinedCols = JoinType(joinType) match {
+      case Inner | LeftOuter | LeftSemi =>
+        usingColumns.map(col => withPlan(joined.left).resolve(col))
+      case RightOuter =>
+        usingColumns.map(col => withPlan(joined.right).resolve(col))
+      case FullOuter =>
+        usingColumns.map { col =>
+          val leftCol = withPlan(joined.left).resolve(col)
+          val rightCol = withPlan(joined.right).resolve(col)
+          Alias(Coalesce(Seq(leftCol, rightCol)), col)()
+        }
+    }
+    // The nullability of output of joined could be different than original column,
+    // so we can only compare them by exprId
+    val joinRefs = condition.map(_.references.toSeq.map(_.exprId)).getOrElse(Nil)
+    val resultCols = joinedCols ++ joined.output.filterNot(e => joinRefs.contains(e.exprId))
     withPlan {
       Project(
-        joined.output.filterNot(joinedCols.contains(_)),
+        resultCols,
         Join(
           joined.left,
           joined.right,

http://git-wip-us.apache.org/repos/asf/spark/blob/a170d34a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index c70397f..39a6541 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
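A short usage sketch of the behavior this patch targets, assuming a 1.6-era
SQLContext named sqlContext and two hypothetical DataFrames: with a
"fullouter" join over usingColumns, the single output join column is now the
coalesce of both sides rather than one (possibly null) side, and it comes
first in the column order.

    val df1 = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("k", "v1")
    val df2 = sqlContext.createDataFrame(Seq((2, "x"), (3, "y"))).toDF("k", "v2")

    // Only one "k" column appears in the output, populated from whichever side has it.
    val joined = df1.join(df2, Seq("k"), "fullouter")
    joined.show()
    // +---+----+----+
    // |  k|  v1|  v2|
    // +---+----+----+
    // |  1|   a|null|
    // |  2|   b|   x|
    // |  3|null|   y|
    // +---+----+----+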
spark git commit: [SPARK-12395] [SQL] fix resulting columns of outer join
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a8466489a -> 1ebedb20f

[SPARK-12395] [SQL] fix resulting columns of outer join

For the API DataFrame.join(right, usingColumns, joinType), if the joinType is
right_outer or full_outer, the resulting join columns could be wrong (they
would be null). The order of columns is changed to match MySQL and
PostgreSQL [1].

This PR also fixes the nullability of the output for outer joins.

[1] http://www.postgresql.org/docs/9.2/static/queries-table-expressions.html

Author: Davies Liu

Closes #10353 from davies/fix_join.

(cherry picked from commit a170d34a1b309fecc76d1370063e0c4f44dc2142)
Signed-off-by: Davies Liu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ebedb20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ebedb20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ebedb20

Branch: refs/heads/branch-1.6
Commit: 1ebedb20f2c5b781eafa9bf2b5ab092d744cc4fd
Parents: a846648
Author: Davies Liu
Authored: Thu Dec 17 08:04:11 2015 -0800
Committer: Davies Liu
Committed: Thu Dec 17 08:04:24 2015 -0800

--
 .../scala/org/apache/spark/sql/DataFrame.scala    | 25
 .../apache/spark/sql/DataFrameJoinSuite.scala     | 20
 2 files changed, 36 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1ebedb20/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index cc8b70b..74f9370 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, Queryable, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
@@ -499,10 +499,8 @@ class DataFrame private[sql](
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sqlContext.executePlan(
-      Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]
+      Join(logicalPlan, right.logicalPlan, JoinType(joinType), None)).analyzed.asInstanceOf[Join]
 
-    // Project only one of the join columns.
-    val joinedCols = usingColumns.map(col => withPlan(joined.right).resolve(col))
     val condition = usingColumns.map { col =>
       catalyst.expressions.EqualTo(
         withPlan(joined.left).resolve(col),
@@ -511,9 +509,26 @@ class DataFrame private[sql](
       catalyst.expressions.And(cond, eqTo)
     }
 
+    // Project only one of the join columns.
+    val joinedCols = JoinType(joinType) match {
+      case Inner | LeftOuter | LeftSemi =>
+        usingColumns.map(col => withPlan(joined.left).resolve(col))
+      case RightOuter =>
+        usingColumns.map(col => withPlan(joined.right).resolve(col))
+      case FullOuter =>
+        usingColumns.map { col =>
+          val leftCol = withPlan(joined.left).resolve(col)
+          val rightCol = withPlan(joined.right).resolve(col)
+          Alias(Coalesce(Seq(leftCol, rightCol)), col)()
+        }
+    }
+    // The nullability of output of joined could be different than original column,
+    // so we can only compare them by exprId
+    val joinRefs = condition.map(_.references.toSeq.map(_.exprId)).getOrElse(Nil)
+    val resultCols = joinedCols ++ joined.output.filterNot(e => joinRefs.contains(e.exprId))
     withPlan {
       Project(
-        joined.output.filterNot(joinedCols.contains(_)),
+        resultCols,
         Join(
           joined.left,
           joined.right,

http://git-wip-us.apache.org/repos/asf/spark/blob/1ebedb20/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index
spark git commit: Revert "Once driver register successfully, stop it to connect to master."
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 da7542f24 -> a8466489a

Revert "Once driver register successfully, stop it to connect to master."

This reverts commit da7542f2408140a9a3b7ea245350976ac18676a5.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8466489
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8466489
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8466489

Branch: refs/heads/branch-1.6
Commit: a8466489ab01e59fe07ba20adfc3983ec6928157
Parents: da7542f
Author: Davies Liu
Authored: Thu Dec 17 08:01:59 2015 -0800
Committer: Davies Liu
Committed: Thu Dec 17 08:01:59 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 -
 1 file changed, 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a8466489/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 3cf7464..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -130,7 +130,6 @@ private[spark] class AppClient(
       if (registered.get) {
         registerMasterFutures.get.foreach(_.cancel(true))
         registerMasterThreadPool.shutdownNow()
-        registrationRetryTimer.cancel(true)
       } else if (nthRetry >= REGISTRATION_RETRIES) {
         markDead("All masters are unresponsive! Giving up.")
       } else {
spark git commit: Once driver register successfully, stop it to connect to master.
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d509194b8 -> da7542f24

Once driver register successfully, stop it to connect to master.

This commit resolves SPARK-12396.

Author: echo2mei <534384...@qq.com>

Closes #10354 from echoTomei/master.

(cherry picked from commit 5a514b61bbfb609c505d8d65f2483068a56f1f70)
Signed-off-by: Davies Liu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da7542f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da7542f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da7542f2

Branch: refs/heads/branch-1.6
Commit: da7542f2408140a9a3b7ea245350976ac18676a5
Parents: d509194
Author: echo2mei <534384...@qq.com>
Authored: Thu Dec 17 07:59:17 2015 -0800
Committer: Davies Liu
Committed: Thu Dec 17 07:59:27 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/da7542f2/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 1e2f469..3cf7464 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -130,6 +130,7 @@ private[spark] class AppClient(
       if (registered.get) {
         registerMasterFutures.get.foreach(_.cancel(true))
         registerMasterThreadPool.shutdownNow()
+        registrationRetryTimer.cancel(true)
       } else if (nthRetry >= REGISTRATION_RETRIES) {
         markDead("All masters are unresponsive! Giving up.")
       } else {
spark git commit: Once driver register successfully, stop it to connect to master.
Repository: spark
Updated Branches:
  refs/heads/master 9d66c4216 -> 5a514b61b

Once driver register successfully, stop it to connect to master.

This commit resolves SPARK-12396.

Author: echo2mei <534384...@qq.com>

Closes #10354 from echoTomei/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a514b61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a514b61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a514b61

Branch: refs/heads/master
Commit: 5a514b61bbfb609c505d8d65f2483068a56f1f70
Parents: 9d66c42
Author: echo2mei <534384...@qq.com>
Authored: Thu Dec 17 07:59:17 2015 -0800
Committer: Davies Liu
Committed: Thu Dec 17 07:59:17 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5a514b61/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 1e2f469..3cf7464 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -130,6 +130,7 @@ private[spark] class AppClient(
       if (registered.get) {
         registerMasterFutures.get.foreach(_.cancel(true))
         registerMasterThreadPool.shutdownNow()
+        registrationRetryTimer.cancel(true)
       } else if (nthRetry >= REGISTRATION_RETRIES) {
         markDead("All masters are unresponsive! Giving up.")
       } else {
spark git commit: [SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1fbca4120 -> 881f2544e

[SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server

Fix a problem with #10332; this one should fix cluster mode on Mesos.

Author: Iulian Dragos

Closes #10359 from dragos/issue/fix-spark-12345-one-more-time.

(cherry picked from commit 8184568810e8a2e7d5371db2c6a0366ef4841f70)
Signed-off-by: Kousuke Saruta

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/881f2544
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/881f2544
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/881f2544

Branch: refs/heads/branch-1.6
Commit: 881f2544e13679c185a7c34ddb82e885aaa79813
Parents: 1fbca41
Author: Iulian Dragos
Authored: Fri Dec 18 03:19:31 2015 +0900
Committer: Kousuke Saruta
Committed: Fri Dec 18 03:37:43 2015 +0900

--
 .../scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/881f2544/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 7c01ae4..196338f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -99,7 +99,7 @@ private[mesos] class MesosSubmitRequestServlet(
     // look for files in SPARK_HOME instead. We only need the ability to specify where to find
     // spark-submit script which user can user spark.executor.home or spark.home configurations
     // (SPARK-12345).
-    val environmentVariables = request.environmentVariables.filter(!_.equals("SPARK_HOME"))
+    val environmentVariables = request.environmentVariables.filterKeys(!_.equals("SPARK_HOME"))
     val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
     // Construct driver description
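The one-word change matters because Scala's Map.filter passes (key, value)
pairs to its predicate, so a predicate written against the key alone silently
matches nothing. A minimal standalone sketch (not Spark code):

    object FilterKeysDemo {
      def main(args: Array[String]): Unit = {
        val env = Map("SPARK_HOME" -> "/opt/spark", "PATH" -> "/usr/bin")

        // Buggy: `_` is a (String, String) tuple, never equal to "SPARK_HOME",
        // so nothing is removed and SPARK_HOME leaks through.
        val buggy = env.filter(!_.equals("SPARK_HOME"))

        // Fixed: filterKeys applies the predicate to the key only.
        val fixed = env.filterKeys(!_.equals("SPARK_HOME"))

        println(buggy) // Map(SPARK_HOME -> /opt/spark, PATH -> /usr/bin)
        println(fixed) // Map(PATH -> /usr/bin)
      }
    }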
spark git commit: [SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters
Repository: spark
Updated Branches:
  refs/heads/master 6e0771665 -> 86e405f35

[SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters

This PR encodes and decodes the file name to fix the issue.

Author: Shixiong Zhu

Closes #10208 from zsxwing/uri.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86e405f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86e405f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86e405f3

Branch: refs/heads/master
Commit: 86e405f357711ae93935853a912bc13985c259db
Parents: 6e07716
Author: Shixiong Zhu
Authored: Thu Dec 17 09:55:37 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 09:55:37 2015 -0800

--
 .../scala/org/apache/spark/HttpFileServer.scala   |  6 ++---
 .../spark/rpc/netty/NettyStreamManager.scala      |  5 ++--
 .../scala/org/apache/spark/util/Utils.scala       | 26 +++-
 .../org/apache/spark/rpc/RpcEnvSuite.scala        |  4 +++
 .../org/apache/spark/util/UtilsSuite.scala        | 11 +
 5 files changed, 46 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/86e405f3/core/src/main/scala/org/apache/spark/HttpFileServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 77d8ec9..46f9f9e 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -63,12 +63,12 @@ private[spark] class HttpFileServer(
 
   def addFile(file: File) : String = {
     addFileToDir(file, fileDir)
-    serverUri + "/files/" + file.getName
+    serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addJar(file: File) : String = {
     addFileToDir(file, jarDir)
-    serverUri + "/jars/" + file.getName
+    serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addDirectory(path: String, resourceBase: String): String = {
@@ -85,7 +85,7 @@ private[spark] class HttpFileServer(
       throw new IllegalArgumentException(s"$file cannot be a directory.")
     }
     Files.copy(file, new File(dir, file.getName))
-    dir + "/" + file.getName
+    dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/86e405f3/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index ecd9697..394cde4 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.server.StreamManager
 import org.apache.spark.rpc.RpcEnvFileServer
+import org.apache.spark.util.Utils
 
 /**
  * StreamManager implementation for serving files from a NettyRpcEnv.
@@ -64,13 +65,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   override def addFile(file: File): String = {
     require(files.putIfAbsent(file.getName(), file) == null,
       s"File ${file.getName()} already registered.")
-    s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
+    s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addJar(file: File): String = {
     require(jars.putIfAbsent(file.getName(), file) == null,
       s"JAR ${file.getName()} already registered.")
-    s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
+    s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addDirectory(baseUri: String, path: File): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/86e405f3/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9dbe66e..fce89df 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -331,6 +331,30 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * A file name may contain some invalid URI characters, such as " ". This method will convert
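A minimal sketch of the encode/decode round trip, assuming the goal is only to
percent-encode a bare file name via java.net.URI; the object and method names
here are illustrative, not necessarily Spark's exact implementation:

    import java.net.URI

    object UriNameCodec {
      def encode(fileName: String): String = {
        require(!fileName.contains("/"), "expects a bare file name, not a path")
        // The "file" scheme and the leading "/" only satisfy the URI
        // constructor; getRawPath keeps the percent-encoded form, so the
        // leading "/" is dropped again.
        new URI("file", null, "/" + fileName, null, null).getRawPath.substring(1)
      }

      def decode(encoded: String): String =
        // getPath decodes percent escapes back to the original characters.
        URI.create("file:///" + encoded).getPath.substring(1)

      def main(args: Array[String]): Unit = {
        val name = "my app.jar"
        val enc = encode(name)       // "my%20app.jar"
        println(enc)
        println(decode(enc) == name) // true
      }
    }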
spark git commit: [SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 41ad8aced -> 1fbca4120

[SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters

This PR encodes and decodes the file name to fix the issue.

Author: Shixiong Zhu

Closes #10208 from zsxwing/uri.

(cherry picked from commit 86e405f357711ae93935853a912bc13985c259db)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fbca412
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fbca412
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fbca412

Branch: refs/heads/branch-1.6
Commit: 1fbca41200d6e73cb276d5949b894881c700323f
Parents: 41ad8ac
Author: Shixiong Zhu
Authored: Thu Dec 17 09:55:37 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 09:55:46 2015 -0800

--
 .../scala/org/apache/spark/HttpFileServer.scala   |  6 ++---
 .../spark/rpc/netty/NettyStreamManager.scala      |  5 ++--
 .../scala/org/apache/spark/util/Utils.scala       | 26 +++-
 .../org/apache/spark/rpc/RpcEnvSuite.scala        |  4 +++
 .../org/apache/spark/util/UtilsSuite.scala        | 11 +
 5 files changed, 46 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/HttpFileServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 7cf7bc0..ee9bfea 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -63,12 +63,12 @@ private[spark] class HttpFileServer(
 
   def addFile(file: File) : String = {
     addFileToDir(file, fileDir)
-    serverUri + "/files/" + file.getName
+    serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addJar(file: File) : String = {
     addFileToDir(file, jarDir)
-    serverUri + "/jars/" + file.getName
+    serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
   }
 
   def addFileToDir(file: File, dir: File) : String = {
@@ -80,7 +80,7 @@ private[spark] class HttpFileServer(
       throw new IllegalArgumentException(s"$file cannot be a directory.")
     }
     Files.copy(file, new File(dir, file.getName))
-    dir + "/" + file.getName
+    dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
  }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index a2768b4..5343482 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.server.StreamManager
 import org.apache.spark.rpc.RpcEnvFileServer
+import org.apache.spark.util.Utils
 
 /**
  * StreamManager implementation for serving files from a NettyRpcEnv.
@@ -51,13 +52,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   override def addFile(file: File): String = {
     require(files.putIfAbsent(file.getName(), file) == null,
       s"File ${file.getName()} already registered.")
-    s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
+    s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
   override def addJar(file: File): String = {
     require(jars.putIfAbsent(file.getName(), file) == null,
       s"JAR ${file.getName()} already registered.")
-    s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
+    s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbca412/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index af63234..6cb52fb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -318,6 +318,30 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * A file name may contain some invalid URI
spark git commit: [SQL] Update SQLContext.read.text doc
Repository: spark
Updated Branches:
  refs/heads/master a170d34a1 -> 6e0771665

[SQL] Update SQLContext.read.text doc

Since we renamed the column from ```text``` to ```value``` for DataFrames
loaded by ```SQLContext.read.text```, we need to update the doc.

Author: Yanbo Liang

Closes #10349 from yanboliang/text-value.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e077166
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e077166
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e077166

Branch: refs/heads/master
Commit: 6e0771665b3c9330fc0a5b2c7740a796b4cd712e
Parents: a170d34
Author: Yanbo Liang
Authored: Thu Dec 17 09:19:46 2015 -0800
Committer: Reynold Xin
Committed: Thu Dec 17 09:19:46 2015 -0800

--
 python/pyspark/sql/readwriter.py                                    | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala  | 2 +-
 .../spark/sql/execution/datasources/text/DefaultSource.scala        | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6e077166/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2e75f0c..a3d7eca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -207,7 +207,7 @@ class DataFrameReader(object):
     @ignore_unicode_prefix
     @since(1.6)
     def text(self, paths):
-        """Loads a text file and returns a [[DataFrame]] with a single string column named "text".
+        """Loads a text file and returns a [[DataFrame]] with a single string column named "value".
 
         Each line in the text file is a new row in the resulting DataFrame.

http://git-wip-us.apache.org/repos/asf/spark/blob/6e077166/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3ed1e55..c1a8f19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -339,7 +339,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
-   * Loads a text file and returns a [[DataFrame]] with a single string column named "text".
+   * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
    * Each line in the text file is a new row in the resulting DataFrame. For example:
    * {{{
    *   // Scala:

http://git-wip-us.apache.org/repos/asf/spark/blob/6e077166/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index fbd387b..4a1cbe4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -76,7 +76,7 @@ private[sql] class TextRelation(
     (@transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
-  /** Data schema is always a single column, named "text". */
+  /** Data schema is always a single column, named "value". */
   override def dataSchema: StructType = new StructType().add("value", StringType)
 
   /** This is an internal data source that outputs internal row format. */
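A short usage sketch against the 1.6 API, assuming a SQLContext named
sqlContext and a hypothetical file "events.txt", showing the column name this
doc fix documents:

    val lines = sqlContext.read.text("events.txt")
    lines.printSchema()
    // root
    //  |-- value: string (nullable = true)

    // Rows are addressed through the "value" column, not "text":
    val errors = lines.filter(lines("value").contains("ERROR"))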
spark git commit: [SQL] Update SQLContext.read.text doc
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1ebedb20f -> 41ad8aced

[SQL] Update SQLContext.read.text doc

Since we renamed the column from ```text``` to ```value``` for DataFrames
loaded by ```SQLContext.read.text```, we need to update the doc.

Author: Yanbo Liang

Closes #10349 from yanboliang/text-value.

(cherry picked from commit 6e0771665b3c9330fc0a5b2c7740a796b4cd712e)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41ad8ace
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41ad8ace
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41ad8ace

Branch: refs/heads/branch-1.6
Commit: 41ad8aced2fc6c694c15e9465cfa34517b2395e8
Parents: 1ebedb2
Author: Yanbo Liang
Authored: Thu Dec 17 09:19:46 2015 -0800
Committer: Reynold Xin
Committed: Thu Dec 17 09:20:04 2015 -0800

--
 python/pyspark/sql/readwriter.py                                    | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala  | 2 +-
 .../spark/sql/execution/datasources/text/DefaultSource.scala        | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/41ad8ace/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 2e75f0c..a3d7eca 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -207,7 +207,7 @@ class DataFrameReader(object):
     @ignore_unicode_prefix
     @since(1.6)
     def text(self, paths):
-        """Loads a text file and returns a [[DataFrame]] with a single string column named "text".
+        """Loads a text file and returns a [[DataFrame]] with a single string column named "value".
 
         Each line in the text file is a new row in the resulting DataFrame.

http://git-wip-us.apache.org/repos/asf/spark/blob/41ad8ace/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3ed1e55..c1a8f19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -339,7 +339,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
-   * Loads a text file and returns a [[DataFrame]] with a single string column named "text".
+   * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
    * Each line in the text file is a new row in the resulting DataFrame. For example:
    * {{{
    *   // Scala:

http://git-wip-us.apache.org/repos/asf/spark/blob/41ad8ace/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index fbd387b..4a1cbe4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -76,7 +76,7 @@ private[sql] class TextRelation(
     (@transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
-  /** Data schema is always a single column, named "text". */
+  /** Data schema is always a single column, named "value". */
   override def dataSchema: StructType = new StructType().add("value", StringType)
 
   /** This is an internal data source that outputs internal row format. */
spark git commit: [SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server
Repository: spark
Updated Branches:
  refs/heads/master 86e405f35 -> 818456881

[SPARK-12345][MESOS] Properly filter out SPARK_HOME in the Mesos REST server

Fix a problem with #10332; this one should fix cluster mode on Mesos.

Author: Iulian Dragos

Closes #10359 from dragos/issue/fix-spark-12345-one-more-time.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81845688
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81845688
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81845688

Branch: refs/heads/master
Commit: 8184568810e8a2e7d5371db2c6a0366ef4841f70
Parents: 86e405f
Author: Iulian Dragos
Authored: Fri Dec 18 03:19:31 2015 +0900
Committer: Kousuke Saruta
Committed: Fri Dec 18 03:19:31 2015 +0900

--
 .../scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/81845688/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 24510db..c0b9359 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -99,7 +99,7 @@ private[mesos] class MesosSubmitRequestServlet(
     // cause spark-submit script to look for files in SPARK_HOME instead.
     // We only need the ability to specify where to find spark-submit script
     // which user can user spark.executor.home or spark.home configurations.
-    val environmentVariables = request.environmentVariables.filter(!_.equals("SPARK_HOME"))
+    val environmentVariables = request.environmentVariables.filterKeys(!_.equals("SPARK_HOME"))
     val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
 
     // Construct driver description
spark git commit: [SPARK-12390] Clean up unused serializer parameter in BlockManager
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 881f2544e -> 88bbb5429

[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended. This only changes internal API.

Author: Andrew Or

Closes #10343 from andrewor14/clean-bm-serializer.

Conflicts:
        core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88bbb542
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88bbb542
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88bbb542

Branch: refs/heads/branch-1.6
Commit: 88bbb5429dd3efcff6b2835a70143247b08ae6b2
Parents: 881f254
Author: Andrew Or
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or
Committed: Thu Dec 17 12:01:13 2015 -0800

--
 .../org/apache/spark/storage/BlockManager.scala   | 29
 .../org/apache/spark/storage/DiskStore.scala      | 10 ---
 2 files changed, 11 insertions(+), 28 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007f..2cc2fd9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteArrayOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     ByteBuffer.wrap(byteStream.toByteArray)
   }
 
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
   */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer):
spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 43f02e41e -> 28adc45d5

[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".

Author: Shixiong Zhu

Closes #10361 from zsxwing/reg-bug.

(cherry picked from commit 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28adc45d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28adc45d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28adc45d

Branch: refs/heads/branch-1.4
Commit: 28adc45d5dfe2f979ed9e9a63cff108fcc885bce
Parents: 43f02e4
Author: Shixiong Zhu
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 13:24:19 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/28adc45d/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 25f2111..446e3eb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -49,7 +49,7 @@ object MovieLensALS {
     def parseMovie(str: String): Movie = {
       val fields = str.split("::")
       assert(fields.size == 3)
-      Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+      Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/28adc45d/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index fe6328b..68f8ea8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -231,7 +231,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */
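A minimal standalone sketch of the bug class being fixed: String.split takes a
regular expression, so an unescaped "." or "|" matches at every position
instead of the literal character.

    object SplitEscapeDemo {
      def main(args: Array[String]): Unit = {
        println("a.b.c".split(".").length)   // 0 -- "." matches everything; all fields empty
        println("a.b.c".split("\\.").toList) // List(a, b, c)

        println("x|y|z".split("|").toList)   // List(x, |, y, |, z) -- splits between every char
        println("x|y|z".split("\\|").toList) // List(x, y, z)
      }
    }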
spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 0fdf5542b -> a8d14cc06

[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".

Author: Shixiong Zhu

Closes #10361 from zsxwing/reg-bug.

(cherry picked from commit 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8d14cc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8d14cc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8d14cc0

Branch: refs/heads/branch-1.5
Commit: a8d14cc0623232cc136102b0161e19fcf69a7323
Parents: 0fdf554
Author: Shixiong Zhu
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 13:24:08 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a8d14cc0/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 3ae53e5..02ed746 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -50,7 +50,7 @@ object MovieLensALS {
     def parseMovie(str: String): Movie = {
       val fields = str.split("::")
       assert(fields.size == 3)
-      Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+      Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a8d14cc0/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index fe6328b..68f8ea8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -231,7 +231,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */
spark git commit: [SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in assertOrderInvariantEquals method
Repository: spark
Updated Branches:
  refs/heads/master e096a652b -> ed6ebda5c

[SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in assertOrderInvariantEquals method

org.apache.spark.streaming.Java8APISuite.java is failing because it tries to
sort an immutable list in the assertOrderInvariantEquals method.

Author: Evan Chen

Closes #10336 from evanyc15/SPARK-12376-StreamingJavaAPISuite.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed6ebda5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed6ebda5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed6ebda5

Branch: refs/heads/master
Commit: ed6ebda5c898bad76194fe3a090bef5a14f861c2
Parents: e096a65
Author: Evan Chen
Authored: Thu Dec 17 14:22:30 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 14:22:30 2015 -0800

--
 .../java/org/apache/spark/streaming/Java8APISuite.java | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ed6ebda5/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
--
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 89e0c7f..e8a0dfc 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -439,9 +439,14 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
    */
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
     List<List<T>> expected, List<List<T>> actual) {
-    expected.forEach((List<T> list) -> Collections.sort(list));
-    actual.forEach((List<T> list) -> Collections.sort(list));
-    Assert.assertEquals(expected, actual);
+    expected.forEach(list -> Collections.sort(list));
+    List<List<T>> sortedActual = new ArrayList<>();
+    actual.forEach(list -> {
+      List<T> sortedList = new ArrayList<>(list);
+      Collections.sort(sortedList);
+      sortedActual.add(sortedList);
+    });
+    Assert.assertEquals(expected, sortedActual);
   }
 
   @Test
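The root cause is that java.util.Collections.sort mutates its argument in
place, which throws UnsupportedOperationException on an unmodifiable list. A
minimal standalone sketch (in Scala against the java.util API, not the suite's
own code) of the failure and the copy-then-sort fix:

    import java.util.{ArrayList, Arrays, Collections}

    object SortCopyDemo {
      def main(args: Array[String]): Unit = {
        val immutable = Collections.unmodifiableList(Arrays.asList[Integer](3, 1, 2))

        // In-place sort mutates the list, so it fails on an unmodifiable view.
        try Collections.sort(immutable)
        catch { case _: UnsupportedOperationException => println("in-place sort throws") }

        // The fix: copy into a fresh ArrayList, sort the copy, keep the input intact.
        val sorted = new ArrayList[Integer](immutable)
        Collections.sort(sorted)
        println(sorted) // [1, 2, 3]
      }
    }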
spark git commit: [SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in assertOrderInvariantEquals method
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 48dcee484 -> 4df1dd403

[SPARK-12376][TESTS] Spark Streaming Java8APISuite fails in assertOrderInvariantEquals method

org.apache.spark.streaming.Java8APISuite.java is failing because it tries to
sort an immutable list in the assertOrderInvariantEquals method.

Author: Evan Chen

Closes #10336 from evanyc15/SPARK-12376-StreamingJavaAPISuite.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4df1dd40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4df1dd40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4df1dd40

Branch: refs/heads/branch-1.6
Commit: 4df1dd403441a4e4ca056d294385d8d0d8a0c65d
Parents: 48dcee4
Author: Evan Chen
Authored: Thu Dec 17 14:22:30 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 14:23:45 2015 -0800

--
 .../java/org/apache/spark/streaming/Java8APISuite.java | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4df1dd40/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
--
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 89e0c7f..e8a0dfc 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -439,9 +439,14 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
    */
   public static <T extends Comparable<T>> void assertOrderInvariantEquals(
     List<List<T>> expected, List<List<T>> actual) {
-    expected.forEach((List<T> list) -> Collections.sort(list));
-    actual.forEach((List<T> list) -> Collections.sort(list));
-    Assert.assertEquals(expected, actual);
+    expected.forEach(list -> Collections.sort(list));
+    List<List<T>> sortedActual = new ArrayList<>();
+    actual.forEach(list -> {
+      List<T> sortedList = new ArrayList<>(list);
+      Collections.sort(sortedList);
+      sortedActual.add(sortedList);
+    });
+    Assert.assertEquals(expected, sortedActual);
   }
 
   @Test
spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split
Repository: spark
Updated Branches:
  refs/heads/master 818456881 -> 540b5aead

[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".

Author: Shixiong Zhu

Closes #10361 from zsxwing/reg-bug.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540b5aea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540b5aea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540b5aea

Branch: refs/heads/master
Commit: 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9
Parents: 8184568
Author: Shixiong Zhu
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 13:23:48 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/540b5aea/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 3ae53e5..02ed746 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -50,7 +50,7 @@ object MovieLensALS {
     def parseMovie(str: String): Movie = {
       val fields = str.split("::")
       assert(fields.size == 3)
-      Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+      Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/540b5aea/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index a99b570..b946e0d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -253,7 +253,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */
spark git commit: [SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 88bbb5429 -> c0ab14fbe

[SPARK-12410][STREAMING] Fix places that use '.' and '|' directly in split

String.split accepts a regular expression, so we should escape "." and "|".

Author: Shixiong Zhu

Closes #10361 from zsxwing/reg-bug.

(cherry picked from commit 540b5aeadc84d1a5d61bda4414abd6bf35dc7ff9)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0ab14fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0ab14fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0ab14fb

Branch: refs/heads/branch-1.6
Commit: c0ab14fbeab2a81d174c3643a4fcc915ff2902e8
Parents: 88bbb54
Author: Shixiong Zhu
Authored: Thu Dec 17 13:23:48 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 13:23:58 2015 -0800

--
 .../src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala | 2 +-
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ab14fb/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 3ae53e5..02ed746 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -50,7 +50,7 @@ object MovieLensALS {
     def parseMovie(str: String): Movie = {
       val fields = str.split("::")
       assert(fields.size == 3)
-      Movie(fields(0).toInt, fields(1), fields(2).split("|"))
+      Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c0ab14fb/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index a99b570..b946e0d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -253,7 +253,7 @@ private[streaming] object FileBasedWriteAheadLog {
 
   def getCallerName(): Option[String] = {
     val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
-    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption)
+    stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
   }
 
   /** Convert a sequence of files to a sequence of sorted LogInfo objects */
spark git commit: [SPARK-12397][SQL] Improve error messages for data sources when they are not found
Repository: spark
Updated Branches:
  refs/heads/master 540b5aead -> e096a652b

[SPARK-12397][SQL] Improve error messages for data sources when they are not found

Point users to spark-packages.org to find them.

Author: Reynold Xin

Closes #10351 from rxin/SPARK-12397.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e096a652
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e096a652
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e096a652

Branch: refs/heads/master
Commit: e096a652b92fc64a7b3457cd0766ab324bcc980b
Parents: 540b5ae
Author: Reynold Xin
Authored: Thu Dec 17 14:16:49 2015 -0800
Committer: Michael Armbrust
Committed: Thu Dec 17 14:16:49 2015 -0800

----------------------------------------------------------------------
 .../datasources/ResolvedDataSource.scala        | 50 +---
 .../sql/sources/ResolvedDataSourceSuite.scala   | 17 +++
 2 files changed, 49 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/e096a652/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 86a306b..e02ee6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -57,24 +57,38 @@ object ResolvedDataSource extends Logging {
     val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

     serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
-      /** the provider format did not match any given registered aliases */
-      case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-        case Success(dataSource) => dataSource
-        case Failure(error) =>
-          if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-            throw new ClassNotFoundException(
-              "The ORC data source must be used with Hive support enabled.", error)
-          } else {
-            throw new ClassNotFoundException(
-              s"Failed to load class for data source: $provider.", error)
-          }
-      }
-      /** there is exactly one registered alias */
-      case head :: Nil => head.getClass
-      /** There are multiple registered aliases for the input */
-      case sources => sys.error(s"Multiple sources found for $provider, " +
-        s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-        "please specify the fully qualified class name.")
+      // the provider format did not match any given registered aliases
+      case Nil =>
+        Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+          case Success(dataSource) =>
+            // Found the data source using fully qualified path
+            dataSource
+          case Failure(error) =>
+            if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+              throw new ClassNotFoundException(
+                "The ORC data source must be used with Hive support enabled.", error)
+            } else {
+              if (provider == "avro" || provider == "com.databricks.spark.avro") {
+                throw new ClassNotFoundException(
+                  s"Failed to find data source: $provider. Please use Spark package " +
+                    "http://spark-packages.org/package/databricks/spark-avro",
+                  error)
+              } else {
+                throw new ClassNotFoundException(
+                  s"Failed to find data source: $provider. Please find packages at " +
+                    "http://spark-packages.org",
+                  error)
+              }
+            }
+        }
+      case head :: Nil =>
+        // there is exactly one registered alias
+        head.getClass
+      case sources =>
+        // There are multiple registered aliases for the input
+        sys.error(s"Multiple sources found for $provider " +
+          s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+          "please specify the fully qualified class name.")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e096a652/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
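The lookup logic reworked above follows a common SPI pattern: scan ServiceLoader registrations for a short-name alias first, and only fall back to loading the provider as a fully qualified class name when no alias matches. A rough standalone sketch of that pattern (the ShortNamed trait and ProviderLookup object are illustrative names for this sketch, not Spark's API):

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

// Illustrative stand-in for Spark's DataSourceRegister SPI.
trait ShortNamed { def shortName(): String }

object ProviderLookup {
  def lookup(provider: String, loader: ClassLoader): Class[_] = {
    // Implementations on the classpath register themselves via
    // META-INF/services; match their aliases case-insensitively.
    val matches = ServiceLoader.load(classOf[ShortNamed], loader)
      .asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList
    matches match {
      case Nil =>
        // No alias matched: treat the name as a fully qualified class name,
        // and fail with a pointer to the package index if that also fails.
        Try(loader.loadClass(provider)) match {
          case Success(cls) => cls
          case Failure(err) =>
            throw new ClassNotFoundException(
              s"Failed to find data source: $provider. " +
                "Please find packages at http://spark-packages.org", err)
        }
      case head :: Nil =>
        // Exactly one registered alias.
        head.getClass
      case many =>
        sys.error(s"Multiple sources found for $provider " +
          s"(${many.map(_.getClass.getName).mkString(", ")}), " +
          "please specify the fully qualified class name.")
    }
  }
}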
spark git commit: [SPARK-12397][SQL] Improve error messages for data sources when they are not found
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 c0ab14fbe -> 48dcee484

[SPARK-12397][SQL] Improve error messages for data sources when they are not found

Point users to spark-packages.org to find them.

Author: Reynold Xin

Closes #10351 from rxin/SPARK-12397.

(cherry picked from commit e096a652b92fc64a7b3457cd0766ab324bcc980b)
Signed-off-by: Michael Armbrust

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48dcee48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48dcee48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48dcee48

Branch: refs/heads/branch-1.6
Commit: 48dcee48416d87bf9572ace0a82285bacfcbf46e
Parents: c0ab14f
Author: Reynold Xin
Authored: Thu Dec 17 14:16:49 2015 -0800
Committer: Michael Armbrust
Committed: Thu Dec 17 14:16:58 2015 -0800

----------------------------------------------------------------------
 .../datasources/ResolvedDataSource.scala        | 50 +---
 .../sql/sources/ResolvedDataSourceSuite.scala   | 17 +++
 2 files changed, 49 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/48dcee48/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 86a306b..e02ee6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -57,24 +57,38 @@ object ResolvedDataSource extends Logging {
     val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

     serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
-      /** the provider format did not match any given registered aliases */
-      case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-        case Success(dataSource) => dataSource
-        case Failure(error) =>
-          if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-            throw new ClassNotFoundException(
-              "The ORC data source must be used with Hive support enabled.", error)
-          } else {
-            throw new ClassNotFoundException(
-              s"Failed to load class for data source: $provider.", error)
-          }
-      }
-      /** there is exactly one registered alias */
-      case head :: Nil => head.getClass
-      /** There are multiple registered aliases for the input */
-      case sources => sys.error(s"Multiple sources found for $provider, " +
-        s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-        "please specify the fully qualified class name.")
+      // the provider format did not match any given registered aliases
+      case Nil =>
+        Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+          case Success(dataSource) =>
+            // Found the data source using fully qualified path
+            dataSource
+          case Failure(error) =>
+            if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+              throw new ClassNotFoundException(
+                "The ORC data source must be used with Hive support enabled.", error)
+            } else {
+              if (provider == "avro" || provider == "com.databricks.spark.avro") {
+                throw new ClassNotFoundException(
+                  s"Failed to find data source: $provider. Please use Spark package " +
+                    "http://spark-packages.org/package/databricks/spark-avro",
+                  error)
+              } else {
+                throw new ClassNotFoundException(
+                  s"Failed to find data source: $provider. Please find packages at " +
+                    "http://spark-packages.org",
+                  error)
+              }
+            }
+        }
+      case head :: Nil =>
+        // there is exactly one registered alias
+        head.getClass
+      case sources =>
+        // There are multiple registered aliases for the input
+        sys.error(s"Multiple sources found for $provider " +
+          s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+          "please specify the fully qualified class name.")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/48dcee48/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git
[2/2] spark git commit: [SPARK-8641][SQL] Native Spark Window functions
[SPARK-8641][SQL] Native Spark Window functions

This PR removes Hive window functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.

This has the following advantages:
* Better memory management.
* The ability to use spark UDAFs in Window functions.

cc rxin / yhuai

Author: Herman van Hovell

Closes #9819 from hvanhovell/SPARK-8641-2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/658f66e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/658f66e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/658f66e6

Branch: refs/heads/master
Commit: 658f66e6208a52367e3b43a6fee9c90f33fb6226
Parents: ed6ebda
Author: Herman van Hovell
Authored: Thu Dec 17 15:16:35 2015 -0800
Committer: Yin Huai
Committed: Thu Dec 17 15:16:35 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  81 ++-
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  39 +-
 .../catalyst/analysis/FunctionRegistry.scala    |  12 +-
 .../expressions/aggregate/interfaces.scala      |   7 +-
 .../expressions/windowExpressions.scala         | 318 +++--
 .../catalyst/analysis/AnalysisErrorSuite.scala  |  31 +-
 .../org/apache/spark/sql/execution/Window.scala | 649 +++
 .../spark/sql/expressions/WindowSpec.scala      |  54 +-
 .../scala/org/apache/spark/sql/functions.scala  |  16 +-
 .../apache/spark/sql/DataFrameWindowSuite.scala | 295 +
 .../HiveWindowFunctionQuerySuite.scala          |  10 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   1 -
 .../org/apache/spark/sql/hive/HiveQl.scala      |  22 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala    | 224 ---
 .../sql/hive/HiveDataFrameWindowSuite.scala     | 259
 .../sql/hive/execution/WindowQuerySuite.scala   | 230 +++
 16 files changed, 1325 insertions(+), 923 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ca00a5e..64dd83a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -77,6 +77,8 @@ class Analyzer(
       ResolveGenerate ::
       ResolveFunctions ::
       ResolveAliases ::
+      ResolveWindowOrder ::
+      ResolveWindowFrame ::
       ExtractWindowExpressions ::
       GlobalAggregates ::
       ResolveAggregateFunctions ::
@@ -127,14 +129,12 @@ class Analyzer(
     // Lookup WindowSpecDefinitions. This rule works with unresolved children.
     case WithWindowDefinition(windowDefinitions, child) =>
       child.transform {
-        case plan => plan.transformExpressions {
+        case p => p.transformExpressions {
           case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
             val errorMessage =
               s"Window specification $windowName is not defined in the WINDOW clause."
             val windowSpecDefinition =
-              windowDefinitions
-                .get(windowName)
-                .getOrElse(failAnalysis(errorMessage))
+              windowDefinitions.getOrElse(windowName, failAnalysis(errorMessage))
             WindowExpression(c, windowSpecDefinition)
         }
       }
@@ -577,6 +577,10 @@ class Analyzer(
           AggregateExpression(max, Complete, isDistinct = false)
         case min: Min if isDistinct =>
           AggregateExpression(min, Complete, isDistinct = false)
+        // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
+        // the context of a Window clause. They do not need to be wrapped in an
+        // AggregateExpression.
+        case wf: AggregateWindowFunction => wf
         // We get an aggregate function, we need to wrap it in an AggregateExpression.
         case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct)
         // This function is not an aggregate function, just return the resolved one.
@@ -597,11 +601,17 @@ class Analyzer(
     }

     def containsAggregates(exprs: Seq[Expression]): Boolean = {
-      exprs.foreach(_.foreach {
-        case agg: AggregateExpression => return true
-        case _ =>
-      })
-      false
+      // Collect all Windowed Aggregate Expressions.
+      val windowedAggExprs =
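From the user's side, the headline feature above (native window functions, including ordinary aggregates and UDAFs evaluated over a window) is exercised through the DataFrame window API. A minimal sketch in the 1.6-era API, assuming a local SparkContext; the data and column names are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object WindowDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("window-demo"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val sales = Seq(("a", 1, 100), ("a", 2, 50), ("b", 1, 75))
      .toDF("key", "period", "amount")

    // Partition by key, order by period: rank() is a window-only function,
    // while sum(...) is a plain aggregate evaluated over the same frame.
    val w = Window.partitionBy("key").orderBy("period")
    sales.select($"key", $"period", $"amount",
      rank().over(w).as("rank"),
      sum($"amount").over(w).as("running_total")).show()

    sc.stop()
  }
}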
spark git commit: [SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when recovering from checkpoint data
Repository: spark
Updated Branches:
  refs/heads/master 658f66e62 -> f4346f612

[SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when recovering from checkpoint data

Add a transient flag `DStream.restoredFromCheckpointData` to control the restore processing in DStream and avoid duplicate work: `DStream.restoreCheckpointData` checks this flag first and executes the restore only when it is `false`.

Author: jhu-chang

Closes #9765 from jhu-chang/SPARK-11749.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4346f61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4346f61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4346f61

Branch: refs/heads/master
Commit: f4346f612b6798517153a786f9172cf41618d34d
Parents: 658f66e
Author: jhu-chang
Authored: Thu Dec 17 17:53:15 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 17:53:15 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/dstream/DStream.scala       | 15 --
 .../spark/streaming/CheckpointSuite.scala       | 56 ++--
 2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/f4346f61/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1a6edf9..91a43e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -97,6 +97,8 @@ abstract class DStream[T: ClassTag] (
   private[streaming] val mustCheckpoint = false
   private[streaming] var checkpointDuration: Duration = null
   private[streaming] val checkpointData = new DStreamCheckpointData(this)
+  @transient
+  private var restoredFromCheckpointData = false

   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
@@ -507,11 +509,14 @@ abstract class DStream[T: ClassTag] (
    * override the updateCheckpointData() method would also need to override this method.
    */
   private[streaming] def restoreCheckpointData() {
-    // Create RDDs from the checkpoint data
-    logInfo("Restoring checkpoint data")
-    checkpointData.restore()
-    dependencies.foreach(_.restoreCheckpointData())
-    logInfo("Restored checkpoint data")
+    if (!restoredFromCheckpointData) {
+      // Create RDDs from the checkpoint data
+      logInfo("Restoring checkpoint data")
+      checkpointData.restore()
+      dependencies.foreach(_.restoreCheckpointData())
+      restoredFromCheckpointData = true
+      logInfo("Restored checkpoint data")
+    }
   }

   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/f4346f61/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index cd28d3c..f5f446f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.streaming

-import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}

 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -34,9 +34,30 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._

 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
-import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+
+/**
+ * A input stream that records the times of restore() invoked
+ */
+private[streaming]
+class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
+  protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+  override def start(): Unit = { }
+  override def stop(): Unit = { }
+  override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.makeRDD(Seq(1)))
+  private[streaming]
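The fix above is an instance of a simple idempotence guard: a @transient boolean is always false after an object is deserialized from a checkpoint, so the one-time work runs exactly once per recovered instance no matter how many callers reach it. A stripped-down sketch of the pattern, with illustrative names rather than Spark's classes:

abstract class Restorable extends Serializable {
  // Not serialized: Java deserialization leaves it at the default (false),
  // so the first restore() after recovery performs the work.
  @transient private var restored = false

  // Subclasses put the actual one-time recovery logic here.
  protected def doRestore(): Unit

  // Safe to call repeatedly (e.g. once per dependency edge in a graph);
  // the work runs at most once per deserialized instance.
  final def restore(): Unit = {
    if (!restored) {
      doRestore()
      restored = true
    }
  }
}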
spark git commit: [SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when recovering from checkpoint data
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4df1dd403 -> 9177ea383

[SPARK-11749][STREAMING] Duplicate creating the RDD in file stream when recovering from checkpoint data

Add a transient flag `DStream.restoredFromCheckpointData` to control the restore processing in DStream and avoid duplicate work: `DStream.restoreCheckpointData` checks this flag first and executes the restore only when it is `false`.

Author: jhu-chang

Closes #9765 from jhu-chang/SPARK-11749.

(cherry picked from commit f4346f612b6798517153a786f9172cf41618d34d)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9177ea38
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9177ea38
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9177ea38

Branch: refs/heads/branch-1.6
Commit: 9177ea383a29653f0591a59e1ee2dff6b87d5a1c
Parents: 4df1dd4
Author: jhu-chang
Authored: Thu Dec 17 17:53:15 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 17:54:14 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/dstream/DStream.scala       | 15 --
 .../spark/streaming/CheckpointSuite.scala       | 56 ++--
 2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/9177ea38/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1a6edf9..91a43e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -97,6 +97,8 @@ abstract class DStream[T: ClassTag] (
   private[streaming] val mustCheckpoint = false
   private[streaming] var checkpointDuration: Duration = null
   private[streaming] val checkpointData = new DStreamCheckpointData(this)
+  @transient
+  private var restoredFromCheckpointData = false

   // Reference to whole DStream graph
   private[streaming] var graph: DStreamGraph = null
@@ -507,11 +509,14 @@ abstract class DStream[T: ClassTag] (
    * override the updateCheckpointData() method would also need to override this method.
    */
   private[streaming] def restoreCheckpointData() {
-    // Create RDDs from the checkpoint data
-    logInfo("Restoring checkpoint data")
-    checkpointData.restore()
-    dependencies.foreach(_.restoreCheckpointData())
-    logInfo("Restored checkpoint data")
+    if (!restoredFromCheckpointData) {
+      // Create RDDs from the checkpoint data
+      logInfo("Restoring checkpoint data")
+      checkpointData.restore()
+      dependencies.foreach(_.restoreCheckpointData())
+      restoredFromCheckpointData = true
+      logInfo("Restored checkpoint data")
+    }
   }

   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/9177ea38/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index cd28d3c..f5f446f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.streaming

-import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}

 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -34,9 +34,30 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._

 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
-import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, Utils}
+
+/**
+ * A input stream that records the times of restore() invoked
+ */
+private[streaming]
+class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
+  protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+  override def start(): Unit = { }
+
spark git commit: [MINOR] Hide the error logs for 'SQLListenerMemoryLeakSuite'
Repository: spark
Updated Branches:
  refs/heads/master f4346f612 -> 0370abdfd

[MINOR] Hide the error logs for 'SQLListenerMemoryLeakSuite'

Hide the error logs for 'SQLListenerMemoryLeakSuite' to avoid noise. Most of the changes are whitespace-only indentation changes.

Author: Shixiong Zhu

Closes #10363 from zsxwing/hide-log.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0370abdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0370abdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0370abdf

Branch: refs/heads/master
Commit: 0370abdfd636566cd8df954c6f9ea5a794d275ef
Parents: f4346f6
Author: Shixiong Zhu
Authored: Thu Dec 17 18:18:12 2015 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 17 18:18:12 2015 -0800

----------------------------------------------------------------------
 .../sql/execution/ui/SQLListenerSuite.scala     | 64 +++-
 1 file changed, 35 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/0370abdf/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 12a4e13..11a6ce9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -336,39 +336,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {

 class SQLListenerMemoryLeakSuite extends SparkFunSuite {

   test("no memory leak") {
-    val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName("test")
-      .set("spark.task.maxFailures", "1") // Don't retry the tasks to run this test quickly
-      .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
-    val sc = new SparkContext(conf)
+    val oldLogLevel = org.apache.log4j.Logger.getRootLogger().getLevel()
     try {
-      SQLContext.clearSqlListener()
-      val sqlContext = new SQLContext(sc)
-      import sqlContext.implicits._
-      // Run 100 successful executions and 100 failed executions.
-      // Each execution only has one job and one stage.
-      for (i <- 0 until 100) {
-        val df = Seq(
-          (1, 1),
-          (2, 2)
-        ).toDF()
-        df.collect()
-        try {
-          df.foreach(_ => throw new RuntimeException("Oops"))
-        } catch {
-          case e: SparkException => // This is expected for a failed job
+      org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.FATAL)
+      val conf = new SparkConf()
+        .setMaster("local")
+        .setAppName("test")
+        .set("spark.task.maxFailures", "1") // Don't retry the tasks to run this test quickly
+        .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
+      val sc = new SparkContext(conf)
+      try {
+        SQLContext.clearSqlListener()
+        val sqlContext = new SQLContext(sc)
+        import sqlContext.implicits._
+        // Run 100 successful executions and 100 failed executions.
+        // Each execution only has one job and one stage.
+        for (i <- 0 until 100) {
+          val df = Seq(
+            (1, 1),
+            (2, 2)
+          ).toDF()
+          df.collect()
+          try {
+            df.foreach(_ => throw new RuntimeException("Oops"))
+          } catch {
+            case e: SparkException => // This is expected for a failed job
+          }
         }
+        sc.listenerBus.waitUntilEmpty(1)
+        assert(sqlContext.listener.getCompletedExecutions.size <= 50)
+        assert(sqlContext.listener.getFailedExecutions.size <= 50)
+        // 50 for successful executions and 50 for failed executions
+        assert(sqlContext.listener.executionIdToData.size <= 100)
+        assert(sqlContext.listener.jobIdToExecutionId.size <= 100)
+        assert(sqlContext.listener.stageIdToStageMetrics.size <= 100)
+      } finally {
+        sc.stop()
       }
-      sc.listenerBus.waitUntilEmpty(1)
-      assert(sqlContext.listener.getCompletedExecutions.size <= 50)
-      assert(sqlContext.listener.getFailedExecutions.size <= 50)
-      // 50 for successful executions and 50 for failed executions
-      assert(sqlContext.listener.executionIdToData.size <= 100)
-      assert(sqlContext.listener.jobIdToExecutionId.size <= 100)
-      assert(sqlContext.listener.stageIdToStageMetrics.size <= 100)
     } finally {
-      sc.stop()
+      org.apache.log4j.Logger.getRootLogger().setLevel(oldLogLevel)
    }
   }
 }
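The pattern used above, raising the root logger's level for the duration of a noisy test and restoring it in a finally block, generalizes into a small loan-pattern helper. A sketch assuming log4j 1.x on the classpath (TestLogging and withLogLevel are illustrative names, not Spark utilities):

import org.apache.log4j.{Level, Logger}

object TestLogging {
  // Run `body` with the root log4j logger set to `level`, restoring the
  // previous level afterwards even if `body` throws.
  def withLogLevel[T](level: Level)(body: => T): T = {
    val root = Logger.getRootLogger
    val old = root.getLevel
    root.setLevel(level)
    try body finally root.setLevel(old)
  }
}

// Usage: silence everything below FATAL while the noisy block runs, e.g.
//   TestLogging.withLogLevel(Level.FATAL) { runNoisyTest() }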