spark git commit: [SPARK-21583][SQL] Create a ColumnarBatch from ArrowColumnVectors

2017-08-30 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master ecf437a64 -> 964b507c7


[SPARK-21583][SQL] Create a ColumnarBatch from ArrowColumnVectors

## What changes were proposed in this pull request?

This PR allows the creation of a `ColumnarBatch` from `ReadOnlyColumnVectors` 
where previously a columnar batch could only allocate vectors internally.  This 
is useful for using `ArrowColumnVectors` in a batch form to do row-based 
iteration.  Also added `ArrowConverters.fromPayloadIterator`, which converts 
an `ArrowPayload` iterator to an `InternalRow` iterator and uses a 
`ColumnarBatch` internally.

## How was this patch tested?

Added a new unit test for creating a `ColumnarBatch` with 
`ReadOnlyColumnVectors` and a test to verify the roundtrip of rows -> 
ArrowPayload -> rows, using `toPayloadIterator` and `fromPayloadIterator`.

Author: Bryan Cutler 

Closes #18787 from BryanCutler/arrow-ColumnarBatch-support-SPARK-21583.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/964b507c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/964b507c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/964b507c

Branch: refs/heads/master
Commit: 964b507c7511cf3f4383cb0fc4026a573034b8cc
Parents: ecf437a
Author: Bryan Cutler 
Authored: Thu Aug 31 13:08:52 2017 +0900
Committer: Takuya UESHIN 
Committed: Thu Aug 31 13:08:52 2017 +0900

--
 .../sql/execution/arrow/ArrowConverters.scala   | 76 +++-
 .../execution/arrow/ArrowConvertersSuite.scala  | 29 +++-
 .../vectorized/ColumnarBatchSuite.scala | 54 ++
 3 files changed, 157 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/964b507c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index fa45822..561a067 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.arrow
 import java.io.ByteArrayOutputStream
 import java.nio.channels.Channels
 
+import scala.collection.JavaConverters._
+
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector._
 import org.apache.arrow.vector.file._
@@ -28,6 +30,7 @@ import 
org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, 
ColumnarBatch, ColumnVector}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -35,7 +38,7 @@ import org.apache.spark.util.Utils
 /**
  * Store Arrow data in a form that can be serialized by Spark and served to a 
Python process.
  */
-private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends 
Serializable {
+private[sql] class ArrowPayload private[sql] (payload: Array[Byte]) extends 
Serializable {
 
   /**
* Convert the ArrowPayload to an ArrowRecordBatch.
@@ -50,6 +53,17 @@ private[sql] class ArrowPayload private[arrow] (payload: 
Array[Byte]) extends Se
   def asPythonSerializable: Array[Byte] = payload
 }
 
+/**
+ * Iterator interface to iterate over Arrow record batches and return rows
+ */
+private[sql] trait ArrowRowIterator extends Iterator[InternalRow] {
+
+  /**
+   * Return the schema loaded from the Arrow record batch being iterated over
+   */
+  def schema: StructType
+}
+
 private[sql] object ArrowConverters {
 
   /**
@@ -111,6 +125,66 @@ private[sql] object ArrowConverters {
   }
 
   /**
+   * Maps Iterator from ArrowPayload to InternalRow. Returns a pair containing 
the row iterator
+   * and the schema from the first batch of Arrow data read.
+   */
+  private[sql] def fromPayloadIterator(
+  payloadIter: Iterator[ArrowPayload],
+  context: TaskContext): ArrowRowIterator = {
+val allocator =
+  ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, 
Long.MaxValue)
+
+new ArrowRowIterator {
+  private var reader: ArrowFileReader = null
+  private var schemaRead = StructType(Seq.empty)
+  private var rowIter = if (payloadIter.hasNext) nextBatch() else 
Iterator.empty
+
+  context.addTaskCompletionListener { _ =>
+closeReader()
+allocator.close()
+  }
+
+  override def schema: StructType = schemaRead
+
+  override def hasNext: Boolean = rowIter.hasNext || {
+

spark git commit: [SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python row with empty bytearray

2017-08-30 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 4482ff23a -> ecf437a64


[SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python 
row with empty bytearray

## What changes were proposed in this pull request?

A `PickleException` is thrown when creating a DataFrame from a Python row 
that contains an empty bytearray:

spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(lambda x: 
{"abc": x.xx})).show()

net.razorvine.pickle.PickleException: invalid pickle data for bytearray; 
expected 1 or 2 args, got 0
at 
net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java
...

`ByteArrayConstructor` doesn't deal with empty byte array pickled by Python3.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh 

Closes #19085 from viirya/SPARK-21534.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecf437a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecf437a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecf437a6

Branch: refs/heads/master
Commit: ecf437a64874a31328f4e28c6b24f37557fbe07d
Parents: 4482ff2
Author: Liang-Chi Hsieh 
Authored: Thu Aug 31 12:55:38 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Aug 31 12:55:38 2017 +0900

--
 .../scala/org/apache/spark/api/python/SerDeUtil.scala | 14 ++
 python/pyspark/sql/tests.py   |  4 +++-
 2 files changed, 17 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ecf437a6/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index aaf8e7a..01e64b6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -35,6 +35,16 @@ import org.apache.spark.rdd.RDD
 
 /** Utilities for serialization / deserialization between Python and Java, 
using Pickle. */
 private[spark] object SerDeUtil extends Logging {
+  class ByteArrayConstructor extends 
net.razorvine.pickle.objects.ByteArrayConstructor {
+override def construct(args: Array[Object]): Object = {
+  // Deal with an empty byte array pickled by Python 3.
+  if (args.length == 0) {
+Array.emptyByteArray
+  } else {
+super.construct(args)
+  }
+}
+  }
   // Unpickle array.array generated by Python 2.6
   class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor 
{
 //  /* Description of types */
@@ -108,6 +118,10 @@ private[spark] object SerDeUtil extends Logging {
 synchronized{
   if (!initialized) {
 Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+Unpickler.registerConstructor("__builtin__", "bytearray", new 
ByteArrayConstructor())
+Unpickler.registerConstructor("builtins", "bytearray", new 
ByteArrayConstructor())
+Unpickler.registerConstructor("__builtin__", "bytes", new 
ByteArrayConstructor())
+Unpickler.registerConstructor("_codecs", "encode", new 
ByteArrayConstructor())
 initialized = true
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ecf437a6/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1ecde68..b310285 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2383,9 +2383,11 @@ class SQLTests(ReusedPySparkTestCase):
 
 def test_BinaryType_serialization(self):
 # Pyrolite version <= 4.9 could not serialize BinaryType with Python3 
SPARK-17808
+# The empty bytearray is test for SPARK-21534.
 schema = StructType([StructField('mybytes', BinaryType())])
 data = [[bytearray(b'here is my data')],
-[bytearray(b'and here is some more')]]
+[bytearray(b'and here is some more')],
+[bytearray(b'')]]
 df = self.spark.createDataFrame(data, schema=schema)
 df.collect()
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17321][YARN] Avoid writing shuffle metadata to disk if NM recovery is disabled

2017-08-30 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master cd5d0f337 -> 4482ff23a


[SPARK-17321][YARN] Avoid writing shuffle metadata to disk if NM recovery is 
disabled

In the current code, if NM recovery is not enabled, `YarnShuffleService` 
writes shuffle metadata to NM local dir-1. If that local dir-1 is on a bad 
disk, `YarnShuffleService` fails to start. To solve this issue, when NM 
recovery is not enabled, Spark no longer persists data into leveldb. In that 
case the YARN shuffle service can still serve requests but loses the ability 
to recover (this is fine because the failure of the NM will kill the 
containers as well as the applications).

Tested in a local cluster with NM recovery off and on to see whether the 
folder is created. A MiniCluster UT isn't added because in MiniCluster the NM 
always sets its port to 0, but NM recovery requires a non-ephemeral port.

Author: jerryshao 

Closes #19032 from jerryshao/SPARK-17321.

Change-Id: I8f2fe73d175e2ad2c4e380caede3873e0192d027


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4482ff23
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4482ff23
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4482ff23

Branch: refs/heads/master
Commit: 4482ff23ad984335b0d477100ac0815d5db8d532
Parents: cd5d0f3
Author: jerryshao 
Authored: Thu Aug 31 09:26:20 2017 +0800
Committer: jerryshao 
Committed: Thu Aug 31 09:26:20 2017 +0800

--
 .../spark/network/yarn/YarnShuffleService.java  | 82 ++--
 .../yarn/YarnShuffleIntegrationSuite.scala  | 33 ++--
 .../network/yarn/YarnShuffleServiceSuite.scala  | 32 +---
 3 files changed, 86 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4482ff23/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
--
diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index cd67eb2..d8b2ed6 100644
--- 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -160,7 +161,9 @@ public class YarnShuffleService extends AuxiliaryService {
   // If we don't find one, then we choose a file to use to save the state 
next time.  Even if
   // an application was stopped while the NM was down, we expect yarn to 
call stopApplication()
   // when it comes back
-  registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+  if (_recoveryPath != null) {
+registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+  }
 
   TransportConf transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(conf));
   blockHandler = new ExternalShuffleBlockHandler(transportConf, 
registeredExecutorFile);
@@ -170,7 +173,10 @@ public class YarnShuffleService extends AuxiliaryService {
   List bootstraps = Lists.newArrayList();
   boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, 
DEFAULT_SPARK_AUTHENTICATE);
   if (authEnabled) {
-createSecretManager();
+secretManager = new ShuffleSecretManager();
+if (_recoveryPath != null) {
+  loadSecretsFromDb();
+}
 bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
   }
 
@@ -194,13 +200,12 @@ public class YarnShuffleService extends AuxiliaryService {
 }
   }
 
-  private void createSecretManager() throws IOException {
-secretManager = new ShuffleSecretManager();
+  private void loadSecretsFromDb() throws IOException {
 secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 
 // Make sure this is protected in case its not in the NM recovery dir
 FileSystem fs = FileSystem.getLocal(_conf);
-fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
+fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
 
 db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
 logger.info("Recovery location is: " + secretsFile.getPath());
@@ -317,10 +322,10 @@ public class YarnShuffleService 

spark git commit: [SPARK-11574][CORE] Add metrics StatsD sink

2017-08-30 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 313c6ca43 -> cd5d0f337


[SPARK-11574][CORE] Add metrics StatsD sink

This patch adds statsd sink to the current metrics system in spark core.

Author: Xiaofeng Lin 

Closes #9518 from xflin/statsd.

Change-Id: Ib8720e86223d4a650df53f51ceb963cd95b49a44


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd5d0f33
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd5d0f33
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd5d0f33

Branch: refs/heads/master
Commit: cd5d0f3379b1a9fa0940ffd98bfff33f8cbcdeb0
Parents: 313c6ca
Author: Xiaofeng Lin 
Authored: Thu Aug 31 08:57:15 2017 +0800
Committer: jerryshao 
Committed: Thu Aug 31 08:57:15 2017 +0800

--
 conf/metrics.properties.template|  12 ++
 .../spark/metrics/sink/StatsdReporter.scala | 163 +++
 .../apache/spark/metrics/sink/StatsdSink.scala  |  75 +
 .../spark/metrics/sink/StatsdSinkSuite.scala| 161 ++
 docs/monitoring.md  |   1 +
 5 files changed, 412 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/conf/metrics.properties.template
--
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index aeb76c9..4c008a1 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -118,6 +118,14 @@
 #   prefixEMPTY STRING  Prefix to prepend to every metric's name
 #   protocol  tcp   Protocol ("tcp" or "udp") to use
 
+# org.apache.spark.metrics.sink.StatsdSink
+#   Name: Default:  Description:
+#   host  127.0.0.1 Hostname or IP of StatsD server
+#   port  8125  Port of StatsD server
+#   period10Poll period
+#   unit  seconds   Units of poll period
+#   prefixEMPTY STRING  Prefix to prepend to metric name
+
 ## Examples
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
@@ -125,6 +133,10 @@
 # Enable ConsoleSink for all instances by class name
 #*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
 
+# Enable StatsdSink for all instances by class name
+#*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
+#*.sink.statsd.prefix=spark
+
 # Polling period for the ConsoleSink
 #*.sink.console.period=10
 # Unit of the polling period for the ConsoleSink

http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
new file mode 100644
index 000..ba75aa1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.io.IOException
+import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.SortedMap
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import com.codahale.metrics._
+import org.apache.hadoop.net.NetUtils
+
+import org.apache.spark.internal.Logging
+
+/**
+ * @see <a href="https://github.com/etsy/statsd/blob/master/docs/metric_types.md">
+ *        StatsD metric types</a>
+ */
+private[spark] object StatsdMetricType {
+  val COUNTER = "c"
+  val GAUGE = "g"
+  val TIMER = "ms"
+  val Set = "s"
+}
+
+private[spark] class StatsdReporter(
+registry: MetricRegistry,
+host: String = "127.0.0.1",
+port: Int = 8125,
+prefix: String = "",
+filter: MetricFilter = MetricFilter.ALL,
+rateUnit: TimeUnit = 

spark git commit: [SPARK-21875][BUILD] Fix Java style bugs

2017-08-30 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master d8f454086 -> 313c6ca43


[SPARK-21875][BUILD] Fix Java style bugs

## What changes were proposed in this pull request?

Fix Java code style so `./dev/lint-java` succeeds

## How was this patch tested?

Run `./dev/lint-java`

Author: Andrew Ash 

Closes #19088 from ash211/spark-21875-lint-java.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/313c6ca4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/313c6ca4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/313c6ca4

Branch: refs/heads/master
Commit: 313c6ca43593e247ab8cedac15c77d13e2830d6b
Parents: d8f4540
Author: Andrew Ash 
Authored: Thu Aug 31 09:26:11 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Aug 31 09:26:11 2017 +0900

--
 core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java | 3 ++-
 .../src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/313c6ca4/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
--
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 0f1e902..44b60c1 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -74,7 +74,8 @@ public class TaskMemoryManager {
* Maximum supported data page size (in bytes). In principle, the maximum 
addressable page size is
* (1L << OFFSET_BITS) bytes, which is 2+ petabytes. However, the 
on-heap allocator's
* maximum page size is limited by the maximum amount of data that can be 
stored in a long[]
-   * array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes). Therefore, 
we cap this at 17 gigabytes.
+   * array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes). Therefore, 
we cap this at 17
+   * gigabytes.
*/
   public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/313c6ca4/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
--
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java 
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 3e57403..13b006f 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -1337,7 +1337,8 @@ public class JavaDatasetSuite implements Serializable {
 public boolean equals(Object other) {
   if (other instanceof BeanWithEnum) {
 BeanWithEnum beanWithEnum = (BeanWithEnum) other;
-return beanWithEnum.regularField.equals(regularField) && 
beanWithEnum.enumField.equals(enumField);
+return beanWithEnum.regularField.equals(regularField)
+  && beanWithEnum.enumField.equals(enumField);
   }
   return false;
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21839][SQL] Support SQL config for ORC compression

2017-08-30 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 6949a9c5c -> d8f454086


[SPARK-21839][SQL] Support SQL config for ORC compression

## What changes were proposed in this pull request?

This PR aims to support `spark.sql.orc.compression.codec` like Parquet's 
`spark.sql.parquet.compression.codec`. Users can use SQLConf to control ORC 
compression, too.

## How was this patch tested?

Pass the Jenkins with new and updated test cases.

Author: Dongjoon Hyun 

Closes #19055 from dongjoon-hyun/SPARK-21839.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8f45408
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8f45408
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8f45408

Branch: refs/heads/master
Commit: d8f45408635d4fccac557cb1e877dfe9267fb326
Parents: 6949a9c
Author: Dongjoon Hyun 
Authored: Thu Aug 31 08:16:58 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Aug 31 08:16:58 2017 +0900

--
 python/pyspark/sql/readwriter.py|  5 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala | 10 +++
 .../org/apache/spark/sql/DataFrameWriter.scala  |  8 --
 .../spark/sql/hive/orc/OrcFileFormat.scala  |  2 +-
 .../apache/spark/sql/hive/orc/OrcOptions.scala  | 18 +++-
 .../spark/sql/hive/orc/OrcSourceSuite.scala | 29 ++--
 6 files changed, 57 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 01da0dc..cb847a0 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -851,8 +851,9 @@ class DataFrameWriter(OptionUtils):
 :param partitionBy: names of partitioning columns
 :param compression: compression codec to use when saving to file. This 
can be one of the
 known case-insensitive shorten names (none, 
snappy, zlib, and lzo).
-This will override ``orc.compress``. If None is 
set, it uses the
-default value, ``snappy``.
+This will override ``orc.compress`` and
+``spark.sql.orc.compression.codec``. If None is 
set, it uses the value
+specified in ``spark.sql.orc.compression.codec``.
 
 >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
 >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))

http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a685099..c407874 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -322,6 +322,14 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
+.doc("Sets the compression codec use when writing ORC files. Acceptable 
values include: " +
+  "none, uncompressed, snappy, zlib, lzo.")
+.stringConf
+.transform(_.toLowerCase(Locale.ROOT))
+.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
+.createWithDefault("snappy")
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
 .doc("When true, enable filter pushdown for ORC files.")
 .booleanConf
@@ -998,6 +1006,8 @@ class SQLConf extends Serializable with Logging {
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
+  def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
+
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
   def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)

http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index cca9352..07347d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -517,9 +517,11 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
*
 

spark git commit: [SPARK-21834] Incorrect executor request in case of dynamic allocation

2017-08-30 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 576975356 -> 041eccb4f


[SPARK-21834] Incorrect executor request in case of dynamic allocation

## What changes were proposed in this pull request?

The killExecutor API currently does not allow killing an executor without 
updating the total number of executors needed. When dynamic allocation is 
turned on and the allocator tries to kill an executor, the scheduler reduces 
the total number of executors needed (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635),
 which is incorrect because the allocator already takes care of setting the 
required number of executors itself.

## How was this patch tested?

Ran a job on the cluster and made sure the executor request is correct

Author: Sital Kedia 

Closes #19081 from sitalkedia/skedia/oss_fix_executor_allocation.

(cherry picked from commit 6949a9c5c6120fdde1b63876ede661adbd1eb15e)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/041eccb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/041eccb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/041eccb4

Branch: refs/heads/branch-2.1
Commit: 041eccb4fa35a2778996c052dbcfd09f779b64a6
Parents: 5769753
Author: Sital Kedia 
Authored: Wed Aug 30 14:19:13 2017 -0700
Committer: Marcelo Vanzin 
Committed: Wed Aug 30 14:19:33 2017 -0700

--
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala   | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/041eccb4/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index f054a78..d25ab61 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -427,6 +427,9 @@ private[spark] class ExecutorAllocationManager(
 } else {
   client.killExecutors(executorIdsToBeRemoved)
 }
+// [SPARK-21834] killExecutors api reduces the target number of executors.
+// So we need to update the target with desired value.
+client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
 // reset the newExecutorTotal to the existing number of executors
 newExecutorTotal = numExistingExecutors
 if (testing || executorsRemoved.nonEmpty) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21834] Incorrect executor request in case of dynamic allocation

2017-08-30 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 d10c9dc3f -> 14054ffc5


[SPARK-21834] Incorrect executor request in case of dynamic allocation

## What changes were proposed in this pull request?

The killExecutor API currently does not allow killing an executor without 
updating the total number of executors needed. When dynamic allocation is 
turned on and the allocator tries to kill an executor, the scheduler reduces 
the total number of executors needed (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635),
 which is incorrect because the allocator already takes care of setting the 
required number of executors itself.

## How was this patch tested?

Ran a job on the cluster and made sure the executor request is correct

Author: Sital Kedia 

Closes #19081 from sitalkedia/skedia/oss_fix_executor_allocation.

(cherry picked from commit 6949a9c5c6120fdde1b63876ede661adbd1eb15e)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14054ffc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14054ffc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14054ffc

Branch: refs/heads/branch-2.2
Commit: 14054ffc5fd3399d04d69e26efb31d8b24b60bdc
Parents: d10c9dc
Author: Sital Kedia 
Authored: Wed Aug 30 14:19:13 2017 -0700
Committer: Marcelo Vanzin 
Committed: Wed Aug 30 14:19:22 2017 -0700

--
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala   | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14054ffc/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bb5eb7f..632d5f2 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -430,6 +430,9 @@ private[spark] class ExecutorAllocationManager(
 } else {
   client.killExecutors(executorIdsToBeRemoved)
 }
+// [SPARK-21834] killExecutors api reduces the target number of executors.
+// So we need to update the target with desired value.
+client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
 // reset the newExecutorTotal to the existing number of executors
 newExecutorTotal = numExistingExecutors
 if (testing || executorsRemoved.nonEmpty) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21834] Incorrect executor request in case of dynamic allocation

2017-08-30 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 235d28333 -> 6949a9c5c


[SPARK-21834] Incorrect executor request in case of dynamic allocation

## What changes were proposed in this pull request?

The killExecutor API currently does not allow killing an executor without updating 
the total number of executors needed. When dynamic allocation is turned 
on and the allocator tries to kill an executor, the scheduler reduces the total 
number of executors needed (see 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635),
which is incorrect because the allocator already takes care of setting the 
required number of executors itself.

## How was this patch tested?

Ran a job on the cluster and made sure the executor request is correct

Author: Sital Kedia 

Closes #19081 from sitalkedia/skedia/oss_fix_executor_allocation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6949a9c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6949a9c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6949a9c5

Branch: refs/heads/master
Commit: 6949a9c5c6120fdde1b63876ede661adbd1eb15e
Parents: 235d283
Author: Sital Kedia 
Authored: Wed Aug 30 14:19:13 2017 -0700
Committer: Marcelo Vanzin 
Committed: Wed Aug 30 14:19:13 2017 -0700

--
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala   | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6949a9c5/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 3350326..7a5fb9a 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -446,6 +446,9 @@ private[spark] class ExecutorAllocationManager(
 } else {
   client.killExecutors(executorIdsToBeRemoved)
 }
+// [SPARK-21834] killExecutors api reduces the target number of executors.
+// So we need to update the target with desired value.
+client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
 // reset the newExecutorTotal to the existing number of executors
 newExecutorTotal = numExistingExecutors
 if (testing || executorsRemoved.nonEmpty) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode

2017-08-30 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 a6a994414 -> d10c9dc3f


[SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in 
yarn client mode

## What changes were proposed in this pull request?

This is a backport PR to fix issue of re-uploading remote resource in yarn 
client mode. The original PR is #18962.

## How was this patch tested?

Tested in local UT.

Author: jerryshao 

Closes #19074 from jerryshao/SPARK-21714-2.2-backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d10c9dc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d10c9dc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d10c9dc3

Branch: refs/heads/branch-2.2
Commit: d10c9dc3f631a26dbbbd8f5c601ca2001a5d7c80
Parents: a6a9944
Author: jerryshao 
Authored: Wed Aug 30 12:30:24 2017 -0700
Committer: Marcelo Vanzin 
Committed: Wed Aug 30 12:30:24 2017 -0700

--
 .../org/apache/spark/deploy/SparkSubmit.scala   | 66 ---
 .../apache/spark/internal/config/package.scala  |  2 +-
 .../scala/org/apache/spark/util/Utils.scala | 25 ---
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 68 
 .../org/apache/spark/repl/SparkILoop.scala  |  2 +-
 .../main/scala/org/apache/spark/repl/Main.scala |  2 +-
 6 files changed, 116 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d10c9dc3/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c60a2a1..86d578e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -208,14 +208,20 @@ object SparkSubmit extends CommandLineUtils {
 
   /**
* Prepare the environment for submitting an application.
-   * This returns a 4-tuple:
-   *   (1) the arguments for the child process,
-   *   (2) a list of classpath entries for the child,
-   *   (3) a map of system properties, and
-   *   (4) the main class for the child
+   *
+   * @param args the parsed SparkSubmitArguments used for environment 
preparation.
+   * @param conf the Hadoop Configuration, this argument will only be set in 
unit test.
+   * @return a 4-tuple:
+   *(1) the arguments for the child process,
+   *(2) a list of classpath entries for the child,
+   *(3) a map of system properties, and
+   *(4) the main class for the child
+   *
* Exposed for testing.
*/
-  private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
+  private[deploy] def prepareSubmitEnvironment(
+  args: SparkSubmitArguments,
+  conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], Map[String, String], String) = {
 // Return values
 val childArgs = new ArrayBuffer[String]()
@@ -311,12 +317,16 @@ object SparkSubmit extends CommandLineUtils {
 }
 
 // In client mode, download remote files.
+var localPrimaryResource: String = null
+var localJars: String = null
+var localPyFiles: String = null
+var localFiles: String = null
 if (deployMode == CLIENT) {
-  val hadoopConf = new HadoopConfiguration()
-  args.primaryResource = Option(args.primaryResource).map(downloadFile(_, 
hadoopConf)).orNull
-  args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
-  args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, 
hadoopConf)).orNull
-  args.files = Option(args.files).map(downloadFileList(_, 
hadoopConf)).orNull
+  val hadoopConf = conf.getOrElse(new HadoopConfiguration())
+  localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, 
hadoopConf)).orNull
+  localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
+  localPyFiles = Option(args.pyFiles).map(downloadFileList(_, 
hadoopConf)).orNull
+  localFiles = Option(args.files).map(downloadFileList(_, 
hadoopConf)).orNull
 }
 
 // Require all python files to be local, so we can add them to the 
PYTHONPATH
@@ -366,7 +376,7 @@ object SparkSubmit extends CommandLineUtils {
 // If a python file is provided, add it to the child arguments and 
list of files to deploy.
 // Usage: PythonAppRunner <main python file> <extra python files> [app 
arguments]
 args.mainClass = "org.apache.spark.deploy.PythonRunner"
-args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ 
args.childArgs
+args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ 
args.childArgs
 if (clusterManager != 

spark git commit: [MINOR][SQL][TEST] Test shuffle hash join while is not expected

2017-08-30 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 32d6d9d72 -> 235d28333


[MINOR][SQL][TEST] Test shuffle hash join while is not expected

## What changes were proposed in this pull request?

ignore("shuffle hash join") is meant to exercise a shuffle hash join, i.e. to test 
_case class ShuffledHashJoinExec_.
But when you change 'ignore' -> 'test', the plan actually tested is _case class BroadcastHashJoinExec_.

Before the modification, this happens because canBroadcast is true.
Print information in _canBroadcast(plan: LogicalPlan)_
```
canBroadcast plan.stats.sizeInBytes:6710880
canBroadcast conf.autoBroadcastJoinThreshold:1000
```

After the modification, plan.stats.sizeInBytes is 11184808.
Print information in _canBuildLocalHashMap(plan: LogicalPlan)_
and _muchSmaller(a: LogicalPlan, b: LogicalPlan)_ :

```
canBuildLocalHashMap plan.stats.sizeInBytes:11184808
canBuildLocalHashMap conf.autoBroadcastJoinThreshold:1000
canBuildLocalHashMap conf.numShufflePartitions:2
```
```
muchSmaller a.stats.sizeInBytes * 3:33554424
muchSmaller b.stats.sizeInBytes:33554432
```
## How was this patch tested?

existing test case.

Author: caoxuewen 

Closes #19069 from heary-cao/shuffle_hash_join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/235d2833
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/235d2833
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/235d2833

Branch: refs/heads/master
Commit: 235d28333c63719008ee755138db5c964237f526
Parents: 32d6d9d
Author: caoxuewen 
Authored: Wed Aug 30 10:10:24 2017 -0700
Committer: gatorsmile 
Committed: Wed Aug 30 10:10:24 2017 -0700

--
 .../sql/execution/benchmark/JoinBenchmark.scala | 56 +---
 1 file changed, 37 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/235d2833/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
index 46db41a..5a25d72 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.benchmark
 
+import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.IntegerType
 
@@ -35,7 +36,9 @@ class JoinBenchmark extends BenchmarkBase {
 
 val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
 runBenchmark("Join w long", N) {
-  sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+  val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+  
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+  df.count()
 }
 
 /*
@@ -55,7 +58,9 @@ class JoinBenchmark extends BenchmarkBase {
 val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
 runBenchmark("Join w long duplicated", N) {
   val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as 
long) as k"))
-  sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+  val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+  
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+  df.count()
 }
 
 /*
@@ -75,9 +80,11 @@ class JoinBenchmark extends BenchmarkBase {
   .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id 
as string) as v"))
 
 runBenchmark("Join w 2 ints", N) {
-  sparkSession.range(N).join(dim2,
+  val df = sparkSession.range(N).join(dim2,
 (col("id") % M).cast(IntegerType) === col("k1")
-  && (col("id") % M).cast(IntegerType) === col("k2")).count()
+  && (col("id") % M).cast(IntegerType) === col("k2"))
+  
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+  df.count()
 }
 
 /*
@@ -97,9 +104,10 @@ class JoinBenchmark extends BenchmarkBase {
   .selectExpr("id as k1", "id as k2", "cast(id as string) as v"))
 
 runBenchmark("Join w 2 longs", N) {
-  sparkSession.range(N).join(dim3,
+  val df = sparkSession.range(N).join(dim3,
 (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
-.count()
+  
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+  df.count()
 }
 
 /*
@@ -119,9 +127,10 @@ 

spark git commit: Revert "[SPARK-21845][SQL] Make codegen fallback of expressions configurable"

2017-08-30 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 4133c1b0a -> 32d6d9d72


Revert "[SPARK-21845][SQL] Make codegen fallback of expressions configurable"

This reverts commit 3d0e174244bc293f11dff0f11ef705ba6cd5fe3a.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32d6d9d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32d6d9d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32d6d9d7

Branch: refs/heads/master
Commit: 32d6d9d72019404ebd47f6aa64197d9f574bac8b
Parents: 4133c1b
Author: gatorsmile 
Authored: Wed Aug 30 09:08:40 2017 -0700
Committer: gatorsmile 
Committed: Wed Aug 30 09:08:40 2017 -0700

--
 .../org/apache/spark/sql/internal/SQLConf.scala  |  6 +++---
 .../org/apache/spark/sql/execution/SparkPlan.scala   | 15 ++-
 .../spark/sql/execution/WholeStageCodegenExec.scala  |  2 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala   |  2 +-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala  | 12 +---
 .../org/apache/spark/sql/test/SharedSQLContext.scala |  2 --
 .../org/apache/spark/sql/hive/test/TestHive.scala|  1 -
 7 files changed, 16 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 24f51ef..a685099 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -551,9 +551,9 @@ object SQLConf {
 .intConf
 .createWithDefault(100)
 
-  val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
+  val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
 .internal()
-.doc("When true, (whole stage) codegen could be temporary disabled for the 
part of query that" +
+.doc("When true, whole stage codegen could be temporary disabled for the 
part of query that" +
   " fail to compile generated code")
 .booleanConf
 .createWithDefault(true)
@@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
-  def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
+  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b1db9dd..c7277c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -56,10 +56,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
 
   protected def sparkContext = sqlContext.sparkContext
 
-  // whether we should fallback when hitting compilation errors caused by 
codegen
-  private val codeGenFallBack = sqlContext.conf.codegenFallback
-
-  protected val subexpressionEliminationEnabled = 
sqlContext.conf.subexpressionEliminationEnabled
+  // sqlContext will be null when we are being deserialized on the slaves.  In 
this instance
+  // the value of subexpressionEliminationEnabled will be set by the 
deserializer after the
+  // constructor has run.
+  val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
+sqlContext.conf.subexpressionEliminationEnabled
+  } else {
+false
+  }
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
@@ -366,7 +370,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
 try {
   GeneratePredicate.generate(expression, inputSchema)
 } catch {
-  case _ @ (_: JaninoRuntimeException | _: CompileException) if 
codeGenFallBack =>
+  case e @ (_: JaninoRuntimeException | _: CompileException)
+  if sqlContext == null || sqlContext.conf.wholeStageFallback =>
 genInterpretedPredicate(expression, inputSchema)
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
--

spark git commit: [SPARK-21469][ML][EXAMPLES] Adding Examples for FeatureHasher

2017-08-30 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/master b30a11a6a -> 4133c1b0a


[SPARK-21469][ML][EXAMPLES] Adding Examples for FeatureHasher

## What changes were proposed in this pull request?

This PR adds ML examples for the FeatureHasher transform in Scala, Java, Python.

## How was this patch tested?

Manually ran examples and verified that output is consistent for different APIs

Author: Bryan Cutler 

Closes #19024 from BryanCutler/ml-examples-FeatureHasher-SPARK-21810.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4133c1b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4133c1b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4133c1b0

Branch: refs/heads/master
Commit: 4133c1b0abb22f728fbff287f4f77a06ab88bbe8
Parents: b30a11a
Author: Bryan Cutler 
Authored: Wed Aug 30 16:00:29 2017 +0200
Committer: Nick Pentreath 
Committed: Wed Aug 30 16:00:29 2017 +0200

--
 docs/ml-features.md | 91 +++-
 .../examples/ml/JavaFeatureHasherExample.java   | 69 +++
 .../main/python/ml/feature_hasher_example.py| 46 ++
 .../examples/ml/FeatureHasherExample.scala  | 50 +++
 .../apache/spark/ml/feature/FeatureHasher.scala |  7 +-
 5 files changed, 256 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4133c1b0/docs/ml-features.md
--
diff --git a/docs/ml-features.md b/docs/ml-features.md
index e19fba2..86a0e09 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -53,9 +53,9 @@ are calculated based on the mapped indices. This approach 
avoids the need to com
 term-to-index map, which can be expensive for a large corpus, but it suffers 
from potential hash 
 collisions, where different raw features may become the same term after 
hashing. To reduce the 
 chance of collision, we can increase the target feature dimension, i.e. the 
number of buckets 
-of the hash table. Since a simple modulo is used to transform the hash 
function to a column index, 
-it is advisable to use a power of two as the feature dimension, otherwise the 
features will 
-not be mapped evenly to the columns. The default feature dimension is `$2^{18} 
= 262,144$`.
+of the hash table. Since a simple modulo on the hashed value is used to 
determine the vector index,
+it is advisable to use a power of two as the feature dimension, otherwise the 
features will not
+be mapped evenly to the vector indices. The default feature dimension is 
`$2^{18} = 262,144$`.
 An optional binary toggle parameter controls term frequency counts. When set 
to true all nonzero
 frequency counts are set to 1. This is especially useful for discrete 
probabilistic models that
 model binary, rather than integer, counts.
@@ -65,7 +65,7 @@ model binary, rather than integer, counts.
 
 **IDF**: `IDF` is an `Estimator` which is fit on a dataset and produces an 
`IDFModel`.  The 
 `IDFModel` takes feature vectors (generally created from `HashingTF` or 
`CountVectorizer`) and 
-scales each column. Intuitively, it down-weights columns which appear 
frequently in a corpus.
+scales each feature. Intuitively, it down-weights features which appear 
frequently in a corpus.
 
 **Note:** `spark.ml` doesn't provide tools for text segmentation.
 We refer users to the [Stanford NLP Group](http://nlp.stanford.edu/) and 
@@ -211,6 +211,89 @@ for more details on the API.
 
 
 
+## FeatureHasher
+
+Feature hashing projects a set of categorical or numerical features into a 
feature vector of
+specified dimension (typically substantially smaller than that of the original 
feature
+space). This is done using the [hashing 
trick](https://en.wikipedia.org/wiki/Feature_hashing)
+to map features to indices in the feature vector.
+
+The `FeatureHasher` transformer operates on multiple columns. Each column may 
contain either
+numeric or categorical features. Behavior and handling of column data types is 
as follows:
+
+- Numeric columns: For numeric features, the hash value of the column name is 
used to map the
+feature value to its index in the feature vector. Numeric features are never 
treated as
+categorical, even when they are integers. You must explicitly convert numeric 
columns containing
+categorical features to strings first.
+- String columns: For categorical features, the hash value of the string 
"column_name=value"
+is used to map to the vector index, with an indicator value of `1.0`. Thus, 
categorical features
+are "one-hot" encoded (similarly to using 
[OneHotEncoder](ml-features.html#onehotencoder) with
+`dropLast=false`).
+- Boolean columns: Boolean values are treated in the same way as string 
columns. That is,
+boolean 

spark git commit: [SPARK-21764][TESTS] Fix tests failures on Windows: resources not being closed and incorrect paths

2017-08-30 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 734ed7a7b -> b30a11a6a


[SPARK-21764][TESTS] Fix tests failures on Windows: resources not being closed 
and incorrect paths

## What changes were proposed in this pull request?

`org.apache.spark.deploy.RPackageUtilsSuite`

```
 - jars without manifest return false *** FAILED *** (109 milliseconds)
   java.io.IOException: Unable to delete file: 
C:\projects\spark\target\tmp\1500266936418-0\dep1-c.jar
```

`org.apache.spark.deploy.SparkSubmitSuite`

```
 - download one file to local *** FAILED *** (16 milliseconds)
   java.net.URISyntaxException: Illegal character in authority at index 6: 
s3a://C:\projects\spark\target\tmp\test2630198944759847458.jar

 - download list of files to local *** FAILED *** (0 milliseconds)
   java.net.URISyntaxException: Illegal character in authority at index 6: 
s3a://C:\projects\spark\target\tmp\test2783551769392880031.jar
```

`org.apache.spark.scheduler.ReplayListenerSuite`

```
 - Replay compressed inprogress log file succeeding on partial read (156 
milliseconds)
   Exception encountered when attempting to run a suite with class name:
   org.apache.spark.scheduler.ReplayListenerSuite *** ABORTED *** (1 second, 
391 milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-8f3cacd6-faad-4121-b901-ba1bba8025a0

 - End-to-end replay *** FAILED *** (62 milliseconds)
   java.io.IOException: No FileSystem for scheme: C

 - End-to-end replay with compression *** FAILED *** (110 milliseconds)
   java.io.IOException: No FileSystem for scheme: C
```

`org.apache.spark.sql.hive.StatisticsSuite`

```
 - SPARK-21079 - analyze table with location different than that of individual 
partitions *** FAILED *** (875 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - SPARK-21079 - analyze partitioned table with only a subset of partitions 
visible *** FAILED *** (47 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);
```

**Note:** this PR does not fix:

`org.apache.spark.deploy.SparkSubmitSuite`

```
 - launch simple application with spark-submit with redaction *** FAILED *** 
(172 milliseconds)
   java.util.NoSuchElementException: next on empty iterator
```

I can't reproduce this on my Windows machine, but it apparently fails consistently 
on AppVeyor. This one is still unclear to me and hard to debug, so I did 
not include it for now.

**Note:** it looks like there are more instances, but it is hard to identify them, 
partly due to flakiness and partly due to the sheer volume of logs and errors. I will 
probably make another pass if that is fine.

## How was this patch tested?

Manually via AppVeyor:

**Before**

- `org.apache.spark.deploy.RPackageUtilsSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/8t8ra3lrljuir7q4
- `org.apache.spark.deploy.SparkSubmitSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/taquy84yudjjen64
- `org.apache.spark.scheduler.ReplayListenerSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/24omrfn2k0xfa9xq
- `org.apache.spark.sql.hive.StatisticsSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/2079y1plgj76dc9l

**After**

- `org.apache.spark.deploy.RPackageUtilsSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/3803dbfn89ne1164
- `org.apache.spark.deploy.SparkSubmitSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/m5l350dp7u9a4xjr
- `org.apache.spark.scheduler.ReplayListenerSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/565vf74pp6bfdk18
- `org.apache.spark.sql.hive.StatisticsSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/qm78tsk8c37jb6s4

Jenkins tests are required and AppVeyor tests will be triggered.

Author: hyukjinkwon 

Closes #18971 from HyukjinKwon/windows-fixes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b30a11a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b30a11a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b30a11a6

Branch: refs/heads/master
Commit: b30a11a6acf4b1512b5759f21ae58e69662ba455
Parents: 734ed7a
Author: hyukjinkwon 
Authored: Wed Aug 30 21:35:52 2017 +0900
Committer: hyukjinkwon 
Committed: Wed Aug 30 21:35:52 2017 +0900

--
 .../spark/deploy/RPackageUtilsSuite.scala   |  7 +--
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  4 +-
 

spark git commit: [SPARK-21806][MLLIB] BinaryClassificationMetrics pr(): first point (0.0, 1.0) is misleading

2017-08-30 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 8f0df6bc1 -> 734ed7a7b


[SPARK-21806][MLLIB] BinaryClassificationMetrics pr(): first point (0.0, 1.0) 
is misleading

## What changes were proposed in this pull request?

Prepend (0, p) to the precision-recall curve, not (0, 1), where p matches the precision 
at the lowest recall point.

## How was this patch tested?

Updated tests.

Author: Sean Owen 

Closes #19038 from srowen/SPARK-21806.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/734ed7a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/734ed7a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/734ed7a7

Branch: refs/heads/master
Commit: 734ed7a7b397578f16549070f350215bde369b3c
Parents: 8f0df6b
Author: Sean Owen 
Authored: Wed Aug 30 11:36:00 2017 +0100
Committer: Sean Owen 
Committed: Wed Aug 30 11:36:00 2017 +0100

--
 .../BinaryClassificationMetrics.scala   |  8 +++
 .../BinaryClassificationMetricsSuite.scala  | 22 +---
 2 files changed, 14 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/734ed7a7/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index 9b7cd04..2cfcf38 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -98,16 +98,16 @@ class BinaryClassificationMetrics @Since("1.3.0") (
 
   /**
* Returns the precision-recall curve, which is an RDD of (recall, 
precision),
-   * NOT (precision, recall), with (0.0, 1.0) prepended to it.
+   * NOT (precision, recall), with (0.0, p) prepended to it, where p is the 
precision
+   * associated with the lowest recall on the curve.
* @see <a href="http://en.wikipedia.org/wiki/Precision_and_recall">
* Precision and recall (Wikipedia)</a>
*/
   @Since("1.0.0")
   def pr(): RDD[(Double, Double)] = {
 val prCurve = createCurve(Recall, Precision)
-val sc = confusions.context
-val first = sc.makeRDD(Seq((0.0, 1.0)), 1)
-first.union(prCurve)
+val (_, firstPrecision) = prCurve.first()
+confusions.context.parallelize(Seq((0.0, firstPrecision)), 
1).union(prCurve)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/734ed7a7/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
index 99d52fa..a08917a 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
@@ -23,18 +23,16 @@ import org.apache.spark.mllib.util.TestingUtils._
 
 class BinaryClassificationMetricsSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 
-  private def areWithinEpsilon(x: (Double, Double)): Boolean = x._1 ~= (x._2) 
absTol 1E-5
-
-  private def pairsWithinEpsilon(x: ((Double, Double), (Double, Double))): 
Boolean =
-(x._1._1 ~= x._2._1 absTol 1E-5) && (x._1._2 ~= x._2._2 absTol 1E-5)
-
-  private def assertSequencesMatch(left: Seq[Double], right: Seq[Double]): 
Unit = {
-  assert(left.zip(right).forall(areWithinEpsilon))
+  private def assertSequencesMatch(actual: Seq[Double], expected: 
Seq[Double]): Unit = {
+actual.zip(expected).foreach { case (a, e) => assert(a ~== e absTol 
1.0e-5) }
   }
 
-  private def assertTupleSequencesMatch(left: Seq[(Double, Double)],
-   right: Seq[(Double, Double)]): Unit = {
-assert(left.zip(right).forall(pairsWithinEpsilon))
+  private def assertTupleSequencesMatch(actual: Seq[(Double, Double)],
+   expected: Seq[(Double, Double)]): Unit = {
+actual.zip(expected).foreach { case ((ax, ay), (ex, ey)) =>
+  assert(ax ~== ex absTol 1.0e-5)
+  assert(ay ~== ey absTol 1.0e-5)
+}
   }
 
   private def validateMetrics(metrics: BinaryClassificationMetrics,
@@ -44,7 +42,7 @@ class BinaryClassificationMetricsSuite extends SparkFunSuite 
with MLlibTestSpark
   expectedFMeasures1: Seq[Double],
   expectedFmeasures2: Seq[Double],
   expectedPrecisions: Seq[Double],
-  expectedRecalls: Seq[Double]) = {
+  expectedRecalls: 

spark git commit: [SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get`

2017-08-30 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master d4895c9de -> 8f0df6bc1


[SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get`

During profiling of a structured streaming application with Kafka as the 
source, I came across this exception:

![Structured Streaming Kafka 
Exceptions](https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png)

This is a 1 minute sample, which caused 106K `NonLocalReturnControl` exceptions 
to be thrown.
This happens because `CachedKafkaConsumer.get` is run inside:

`private def runUninterruptiblyIfPossible[T](body: => T): T`

Where `body: => T` is the `get` method. Turning the method into a function 
means that in order to escape the `while` loop defined in `get` the runtime has 
to do dirty tricks which involve throwing the above exception.

## What changes were proposed in this pull request?

Instead of using `return` (which is generally not recommended in Scala), we 
place the result of the `fetchData` method inside a local variable and use a 
boolean flag to indicate the status of fetching data, which we monitor as our 
predicate to the `while` loop.

## How was this patch tested?

I've run the `KafkaSourceSuite` to make sure there are no regressions. Since the 
exception isn't visible from user code, there is no way (at least that I could 
think of) to add this as a test to the existing suite.

Author: Yuval Itzchakov 

Closes #19059 from YuvalItzchakov/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0df6bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0df6bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0df6bc

Branch: refs/heads/master
Commit: 8f0df6bc1092c0c75b41e91e4ffc41a5525c8274
Parents: d4895c9
Author: Yuval Itzchakov 
Authored: Wed Aug 30 10:33:23 2017 +0100
Committer: Sean Owen 
Committed: Wed Aug 30 10:33:23 2017 +0100

--
 .../spark/sql/kafka010/CachedKafkaConsumer.scala | 19 +++
 1 file changed, 15 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8f0df6bc/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 7c4f38e..90ed7b1 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -112,9 +112,15 @@ private[kafka010] case class CachedKafkaConsumer private(
 // we will move to the next available offset within `[offset, 
untilOffset)` and retry.
 // If `failOnDataLoss` is `true`, the loop body will be executed only once.
 var toFetchOffset = offset
-while (toFetchOffset != UNKNOWN_OFFSET) {
+var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+// We want to break out of the while loop on a successful fetch to avoid 
using "return"
+// which may causes a NonLocalReturnControl exception when this method is 
used as a function.
+var isFetchComplete = false
+
+while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
   try {
-return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+isFetchComplete = true
   } catch {
 case e: OffsetOutOfRangeException =>
   // When there is some error thrown, it's better to use a new 
consumer to drop all cached
@@ -125,8 +131,13 @@ private[kafka010] case class CachedKafkaConsumer private(
   toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, 
untilOffset)
   }
 }
-resetFetchedData()
-null
+
+if (isFetchComplete) {
+  consumerRecord
+} else {
+  resetFetchedData()
+  null
+}
   }
 
   /**


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [MINOR][TEST] Off -heap memory leaks for unit tests

2017-08-30 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master e47f48c73 -> d4895c9de


[MINOR][TEST] Off -heap memory leaks for unit tests

## What changes were proposed in this pull request?
Free off-heap memory.
I have checked all the unit tests.

## How was this patch tested?
N/A

Author: liuxian 

Closes #19075 from 10110346/memleak.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4895c9d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4895c9d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4895c9d

Branch: refs/heads/master
Commit: d4895c9de6ca9c3ac4461cf6f86cd88eb63e0720
Parents: e47f48c
Author: liuxian 
Authored: Wed Aug 30 10:16:11 2017 +0100
Committer: Sean Owen 
Committed: Wed Aug 30 10:16:11 2017 +0100

--
 .../src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java| 1 +
 .../test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java   | 1 +
 2 files changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d4895c9d/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
--
diff --git 
a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java 
b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
index a77ba82..4ae49d8 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -73,5 +73,6 @@ public class PlatformUtilSuite {
 Assert.assertEquals(
   Platform.getByte(offheap.getBaseObject(), offheap.getBaseOffset()),
   MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
+MemoryAllocator.UNSAFE.free(offheap);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d4895c9d/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
--
diff --git 
a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java 
b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index f53bc0b..46b0516 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -54,6 +54,7 @@ public class TaskMemoryManagerSuite {
 final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 
offset);
 Assert.assertEquals(null, manager.getPage(encodedAddress));
 Assert.assertEquals(offset, manager.getOffsetInPage(encodedAddress));
+manager.freePage(dataPage, c);
   }
 
   @Test


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21254][WEBUI] History UI performance fixes

2017-08-30 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 917fe6635 -> a6a994414


[SPARK-21254][WEBUI] History UI performance fixes

## This is a backport of PR #18783 to the latest released branch 2.2.

## What changes were proposed in this pull request?

As described in the JIRA ticket, the History page takes ~1min to load in cases 
when the number of jobs is 10k+.
Most of the time is currently being spent on DOM manipulations and all 
additional costs implied by this (browser repaints and reflows).
This PR's goal is not to change any behavior but to optimize History UI 
rendering time:

1. The most costly operation is setting `innerHTML` for `duration` column 
within a loop, which is [extremely 
unperformant](https://jsperf.com/jquery-append-vs-html-list-performance/24). 
[Refactoring](https://github.com/criteo-forks/spark/commit/b7e56eef4d66af977bd05af58a81e14faf33c211)
 this helped to get page load time **down to 10-15s**.

2. The second big gain, bringing page load time **down to 4s**, [was 
achieved](https://github.com/criteo-forks/spark/commit/3630ca212baa94d60c5fe7e4109cf6da26288cec)
 by detaching the table's DOM before parsing it with the DataTables jQuery plugin.

3. Another chunk of improvements 
([1](https://github.com/criteo-forks/spark/commit/ab520d156a7293a707aa6bc053a2f83b9ac2),
 
[2](https://github.com/criteo-forks/spark/commit/e25be9a66b018ba0cc53884f242469b515cb2bf4),
 
[3](https://github.com/criteo-forks/spark/commit/91697079a29138b7581e64f2aa79247fa1a4e4af))
 was focused on removing unnecessary DOM manipulations that in total 
contributed ~250ms to page load time.

## How was this patch tested?

Tested by existing Selenium tests in 
`org.apache.spark.deploy.history.HistoryServerSuite`.

Changes were also tested on Criteo's spark-2.1 fork with 20k+ rows in 
the table, reducing load time to 4s.

Author: Dmitry Parfenchik 

Closes #18860 from 2ooom/history-ui-perf-fix-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6a99441
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6a99441
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6a99441

Branch: refs/heads/branch-2.2
Commit: a6a9944140bbb336146d0d868429cb01839375c7
Parents: 917fe66
Author: Dmitry Parfenchik 
Authored: Wed Aug 30 09:42:15 2017 +0100
Committer: Sean Owen 
Committed: Wed Aug 30 09:42:15 2017 +0100

--
 .../spark/ui/static/historypage-template.html   |  22 ++--
 .../org/apache/spark/ui/static/historypage.js   | 112 +--
 2 files changed, 71 insertions(+), 63 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6a99441/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
--
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html 
b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
index bfe31aa..20cd7bf 100644
--- 
a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
+++ 
b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
@@ -29,21 +29,25 @@
   App Name
 
   
-  
+  {{#hasMultipleAttempts}}
+  
 
   Attempt ID
 
   
+  {{/hasMultipleAttempts}}
   
 
   Started
 
   
-  
+  {{#showCompletedColumn}}
+  
 
   Completed
 
   
+  {{/showCompletedColumn}}
   
 
   Duration
@@ -68,13 +72,17 @@
   
   {{#applications}}
 
-  {{id}}
-  {{name}}
+  {{id}}
+  {{name}}
   {{#attempts}}
-  {{attemptId}}
+  {{#hasMultipleAttempts}}
+  {{attemptId}}
+  {{/hasMultipleAttempts}}
   {{startTime}}
-  {{endTime}}
-  {{duration}}
+  {{#showCompletedColumn}}
+  {{endTime}}
+  {{/showCompletedColumn}}
+  {{duration}}
   {{sparkUser}}
   {{lastUpdated}}
   Download

http://git-wip-us.apache.org/repos/asf/spark/blob/a6a99441/core/src/main/resources/org/apache/spark/ui/static/historypage.js
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js 
b/core/src/main/resources/org/apache/spark/ui/static/historypage.js
index 5ec1ce1..3e2bba8 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/historypage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js
@@ -48,6 +48,18 @@ function getParameterByName(name, searchString) {
   return results === null ? "" : decodeURIComponent(results[1].replace(/\+/g, 
" "));
 }
 
+function removeColumnByName(columns, columnName) {
+