spark git commit: [SPARK-25235][SHELL] Merge the REPL code in Scala 2.11 and 2.12 branches

2018-08-28 Thread dbtsai
Repository: spark
Updated Branches:
  refs/heads/master 38391c9aa -> ff8dcc1d4


[SPARK-25235][SHELL] Merge the REPL code in Scala 2.11 and 2.12 branches

## What changes were proposed in this pull request?

Use some reflection tricks to merge the Scala 2.11 and 2.12 REPL code into a single codebase.
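For illustration only (the helper below is hypothetical, not the actual change): the kind of reflection trick involved is to look up version-specific interpreter methods at runtime so the same source compiles against both Scala versions.

```scala
// Hypothetical sketch: call a zero-argument method that exists in only one Scala
// version's interpreter API, via Java reflection, and fall back gracefully otherwise.
def invokeIfPresent(target: AnyRef, methodName: String): Option[AnyRef] =
  try {
    Some(target.getClass.getMethod(methodName).invoke(target))
  } catch {
    case _: NoSuchMethodException => None
  }
```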

## How was this patch tested?

Existing tests.

Closes #22246 from dbtsai/repl.

Lead-authored-by: DB Tsai 
Co-authored-by: Liang-Chi Hsieh 
Signed-off-by: DB Tsai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff8dcc1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff8dcc1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff8dcc1d

Branch: refs/heads/master
Commit: ff8dcc1d4c684e1b68e63d61b3f20284b9979cca
Parents: 38391c9
Author: DB Tsai 
Authored: Wed Aug 29 04:30:31 2018 +
Committer: DB Tsai 
Committed: Wed Aug 29 04:30:31 2018 +

--
 repl/pom.xml|  10 -
 .../org/apache/spark/repl/SparkILoop.scala  | 278 
 .../org/apache/spark/repl/SparkILoop.scala  | 143 -
 .../org/apache/spark/repl/SparkILoop.scala  | 319 +++
 4 files changed, 319 insertions(+), 431 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ff8dcc1d/repl/pom.xml
--
diff --git a/repl/pom.xml b/repl/pom.xml
index 861bbd7..553d5eb 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -167,14 +167,4 @@
 
   
 
-  
-
-  scala-2.12
-  
-scala-2.12/src/main/scala
-scala-2.12/src/test/scala
-  
-
-  
-
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ff8dcc1d/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
--
diff --git 
a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
deleted file mode 100644
index 9426526..000
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import java.io.BufferedReader
-
-// scalastyle:off println
-import scala.Predef.{println => _, _}
-// scalastyle:on println
-import scala.concurrent.Future
-import scala.reflect.classTag
-import scala.reflect.internal.util.ScalaClassLoader.savingContextLoader
-import scala.reflect.io.File
-import scala.tools.nsc.{GenericRunnerSettings, Properties}
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.{isReplDebug, isReplPower, replProps}
-import scala.tools.nsc.interpreter.{AbstractOrMissingHandler, ILoop, IMain, 
JPrintWriter}
-import scala.tools.nsc.interpreter.{NamedParam, SimpleReader, SplashLoop, 
SplashReader}
-import scala.tools.nsc.interpreter.StdReplTags.tagOfIMain
-import scala.tools.nsc.util.stringFromStream
-import scala.util.Properties.{javaVersion, javaVmName, versionString}
-
-/**
- *  A Spark-specific interactive shell.
- */
-class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
-extends ILoop(in0, out) {
-  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
-  def this() = this(None, new JPrintWriter(Console.out, true))
-
-  override def createInterpreter(): Unit = {
-intp = new SparkILoopInterpreter(settings, out)
-  }
-
-  val initializationCommands: Seq[String] = Seq(
-"""
-@transient val spark = if (org.apache.spark.repl.Main.sparkSession != 
null) {
-org.apache.spark.repl.Main.sparkSession
-  } else {
-org.apache.spark.repl.Main.createSparkSession()
-  }
-@transient val sc = {
-  val _sc = spark.sparkContext
-  if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
-val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
-if (proxyUrl != null) {
-  println(
-s"Spark Context Web UI is available at 

svn commit: r29001 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_28_20_02-38391c9-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-28 Thread pwendell
Author: pwendell
Date: Wed Aug 29 03:16:01 2018
New Revision: 29001

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_28_20_02-38391c9 docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25253][PYSPARK] Refactor local connection & auth code

2018-08-28 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 68ec207a3 -> 38391c9aa


[SPARK-25253][PYSPARK] Refactor local connection & auth code

This eliminates some duplication in the code used to connect to a server on
localhost and talk directly to the JVM. It also gives consistent IPv6 and error
handling. Two other incidental changes that shouldn't matter:
1) Python barrier tasks perform authentication immediately (rather than waiting
for the BARRIER_FUNCTION indicator).
2) For `rdd._load_from_socket`, the timeout is only increased after
authentication.

Closes #22247 from squito/py_connection_refactor.

Authored-by: Imran Rashid 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38391c9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38391c9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38391c9a

Branch: refs/heads/master
Commit: 38391c9aa8a88fcebb337934f30298a32d91596b
Parents: 68ec207
Author: Imran Rashid 
Authored: Wed Aug 29 09:47:38 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Aug 29 09:47:38 2018 +0800

--
 .../apache/spark/api/python/PythonRunner.scala  |  3 +-
 python/pyspark/java_gateway.py  | 32 +++-
 python/pyspark/rdd.py   | 27 ++---
 python/pyspark/taskcontext.py   | 32 +++-
 python/pyspark/worker.py|  7 ++---
 5 files changed, 40 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38391c9a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index da6475c..6c7e863 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -216,6 +216,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   sock = serverSocket.get.accept()
   // Wait for function call from python side.
   sock.setSoTimeout(1)
+  authHelper.authClient(sock)
   val input = new DataInputStream(sock.getInputStream())
   input.readInt() match {
 case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
@@ -334,8 +335,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 def barrierAndServe(sock: Socket): Unit = {
   require(serverSocket.isDefined, "No available ServerSocket to redirect 
the barrier() call.")
 
-  authHelper.authClient(sock)
-
   val out = new DataOutputStream(new 
BufferedOutputStream(sock.getOutputStream))
   try {
 context.asInstanceOf[BarrierTaskContext].barrier()

http://git-wip-us.apache.org/repos/asf/spark/blob/38391c9a/python/pyspark/java_gateway.py
--
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index fa2d5e8..b06503b 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -134,7 +134,7 @@ def launch_gateway(conf=None):
 return gateway
 
 
-def do_server_auth(conn, auth_secret):
+def _do_server_auth(conn, auth_secret):
 """
 Performs the authentication protocol defined by the SocketAuthHelper class 
on the given
 file-like object 'conn'.
@@ -147,6 +147,36 @@ def do_server_auth(conn, auth_secret):
 raise Exception("Unexpected reply from iterator server.")
 
 
+def local_connect_and_auth(port, auth_secret):
+"""
+Connect to local host, authenticate with it, and return a (sockfile,sock) 
for that connection.
+Handles IPV4 & IPV6, does some error handling.
+:param port
+:param auth_secret
+:return: a tuple with (sockfile, sock)
+"""
+sock = None
+errors = []
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
+af, socktype, proto, _, sa = res
+try:
+sock = socket.socket(af, socktype, proto)
+sock.settimeout(15)
+sock.connect(sa)
+sockfile = sock.makefile("rwb", 65536)
+_do_server_auth(sockfile, auth_secret)
+return (sockfile, sock)
+except socket.error as e:
+emsg = _exception_message(e)
+errors.append("tried to connect to %s, but an error occured: %s" % 
(sa, emsg))
+sock.close()
+sock = None
+else:
+raise Exception("could not 

spark git commit: [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType

2018-08-28 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 32c8a3d7b -> 68ec207a3


[SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType

## What changes were proposed in this pull request?

`toAvroType` converts a Spark data type to an Avro schema. It always appends the
record name to the namespace, so it's impossible to have an Avro namespace
independent of the record name.

When invoked with a Spark data type like the following,

```scala
val sparkSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("address", StructType(Seq(
StructField("city", StringType, nullable = false),
StructField("state", StringType, nullable = false))),
nullable = false)))

// map it to an avro schema with record name "employee" and top level namespace 
"foo.bar",
val avroSchema = SchemaConverters.toAvroType(sparkSchema,  false, "employee", 
"foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is

```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Closes #22251 from arunmahadevan/avro-fix.

Authored-by: Arun Mahadevan 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ec207a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ec207a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ec207a

Branch: refs/heads/master
Commit: 68ec207a320bd50ca61e820c9ff559f799c2ab0a
Parents: 32c8a3d
Author: Arun Mahadevan 
Authored: Wed Aug 29 09:25:49 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Aug 29 09:25:49 2018 +0800

--
 .../spark/sql/avro/SchemaConverters.scala   | 18 -
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 42 +++-
 2 files changed, 48 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/68ec207a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
--
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 3a15e8d..bd15765 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -123,7 +123,7 @@ object SchemaConverters {
   catalystType: DataType,
   nullable: Boolean = false,
   recordName: String = "topLevelRecord",
-  prevNameSpace: String = "")
+  nameSpace: String = "")
 : Schema = {
 val builder = SchemaBuilder.builder()
 
@@ -143,29 +143,25 @@ object SchemaConverters {
 val avroType = LogicalTypes.decimal(d.precision, d.scale)
 val fixedSize = minBytesForPrecision(d.precision)
 // Need to avoid naming conflict for the fixed fields
-val name = prevNameSpace match {
+val name = nameSpace match {
   case "" => s"$recordName.fixed"
-  case _ => s"$prevNameSpace.$recordName.fixed"
+  case _ => s"$nameSpace.$recordName.fixed"
 }
 avroType.addToSchema(SchemaBuilder.fixed(name).size(fixedSize))
 
   case BinaryType => builder.bytesType()
   case ArrayType(et, containsNull) =>
 builder.array()
-  .items(toAvroType(et, containsNull, recordName, prevNameSpace))
+  .items(toAvroType(et, containsNull, recordName, nameSpace))
   case MapType(StringType, vt, valueContainsNull) =>
 builder.map()
-  .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace))
+  .values(toAvroType(vt, valueContainsNull, recordName, nameSpace))
   case st: StructType =>
-val nameSpace = prevNameSpace match {
-  case "" => recordName
-  case _ => s"$prevNameSpace.$recordName"
-}
-
+val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" 
else recordName
 val fieldsAssembler = 
builder.record(recordName).namespace(nameSpace).fields()
 st.foreach { f =>
   val fieldAvroType =
-toAvroType(f.dataType, f.nullable, f.name, nameSpace)
+toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
   fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
 }
 fieldsAssembler.endRecord()


spark git commit: [MINOR] Avoid code duplication for nullable in Higher Order function

2018-08-28 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master bbbf81469 -> 32c8a3d7b


[MINOR] Avoid code duplication for nullable in Higher Order function

## What changes were proposed in this pull request?

Most `HigherOrderFunction`s have the same `nullable` definition, i.e. they
are nullable when one of their arguments is nullable. The PR moves that definition
into the base trait in order to avoid code duplication.
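In a stripped-down sketch (illustrative names, not the actual Catalyst classes), the pattern is:

```scala
// The shared rule "nullable iff any argument is nullable" lives once in the base
// trait instead of being repeated in every concrete higher-order function.
trait Expr { def nullable: Boolean }

trait HigherOrderFn {
  def arguments: Seq[Expr]
  def nullable: Boolean = arguments.exists(_.nullable)
}

case class Transform(argument: Expr, function: Expr) extends HigherOrderFn {
  def arguments: Seq[Expr] = Seq(argument) // no per-class nullable override needed
}
```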

## How was this patch tested?

NA

Closes #22243 from mgaido91/MINOR_nullable_hof.

Authored-by: Marco Gaido 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32c8a3d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32c8a3d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32c8a3d7

Branch: refs/heads/master
Commit: 32c8a3d7beac4b47a75f5ec3c69b13ebc57de0c7
Parents: bbbf814
Author: Marco Gaido 
Authored: Wed Aug 29 09:20:32 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Aug 29 09:20:32 2018 +0800

--
 .../expressions/higherOrderFunctions.scala| 18 ++
 1 file changed, 2 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/32c8a3d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
index 9f2e84a..2bb6b20 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
@@ -90,6 +90,8 @@ object LambdaFunction {
  */
 trait HigherOrderFunction extends Expression with ExpectsInputTypes {
 
+  override def nullable: Boolean = arguments.exists(_.nullable)
+
   override def children: Seq[Expression] = arguments ++ functions
 
   /**
@@ -217,8 +219,6 @@ case class ArrayTransform(
 function: Expression)
   extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback {
 
-  override def nullable: Boolean = argument.nullable
-
   override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
 
   override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArrayTransform = {
@@ -287,8 +287,6 @@ case class MapFilter(
 copy(function = f(function, (keyType, false) :: (valueType, 
valueContainsNull) :: Nil))
   }
 
-  override def nullable: Boolean = argument.nullable
-
   override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = {
 val m = argumentValue.asInstanceOf[MapData]
 val f = functionForEval
@@ -328,8 +326,6 @@ case class ArrayFilter(
 function: Expression)
   extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback {
 
-  override def nullable: Boolean = argument.nullable
-
   override def dataType: DataType = argument.dataType
 
   override def functionType: AbstractDataType = BooleanType
@@ -375,8 +371,6 @@ case class ArrayExists(
 function: Expression)
   extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback {
 
-  override def nullable: Boolean = argument.nullable
-
   override def dataType: DataType = BooleanType
 
   override def functionType: AbstractDataType = BooleanType
@@ -516,8 +510,6 @@ case class TransformKeys(
 function: Expression)
   extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
 
-  override def nullable: Boolean = argument.nullable
-
   @transient lazy val MapType(keyType, valueType, valueContainsNull) = 
argument.dataType
 
   override def dataType: DataType = MapType(function.dataType, valueType, 
valueContainsNull)
@@ -568,8 +560,6 @@ case class TransformValues(
 function: Expression)
   extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
 
-  override def nullable: Boolean = argument.nullable
-
   @transient lazy val MapType(keyType, valueType, valueContainsNull) = 
argument.dataType
 
   override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
@@ -638,8 +628,6 @@ case class MapZipWith(left: Expression, right: Expression, 
function: Expression)
 
   override def functionTypes: Seq[AbstractDataType] = AnyDataType :: Nil
 
-  override def nullable: Boolean = left.nullable || right.nullable
-
   override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
 
   override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
@@ -810,8 +798,6 @@ case class ZipWith(left: Expression, right: Expression, 
function: Expression)
 
   override def functionTypes: Seq[AbstractDataType] = AnyDataType :: Nil
 

spark git commit: [SPARK-22357][CORE] SparkContext.binaryFiles ignore minPartitions parameter

2018-08-28 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 103854028 -> bbbf81469


[SPARK-22357][CORE] SparkContext.binaryFiles ignore minPartitions parameter

## What changes were proposed in this pull request?
Fix the issue that minPartitions was not used in the method. This is a simple
fix and I am not trying to make it complicated. The purpose is to still let the
user control the parallelism through the value of minPartitions, while also
respecting sc.defaultParallelism.
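A rough sketch of the arithmetic (values and the surrounding formula are illustrative, not the exact Spark code): raising minPartitions above sc.defaultParallelism now shrinks the target split size, which yields more partitions.

```scala
// Illustrative only: how a larger requested partition count lowers the split size.
val defaultMaxSplitBytes = 128L * 1024 * 1024        // e.g. spark.files.maxPartitionBytes
val openCostInBytes      = 4L * 1024 * 1024          // e.g. spark.files.openCostInBytes
val totalBytes           = 10L * 1024 * 1024 * 1024  // 10 GiB of input

def maxSplitSize(defaultParallelism: Int, minPartitions: Int): Long = {
  // Before the fix, only defaultParallelism was used here.
  val parallelism  = math.max(defaultParallelism, minPartitions)
  val bytesPerCore = totalBytes / parallelism
  math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
}

maxSplitSize(defaultParallelism = 8, minPartitions = 1)    // ≈ 128 MiB splits (capped)
maxSplitSize(defaultParallelism = 8, minPartitions = 200)  // ≈ 51 MiB splits, more partitions
```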

## How was this patch tested?
I have not provided an additional test since the fix is very straightforward.

Closes #21638 from bomeng/22357.

Lead-authored-by: Bo Meng 
Co-authored-by: Bo Meng 
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbbf8146
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbbf8146
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbbf8146

Branch: refs/heads/master
Commit: bbbf8146916aa70d9774543643776eed9d9d9373
Parents: 1038540
Author: Bo Meng 
Authored: Tue Aug 28 19:39:13 2018 -0500
Committer: Sean Owen 
Committed: Tue Aug 28 19:39:13 2018 -0500

--
 .../src/main/scala/org/apache/spark/input/PortableDataStream.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bbbf8146/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala 
b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 17cdba4..ab020aa 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -47,7 +47,7 @@ private[spark] abstract class StreamFileInputFormat[T]
   def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: 
Int) {
 val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
 val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
-val defaultParallelism = sc.defaultParallelism
+val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)
 val files = listStatus(context).asScala
 val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + 
openCostInBytes).sum
 val bytesPerCore = totalBytes / defaultParallelism





svn commit: r29000 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_28_16_01-1038540-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-28 Thread pwendell
Author: pwendell
Date: Tue Aug 28 23:15:56 2018
New Revision: 29000

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_28_16_01-1038540 docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25212][SQL] Support Filter in ConvertToLocalRelation

2018-08-28 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 7ad18ee9f -> 103854028


[SPARK-25212][SQL] Support Filter in ConvertToLocalRelation

## What changes were proposed in this pull request?
Support Filter in ConvertToLocalRelation, similar to how Project works.
Additionally, in the Optimizer, run ConvertToLocalRelation earlier to simplify the
plan. This helps very short queries, which are often queries on local
relations.
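A usage-level illustration (assumes a local SparkSession named `spark`; names are made up):

```scala
import spark.implicits._

// With this change, the Filter over the in-memory data is evaluated during
// optimization, so the optimized logical plan is just a LocalRelation (no Filter node).
val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
df.filter($"id" > 1).explain(true)
```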

## How was this patch tested?
New test. Manual benchmark.

Author: Bogdan Raducanu 
Author: Shixiong Zhu 
Author: Yinan Li 
Author: Li Jin 
Author: s71955 
Author: DB Tsai 
Author: jaroslav chládek 
Author: Huangweizhe 
Author: Xiangrui Meng 
Author: hyukjinkwon 
Author: Kent Yao 
Author: caoxuewen 
Author: liuxian 
Author: Adam Bradbury 
Author: Jose Torres 
Author: Yuming Wang 
Author: Liang-Chi Hsieh 

Closes #22205 from bogdanrdc/local-relation-filter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10385402
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10385402
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10385402

Branch: refs/heads/master
Commit: 103854028e99846aabeb6f27eb6fd255ecc96381
Parents: 7ad18ee
Author: Bogdan Raducanu 
Authored: Tue Aug 28 15:50:25 2018 -0700
Committer: Xiao Li 
Committed: Tue Aug 28 15:50:25 2018 -0700

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala  | 14 ++
 .../optimizer/ConvertToLocalRelationSuite.scala   | 18 ++
 .../org/apache/spark/sql/DataFrameJoinSuite.scala |  8 
 3 files changed, 36 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/10385402/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 63a62cd..e4b4f1e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -130,6 +130,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 //   since the other rules might make two separate Unions operators 
adjacent.
 Batch("Union", Once,
   CombineUnions) ::
+// run this once earlier. this might simplify the plan and reduce cost of 
optimizer.
+// for example, a query such as Filter(LocalRelation) would go through all 
the heavy
+// optimizer rules that are triggered when there is a filter
+// (e.g. InferFiltersFromConstraints). if we run this batch earlier, the 
query becomes just
+// LocalRelation and does not trigger many rules
+Batch("LocalRelation early", fixedPoint,
+  ConvertToLocalRelation,
+  PropagateEmptyRelation) ::
 Batch("Pullup Correlated Expressions", Once,
   PullupCorrelatedPredicates) ::
 Batch("Subquery", Once,
@@ -1349,6 +1357,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
 
 case Limit(IntegerLiteral(limit), LocalRelation(output, data, 
isStreaming)) =>
   LocalRelation(output, data.take(limit), isStreaming)
+
+case Filter(condition, LocalRelation(output, data, isStreaming))
+if !hasUnevaluableExpr(condition) =>
+  val predicate = InterpretedPredicate.create(condition, output)
+  predicate.initialize(0)
+  LocalRelation(output, data.filter(row => predicate.eval(row)), 
isStreaming)
   }
 
   private def hasUnevaluableExpr(expr: Expression): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/10385402/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
index 049a19b..0c015f8 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{LessThan, Literal}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import 

svn commit: r28998 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_28_14_01-306e881-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-28 Thread pwendell
Author: pwendell
Date: Tue Aug 28 21:15:27 2018
New Revision: 28998

Log:
Apache Spark 2.3.3-SNAPSHOT-2018_08_28_14_01-306e881 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25004][CORE] Add spark.executor.pyspark.memory limit.

2018-08-28 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master aff8f15c1 -> 7ad18ee9f


[SPARK-25004][CORE] Add spark.executor.pyspark.memory limit.

## What changes were proposed in this pull request?

This adds `spark.executor.pyspark.memory` to configure Python's address space 
limit, 
[`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS).
 Limiting Python's address space allows Python to participate in memory 
management. In practice, we see fewer cases of Python taking too much memory 
because it doesn't know to run garbage collection. This results in YARN killing 
fewer containers. This also improves error messages so users know that Python 
is consuming too much memory:

```
  File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in 
fe_engineer
fe_eval_rec.update(f(src_rec_prep, mat_rec_prep))
  File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp
comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, 
[]), mat_rec_prep.get(item, []))
  File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in 
leven_list_compare
permutations = sorted(permutations, reverse=True)
  MemoryError
```

The new pyspark memory setting is used to increase the requested YARN container
memory, instead of sharing overhead memory between Python and off-heap JVM
activity.
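A configuration sketch (the sizes are illustrative):

```scala
import org.apache.spark.SparkConf

// Cap each executor's Python worker address space at 2 GiB; on YARN this amount is
// added to the requested container memory rather than taken from the overhead.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.pyspark.memory", "2g")
```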

## How was this patch tested?

Tested memory limits in our YARN cluster and verified that MemoryError is 
thrown.

Author: Ryan Blue 

Closes #21977 from rdblue/SPARK-25004-add-python-memory-limit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ad18ee9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ad18ee9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ad18ee9

Branch: refs/heads/master
Commit: 7ad18ee9f26e75dbe038c6034700f9cd4c0e2baa
Parents: aff8f15
Author: Ryan Blue 
Authored: Tue Aug 28 12:31:33 2018 -0700
Committer: Marcelo Vanzin 
Committed: Tue Aug 28 12:31:33 2018 -0700

--
 .../org/apache/spark/api/python/PythonRDD.scala |  5 +---
 .../apache/spark/api/python/PythonRunner.scala  | 27 
 .../apache/spark/internal/config/package.scala  |  4 +++
 docs/configuration.md   | 12 +
 python/pyspark/worker.py| 23 +
 .../org/apache/spark/deploy/yarn/Client.scala   | 17 
 .../spark/deploy/yarn/YarnAllocator.scala   |  9 ++-
 .../deploy/yarn/BaseYarnClusterSuite.scala  | 27 ++--
 .../spark/deploy/yarn/YarnClusterSuite.scala|  6 +++--
 .../python/AggregateInPandasExec.scala  |  4 ---
 .../execution/python/ArrowEvalPythonExec.scala  |  4 ---
 .../execution/python/ArrowPythonRunner.scala|  4 +--
 .../execution/python/BatchEvalPythonExec.scala  |  5 +---
 .../sql/execution/python/EvalPythonExec.scala   |  6 +
 .../python/FlatMapGroupsInPandasExec.scala  |  4 ---
 .../execution/python/PythonForeachWriter.scala  |  5 +---
 .../sql/execution/python/PythonUDFRunner.scala  |  4 +--
 .../execution/python/WindowInPandasExec.scala   |  4 ---
 18 files changed, 105 insertions(+), 65 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7ad18ee9/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index c3db60a..197f464 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -49,9 +49,6 @@ private[spark] class PythonRDD(
 isFromBarrier: Boolean = false)
   extends RDD[Array[Byte]](parent) {
 
-  val bufferSize = conf.getInt("spark.buffer.size", 65536)
-  val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
-
   override def getPartitions: Array[Partition] = firstParent.partitions
 
   override val partitioner: Option[Partitioner] = {
@@ -61,7 +58,7 @@ private[spark] class PythonRDD(
   val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[Array[Byte]] = {
-val runner = PythonRunner(func, bufferSize, reuseWorker)
+val runner = PythonRunner(func)
 runner.compute(firstParent.iterator(split, context), split.index, context)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7ad18ee9/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 

svn commit: r28997 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_28_12_01-aff8f15-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-28 Thread pwendell
Author: pwendell
Date: Tue Aug 28 19:16:55 2018
New Revision: 28997

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_28_12_01-aff8f15 docs


[This commit notification would consist of 1478 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS

2018-08-28 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 4e3f3cebe -> aff8f15c1


[SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS

## What changes were proposed in this pull request?

In the PR, I propose to not perform recursive parallel listing of files in
the `scanPartitions` method because it can cause a deadlock. Instead, I propose to
run `scanPartitions` in parallel for top-level partitions only.
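The underlying pattern, sketched standalone (Scala 2.12 collections assumed; the data is illustrative):

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Parallelize only the top-level listing: one bounded pool drives a single parallel
// collection, instead of every recursive call scheduling more parallel work.
val topLevelDirs = Vector("part=1", "part=2", "part=3")
val par = topLevelDirs.par
par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
val scanned = par.map(dir => s"scanned $dir").seq
```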

## How was this patch tested?

I extended an existing test to trigger the deadlock.

Author: Maxim Gekk 

Closes #22233 from MaxGekk/fix-recover-partitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aff8f15c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aff8f15c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aff8f15c

Branch: refs/heads/master
Commit: aff8f15c153f8031ceaffa237c60e040c6f8115f
Parents: 4e3f3ce
Author: Maxim Gekk 
Authored: Tue Aug 28 11:29:05 2018 -0700
Committer: Xiao Li 
Committed: Tue Aug 28 11:29:05 2018 -0700

--
 .../spark/sql/execution/command/ddl.scala   | 34 +--
 .../spark/sql/execution/command/DDLSuite.scala  | 59 
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 15 ++---
 3 files changed, 61 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aff8f15c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7a6f574..e1faece 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
 
 import scala.collection.{GenMap, GenSeq}
-import scala.concurrent.ExecutionContext
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
@@ -40,7 +40,6 @@ import 
org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
-import org.apache.spark.util.ThreadUtils.parmap
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand(
 val evalPool = 
ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
 val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
   try {
-implicit val ec = ExecutionContext.fromExecutor(evalPool)
 scanPartitions(spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames, threshold,
-  spark.sessionState.conf.resolver)
+  spark.sessionState.conf.resolver, new 
ForkJoinTaskSupport(evalPool)).seq
   } finally {
 evalPool.shutdown()
   }
@@ -656,13 +654,23 @@ case class AlterTableRecoverPartitionsCommand(
   spec: TablePartitionSpec,
   partitionNames: Seq[String],
   threshold: Int,
-  resolver: Resolver)(implicit ec: ExecutionContext): 
Seq[(TablePartitionSpec, Path)] = {
+  resolver: Resolver,
+  evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, 
Path)] = {
 if (partitionNames.isEmpty) {
   return Seq(spec -> path)
 }
 
-val statuses = fs.listStatus(path, filter).toSeq
-def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = {
+val statuses = fs.listStatus(path, filter)
+val statusPar: GenSeq[FileStatus] =
+  if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+// parallelize the list of partitions here, then we can have better 
parallelism later.
+val parArray = statuses.par
+parArray.tasksupport = evalTaskSupport
+parArray
+  } else {
+statuses
+  }
+statusPar.flatMap { st =>
   val name = st.getPath.getName

spark git commit: [SPARK-24704][WEBUI] Fix the order of stages in the DAG graph

2018-08-28 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8db935f97 -> 306e881b6


[SPARK-24704][WEBUI] Fix the order of stages in the DAG graph

## What changes were proposed in this pull request?

Before:

![wx20180630-155537](https://user-images.githubusercontent.com/1438757/42123357-2c2e2d84-7c83-11e8-8abd-1c2860f38783.png)

After:

![wx20180630-155604](https://user-images.githubusercontent.com/1438757/42123359-32fae990-7c83-11e8-8a7b-cdcee94f9123.png)

## How was this patch tested?

Manual tests.

Author: Stan Zhai 

Closes #21680 from stanzhai/fix-dag-graph.

(cherry picked from commit 772060d0940a97d89807befd682a70ae82e83ef4)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/306e881b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/306e881b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/306e881b

Branch: refs/heads/branch-2.3
Commit: 306e881b62eb112f8014219098eb97f7cbe75e98
Parents: 8db935f
Author: Stan Zhai 
Authored: Wed Jul 4 10:12:36 2018 +0200
Committer: Marcelo Vanzin 
Committed: Tue Aug 28 10:38:03 2018 -0700

--
 core/src/main/scala/org/apache/spark/status/AppStatusStore.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/306e881b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 688f25a..e237281 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -471,7 +471,7 @@ private[spark] class AppStatusStore(
 
   def operationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
 val job = store.read(classOf[JobDataWrapper], jobId)
-val stages = job.info.stageIds
+val stages = job.info.stageIds.sorted
 
 stages.map { id =>
   val g = store.read(classOf[RDDOperationGraphWrapper], 
id).toRDDOperationGraph()





spark git commit: [SPARK-23679][YARN] Setting RM_HA_URLS for AmIpFilter to avoid redirect failure in YARN mode

2018-08-28 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master de46df549 -> 4e3f3cebe


[SPARK-23679][YARN] Setting RM_HA_URLS for AmIpFilter to avoid redirect failure 
in YARN mode

## What changes were proposed in this pull request?

YARN `AmIpFilter` added a new parameter "RM_HA_URLS" to support RM HA, but Spark
on YARN doesn't provide such a parameter, so redirects fail when running with RM HA.
The detailed exception can be found in the JIRA ticket. This change fixes the issue
by adding the "RM_HA_URLS" parameter.

## How was this patch tested?

Local verification.

Closes #22164 from jerryshao/SPARK-23679.

Authored-by: jerryshao 
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e3f3ceb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e3f3ceb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e3f3ceb

Branch: refs/heads/master
Commit: 4e3f3cebe4cc6f47c264821a5ea92c32a4f1daa5
Parents: de46df5
Author: jerryshao 
Authored: Tue Aug 28 10:33:39 2018 -0700
Committer: Marcelo Vanzin 
Committed: Tue Aug 28 10:33:39 2018 -0700

--
 .../apache/spark/deploy/yarn/YarnRMClient.scala | 29 +++-
 1 file changed, 28 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4e3f3ceb/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index b59dcf1..05a7b1e 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -112,7 +113,16 @@ private[spark] class YarnRMClient extends Logging {
 val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
 val hosts = proxies.asScala.map(_.split(":").head)
 val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
-Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> 
uriBases.mkString(","))
+val params =
+  Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> 
uriBases.mkString(","))
+
+// Handles RM HA urls
+val rmIds = conf.getStringCollection(YarnConfiguration.RM_HA_IDS).asScala
+if (rmIds != null && rmIds.nonEmpty) {
+  params + ("RM_HA_URLS" -> rmIds.map(getUrlByRmId(conf, _)).mkString(","))
+} else {
+  params
+}
   }
 
   /** Returns the maximum number of attempts to register the AM. */
@@ -126,4 +136,21 @@ private[spark] class YarnRMClient extends Logging {
 }
   }
 
+  private def getUrlByRmId(conf: Configuration, rmId: String): String = {
+val addressPropertyPrefix = if (YarnConfiguration.useHttps(conf)) {
+  YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS
+} else {
+  YarnConfiguration.RM_WEBAPP_ADDRESS
+}
+
+val addressWithRmId = if (rmId == null || rmId.isEmpty) {
+  addressPropertyPrefix
+} else if (rmId.startsWith(".")) {
+  throw new IllegalStateException(s"rmId $rmId should not already have '.' 
prepended.")
+} else {
+  s"$addressPropertyPrefix.$rmId"
+}
+
+conf.get(addressWithRmId)
+  }
 }





spark git commit: [SPARK-23997][SQL] Configurable maximum number of buckets

2018-08-28 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 1149c4efb -> de46df549


[SPARK-23997][SQL] Configurable maximum number of buckets

## What changes were proposed in this pull request?
This PR makes it possible for the user to override the maximum number of buckets
when saving to a table.
Currently the limit is hard-coded at 100k, which might be insufficient for large
workloads.
A new configuration entry is proposed: `spark.sql.sources.bucketing.maxBuckets`,
which defaults to the previous 100k.
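A usage sketch (assumes a SparkSession named `spark`; table and column names are made up):

```scala
// Raise the limit at runtime, then write a table bucketed beyond the old 100k cap.
spark.conf.set("spark.sql.sources.bucketing.maxBuckets", 150000L)

val df = spark.range(0, 1000000).selectExpr("id as user_id", "id % 7 as event")
df.write
  .bucketBy(120000, "user_id")
  .sortBy("user_id")
  .saveAsTable("events_bucketed")
```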

## How was this patch tested?
Added unit tests in the following spark.sql test suites:

- CreateTableAsSelectSuite
- BucketedWriteSuite

Author: Fernando Pereira 

Closes #21087 from ferdonline/enh/configurable_bucket_limit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de46df54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de46df54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de46df54

Branch: refs/heads/master
Commit: de46df549acee7fda56bb0871f444d2f3b49e582
Parents: 1149c4e
Author: Fernando Pereira 
Authored: Tue Aug 28 10:31:47 2018 -0700
Committer: Xiao Li 
Committed: Tue Aug 28 10:31:47 2018 -0700

--
 .../spark/sql/catalyst/catalog/interface.scala  |  8 +++--
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 +
 .../spark/sql/sources/BucketedWriteSuite.scala  | 33 +++---
 .../sql/sources/CreateTableAsSelectSuite.scala  | 35 ++--
 4 files changed, 76 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/de46df54/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a4ead53..3842d79 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 
@@ -173,9 +174,12 @@ case class BucketSpec(
 numBuckets: Int,
 bucketColumnNames: Seq[String],
 sortColumnNames: Seq[String]) {
-  if (numBuckets <= 0 || numBuckets >= 100000) {
+  def conf: SQLConf = SQLConf.get
+
+  if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
 throw new AnalysisException(
-  s"Number of buckets should be greater than 0 but less than 10. Got 
`$numBuckets`")
+  s"Number of buckets should be greater than 0 but less than 
bucketing.maxBuckets " +
+s"(`${conf.bucketingMaxBuckets}`). Got `$numBuckets`")
   }
 
   override def toString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/de46df54/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6336e89..738d8fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -674,6 +674,12 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val BUCKETING_MAX_BUCKETS = 
buildConf("spark.sql.sources.bucketing.maxBuckets")
+.doc("The maximum number of buckets allowed. Defaults to 10")
+.intConf
+.checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets 
must be larger than 0")
+.createWithDefault(100000)
+
   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
 .doc("When false, we will throw an error if a query contains a cartesian 
product without " +
 "explicit CROSS JOIN syntax.")
@@ -1803,6 +1809,8 @@ class SQLConf extends Serializable with Logging {
 
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
+  def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS)
+
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
 getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de46df54/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
--
diff --git 

spark git commit: [SPARK-25005][SS] Support non-consecutive offsets for Kafka

2018-08-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 592e3a42c -> 1149c4efb


[SPARK-25005][SS] Support non-consecutive offsets for Kafka

## What changes were proposed in this pull request?

When the user uses Kafka transactions to write data, the offsets in Kafka will be
non-consecutive: the log will contain transaction (commit or abort) markers. In
addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not
return aborted messages either. Hence, we will see non-consecutive offsets in the
data returned by `poll`. However, as `seekToEnd` may move the offset to one of these
missing offsets, there are four possible corner cases we need to support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.
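For context, a sketch of how such gaps arise on the producer side (broker address, topic name, and transactional id are placeholders):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// A transactional producer leaves commit/abort markers in the log, so the data
// offsets a consumer sees are non-consecutive.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("transactional.id", "demo-tx")

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()
producer.beginTransaction()
(1 to 3).foreach(i => producer.send(new ProducerRecord[String, String]("events", s"value-$i")))
producer.commitTransaction() // the commit marker itself occupies an offset
producer.close()
```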

## How was this patch tested?

The new unit tests.

Closes #22042 from zsxwing/kafka-transaction-read.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1149c4ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1149c4ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1149c4ef

Branch: refs/heads/master
Commit: 1149c4efbc5ebe5b412d8f9c61558fef59179a9e
Parents: 592e3a4
Author: Shixiong Zhu 
Authored: Tue Aug 28 08:38:07 2018 -0700
Committer: Shixiong Zhu 
Committed: Tue Aug 28 08:38:07 2018 -0700

--
 .../kafka010/KafkaContinuousReadSupport.scala   |   2 +-
 .../spark/sql/kafka010/KafkaDataConsumer.scala  | 273 ++-
 .../kafka010/KafkaContinuousSourceSuite.scala   | 149 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 255 -
 .../spark/sql/kafka010/KafkaRelationSuite.scala |  93 +++
 .../spark/sql/kafka010/KafkaTestUtils.scala |  22 +-
 6 files changed, 720 insertions(+), 74 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
index 4a18839..1753a28 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -227,7 +227,7 @@ class KafkaContinuousPartitionReader(
 
 // This is a failOnDataLoss exception. Retry if nextKafkaOffset is 
within the data range,
 // or if it's the endpoint of the data range (i.e. the "true" next 
offset).
-case e: IllegalStateException  if 
e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+case e: IllegalStateException if 
e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
   val range = consumer.getAvailableOffsetRange()
   if (range.latest >= nextKafkaOffset && range.earliest <= 
nextKafkaOffset) {
 // retry

http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 65046c1..ceb9e31 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -33,9 +33,19 @@ import org.apache.spark.util.UninterruptibleThread
 
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will 
either throw error
-   * (if failOnDataLoss = true), or return the next available offset within 
[offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a
+   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
+   * `read_committed`), it will be skipped and this method will try to fetch 
next available record
+   * within [offset, untilOffset).
+   *
+   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
+   * throw an exception when we detect an unavailable offset. If 

spark git commit: [SPARK-25218][CORE] Fix potential resource leaks in TransportServer and SocketAuthHelper

2018-08-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 8198ea501 -> 592e3a42c


[SPARK-25218][CORE] Fix potential resource leaks in TransportServer and 
SocketAuthHelper

## What changes were proposed in this pull request?

Make sure TransportServer and SocketAuthHelper close their resources for all
types of errors.
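The pattern applied, in a simplified standalone sketch (not the actual Spark classes): mark the resource as handed off only after everything that can fail has succeeded, and close it otherwise in `finally`.

```scala
import java.io.{FileInputStream, InputStream}

// If skip() (or anything else before the hand-off) throws, the stream is closed;
// once the caller owns it, the finally block leaves it open.
def openSkipping(path: String, offset: Long): InputStream = {
  val in = new FileInputStream(path)
  var handedOff = false
  try {
    in.skip(offset)
    handedOff = true
    in
  } finally {
    if (!handedOff) in.close()
  }
}
```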

## How was this patch tested?

Jenkins

Closes #22210 from zsxwing/SPARK-25218.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/592e3a42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/592e3a42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/592e3a42

Branch: refs/heads/master
Commit: 592e3a42c20b72edd6e8b9dd07da367596f43da5
Parents: 8198ea5
Author: Shixiong Zhu 
Authored: Tue Aug 28 08:36:06 2018 -0700
Committer: Shixiong Zhu 
Committed: Tue Aug 28 08:36:06 2018 -0700

--
 .../buffer/FileSegmentManagedBuffer.java| 32 ++---
 .../spark/network/server/TransportServer.java   |  9 ++--
 .../spark/security/SocketAuthHelper.scala   | 50 +---
 3 files changed, 54 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/592e3a42/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 8b8f989..45fee54 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -77,16 +77,16 @@ public final class FileSegmentManagedBuffer extends 
ManagedBuffer {
 return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
   }
 } catch (IOException e) {
+  String errorMessage = "Error in reading " + this;
   try {
 if (channel != null) {
   long size = channel.size();
-  throw new IOException("Error in reading " + this + " (actual file 
length " + size + ")",
-e);
+  errorMessage = "Error in reading " + this + " (actual file length " 
+ size + ")";
 }
   } catch (IOException ignored) {
 // ignore
   }
-  throw new IOException("Error in opening " + this, e);
+  throw new IOException(errorMessage, e);
 } finally {
   JavaUtils.closeQuietly(channel);
 }
@@ -95,26 +95,24 @@ public final class FileSegmentManagedBuffer extends 
ManagedBuffer {
   @Override
   public InputStream createInputStream() throws IOException {
 FileInputStream is = null;
+boolean shouldClose = true;
 try {
   is = new FileInputStream(file);
   ByteStreams.skipFully(is, offset);
-  return new LimitedInputStream(is, length);
+  InputStream r = new LimitedInputStream(is, length);
+  shouldClose = false;
+  return r;
 } catch (IOException e) {
-  try {
-if (is != null) {
-  long size = file.length();
-  throw new IOException("Error in reading " + this + " (actual file 
length " + size + ")",
-  e);
-}
-  } catch (IOException ignored) {
-// ignore
-  } finally {
+  String errorMessage = "Error in reading " + this;
+  if (is != null) {
+long size = file.length();
+errorMessage = "Error in reading " + this + " (actual file length " + 
size + ")";
+  }
+  throw new IOException(errorMessage, e);
+} finally {
+  if (shouldClose) {
 JavaUtils.closeQuietly(is);
   }
-  throw new IOException("Error in opening " + this, e);
-} catch (RuntimeException e) {
-  JavaUtils.closeQuietly(is);
-  throw e;
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/592e3a42/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index d95ed22..9c85ab2 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -70,11 +70,14 @@ public class TransportServer implements Closeable {
 this.appRpcHandler = appRpcHandler;
 this.bootstraps = 
Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
 
+boolean