[spark] branch branch-2.4 updated: [SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count

2019-04-26 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ba9e12d  [SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 
0 record count
ba9e12d is described below

commit ba9e12d55d043d6331df59a3829d40e41a9e2171
Author: Branden Smith 
AuthorDate: Wed Feb 6 13:55:19 2019 +0800

[SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count

This PR consists of the `test` components of #23665 only, minus the 
associated patch from that PR.

It adds a new unit test to `JsonSuite` which verifies that the `count()` 
returned from a `DataFrame` loaded from JSON containing empty lines does not 
include those empty lines in the record count. The test runs `count` prior to 
otherwise reading data from the `DataFrame`, so as to catch future cases where 
a pre-parsing optimization might result in `count` results inconsistent with 
existing behavior.
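
For reference, the behavior under test boils down to the following sketch (a minimal spark-shell example, assuming a SparkSession named `spark` and an illustrative scratch path; it is not part of the patch itself):

```
import spark.implicits._

val path = "/tmp/json-with-empty-lines"   // illustrative path
Seq("""{ "a" : 1 }""", "", """{ "a" : 2 }""", " \t ")
  .toDS()
  .write.mode("overwrite").text(path)

// Blank and whitespace-only lines must not be counted as records.
assert(spark.read.json(path).count() == 2)
```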

This PR is intended to be deployed alongside #23667; `master` currently 
causes the test to fail, as described in 
[SPARK-26745](https://issues.apache.org/jira/browse/SPARK-26745).

Manual testing, existing `JsonSuite` unit tests.

Closes #23674 from sumitsu/json_emptyline_count_test.

Authored-by: Branden Smith 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 63bced9375ec1ec6ded220d768cd746050861a09)
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/execution/datasources/json/JsonSuite.scala | 12 
 1 file changed, 12 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3e4cc8f..5ca430a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2515,4 +2515,16 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
 checkCount(2)
 countForMalformedJSON(0, Seq(""))
   }
+
+  test("SPARK-26745: count() for non-multiline input with empty lines") {
+withTempPath { tempPath =>
+  val path = tempPath.getCanonicalPath
+  Seq("""{ "a" : 1 }""", "", """ { "a" : 2 }""", " \t ")
+.toDS()
+.repartition(1)
+.write
+.text(path)
+  assert(spark.read.json(path).count() === 2)
+}
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-25535][CORE][BRANCH-2.4] Work around bad error handling in commons-crypto.

2019-04-26 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new fce9b2b  [SPARK-25535][CORE][BRANCH-2.4] Work around bad error 
handling in commons-crypto.
fce9b2b is described below

commit fce9b2bce647f7554cacd1245cf670ff938f84f7
Author: Marcelo Vanzin 
AuthorDate: Fri Apr 26 21:23:17 2019 -0700

[SPARK-25535][CORE][BRANCH-2.4] Work around bad error handling in 
commons-crypto.

The commons-crypto library does some questionable error handling internally,
which can lead to JVM crashes if some call into native code fails and cleans
up state it should not.

While the library is not fixed, this change adds some workarounds in Spark 
code
so that when an error is detected in the commons-crypto side, Spark avoids
calling into the library further.
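
A minimal Scala sketch of that guard pattern (the trait and class names are illustrative stand-ins, not the actual Spark or commons-crypto API; the real change is in AuthEngine/TransportCipher below):

```
trait NativeCipherLike {
  def flush(): Unit   // may call into native code and fail
}

class GuardedCipher(private var cipher: NativeCipherLike) {
  // Flush at most once; whatever happens, drop the handle so no later call
  // can re-enter the (possibly corrupted) native library.
  def shutdown(): Option[Throwable] = {
    var error: Option[Throwable] = None
    if (cipher != null) {
      try {
        cipher.flush()
      } catch {
        case e: Exception => error = Some(e)
      } finally {
        cipher = null
      }
    }
    error
  }
}
```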

Tested with existing and added unit tests.

Closes #24476 from vanzin/SPARK-25535-2.4.

Authored-by: Marcelo Vanzin 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/network/crypto/AuthEngine.java|  95 +-
 .../spark/network/crypto/TransportCipher.java  |  60 +++--
 .../spark/network/crypto/AuthEngineSuite.java  |  17 +++
 .../apache/spark/security/CryptoStreamUtils.scala  | 137 +++--
 .../spark/security/CryptoStreamUtilsSuite.scala|  37 +-
 5 files changed, 295 insertions(+), 51 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
index 056505e..64fdb32 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
@@ -159,15 +159,21 @@ class AuthEngine implements Closeable {
 // accurately report the errors when they happen.
 RuntimeException error = null;
 byte[] dummy = new byte[8];
-try {
-  doCipherOp(encryptor, dummy, true);
-} catch (Exception e) {
-  error = new RuntimeException(e);
+if (encryptor != null) {
+  try {
+doCipherOp(Cipher.ENCRYPT_MODE, dummy, true);
+  } catch (Exception e) {
+error = new RuntimeException(e);
+  }
+  encryptor = null;
 }
-try {
-  doCipherOp(decryptor, dummy, true);
-} catch (Exception e) {
-  error = new RuntimeException(e);
+if (decryptor != null) {
+  try {
+doCipherOp(Cipher.DECRYPT_MODE, dummy, true);
+  } catch (Exception e) {
+error = new RuntimeException(e);
+  }
+  decryptor = null;
 }
 random.close();
 
@@ -189,11 +195,11 @@ class AuthEngine implements Closeable {
   }
 
   private byte[] decrypt(byte[] in) throws GeneralSecurityException {
-return doCipherOp(decryptor, in, false);
+return doCipherOp(Cipher.DECRYPT_MODE, in, false);
   }
 
   private byte[] encrypt(byte[] in) throws GeneralSecurityException {
-return doCipherOp(encryptor, in, false);
+return doCipherOp(Cipher.ENCRYPT_MODE, in, false);
   }
 
   private void initializeForAuth(String cipher, byte[] nonce, SecretKeySpec 
key)
@@ -205,11 +211,13 @@ class AuthEngine implements Closeable {
 byte[] iv = new byte[conf.ivLength()];
 System.arraycopy(nonce, 0, iv, 0, Math.min(nonce.length, iv.length));
 
-encryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
-encryptor.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
+CryptoCipher _encryptor = CryptoCipherFactory.getCryptoCipher(cipher, 
cryptoConf);
+_encryptor.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
+this.encryptor = _encryptor;
 
-decryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
-decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
+CryptoCipher _decryptor = CryptoCipherFactory.getCryptoCipher(cipher, 
cryptoConf);
+_decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
+this.decryptor = _decryptor;
   }
 
   /**
@@ -241,29 +249,52 @@ class AuthEngine implements Closeable {
 return new SecretKeySpec(key.getEncoded(), conf.keyAlgorithm());
   }
 
-  private byte[] doCipherOp(CryptoCipher cipher, byte[] in, boolean isFinal)
+  private byte[] doCipherOp(int mode, byte[] in, boolean isFinal)
 throws GeneralSecurityException {
 
-Preconditions.checkState(cipher != null);
+CryptoCipher cipher;
+switch (mode) {
+  case Cipher.ENCRYPT_MODE:
+cipher = encryptor;
+break;
+  case Cipher.DECRYPT_MODE:
+cipher = decryptor;
+break;
+  default:
+throw new IllegalArgumentException(String.valueOf(mode));
+}
 
-int scale = 1;
-while (true) {
-  int size = in.length * scale;
-  byte[] 

[spark] branch master updated: [SPARK-23619][DOCS] Add output description for some generator expressions / functions

2019-04-26 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 90085a1  [SPARK-23619][DOCS] Add output description for some generator 
expressions / functions
90085a1 is described below

commit 90085a184797f8bddbff8ca6ec7a60f3899c1a86
Author: Jash Gala 
AuthorDate: Sat Apr 27 10:30:12 2019 +0900

[SPARK-23619][DOCS] Add output description for some generator expressions / 
functions

## What changes were proposed in this pull request?

This PR addresses SPARK-23619: 
https://issues.apache.org/jira/browse/SPARK-23619

It adds additional comments indicating the default column names for the 
`explode` and `posexplode`
functions in Spark-SQL.

Functions for which comments have been updated so far:
* stack
* inline
* explode
* posexplode
* explode_outer
* posexplode_outer
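
A short spark-shell sketch of the documented defaults (assumes a SparkSession named `spark`; not part of the patch):

```
import org.apache.spark.sql.functions.{explode, posexplode}
import spark.implicits._

val df = Seq((1, Seq("x", "y"), Map("k" -> "v"))).toDF("id", "letters", "m")

df.select(explode($"letters")).columns      // Array(col)
df.select(posexplode($"letters")).columns   // Array(pos, col)
df.select(explode($"m")).columns            // Array(key, value)
```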

## How was this patch tested?

This is just a change in the comments. The package builds and tests 
successfully after the change.

Closes #23748 from jashgala/SPARK-23619.

Authored-by: Jash Gala 
Signed-off-by: HyukjinKwon 
---
 R/pkg/R/functions.R  | 12 ++--
 python/pyspark/sql/functions.py  | 20 
 .../spark/sql/catalyst/expressions/generators.scala  | 12 
 .../main/scala/org/apache/spark/sql/functions.scala  |  8 
 4 files changed, 42 insertions(+), 10 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 0566a47..3bd1f54 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -3589,6 +3589,8 @@ setMethod("element_at",
 
 #' @details
 #' \code{explode}: Creates a new row for each element in the given array or 
map column.
+#' Uses the default column name \code{col} for elements in the array and
+#' \code{key} and \code{value} for elements in the map unless specified 
otherwise.
 #'
 #' @rdname column_collection_functions
 #' @aliases explode explode,Column-method
@@ -3649,7 +3651,9 @@ setMethod("sort_array",
 
 #' @details
 #' \code{posexplode}: Creates a new row for each element with position in the 
given array
-#' or map column.
+#' or map column. Uses the default column name \code{pos} for position, and 
\code{col}
+#' for elements in the array and \code{key} and \code{value} for elements in 
the map
+#' unless specified otherwise.
 #'
 #' @rdname column_collection_functions
 #' @aliases posexplode posexplode,Column-method
@@ -3790,7 +3794,8 @@ setMethod("repeat_string",
 #' \code{explode}: Creates a new row for each element in the given array or 
map column.
 #' Unlike \code{explode}, if the array/map is \code{null} or empty
 #' then \code{null} is produced.
-#'
+#' Uses the default column name \code{col} for elements in the array and
+#' \code{key} and \code{value} for elements in the map unless specified 
otherwise.
 #'
 #' @rdname column_collection_functions
 #' @aliases explode_outer explode_outer,Column-method
@@ -3815,6 +3820,9 @@ setMethod("explode_outer",
 #' \code{posexplode_outer}: Creates a new row for each element with position 
in the given
 #' array or map column. Unlike \code{posexplode}, if the array/map is 
\code{null} or empty
 #' then the row (\code{null}, \code{null}) is produced.
+#' Uses the default column name \code{pos} for position, and \code{col}
+#' for elements in the array and \code{key} and \code{value} for elements in 
the map
+#' unless specified otherwise.
 #'
 #' @rdname column_collection_functions
 #' @aliases posexplode_outer posexplode_outer,Column-method
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 22163f5..613822b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2142,7 +2142,10 @@ def array_except(col1, col2):
 
 @since(1.4)
 def explode(col):
-"""Returns a new row for each element in the given array or map.
+"""
+Returns a new row for each element in the given array or map.
+Uses the default column name `col` for elements in the array and
+`key` and `value` for elements in the map unless specified otherwise.
 
 >>> from pyspark.sql import Row
 >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": 
"b"})])
@@ -2163,7 +2166,10 @@ def explode(col):
 
 @since(2.1)
 def posexplode(col):
-"""Returns a new row for each element with position in the given array or 
map.
+"""
+Returns a new row for each element with position in the given array or map.
+Uses the default column name `pos` for position, and `col` for elements in 
the
+array and `key` and `value` for elements in the map unless specified 
otherwise.
 
 >>> from pyspark.sql import Row
 >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": 
"b"})])
@@ -2184,8 

[spark] branch master updated: [MINOR][TEST][DOC] Execute action miss name message

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6328be7  [MINOR][TEST][DOC] Execute action miss name message
6328be7 is described below

commit 6328be78f96ef9c90697318bea83bd2033dae471
Author: uncleGen 
AuthorDate: Sat Apr 27 09:28:31 2019 +0800

[MINOR][TEST][DOC] Execute action miss name message

## What changes were proposed in this pull request?

some minor updates:
- `Execute` action miss `name` message
-  typo in SS document
-  typo in SQLConf

## How was this patch tested?

N/A

Closes #24466 from uncleGen/minor-fix.

Authored-by: uncleGen 
Signed-off-by: Wenchen Fan 
---
 docs/structured-streaming-programming-guide.md| 2 +-
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala| 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala| 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala| 2 +-
 .../apache/spark/sql/execution/datasources/DataSourceResolution.scala | 2 +-
 .../src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala| 2 +-
 6 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 2b6940d..2c4169d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2995,7 +2995,7 @@ the effect of the change is not well-defined. For all of 
them:
 
   - Changes to the user-defined foreach sink (that is, the `ForeachWriter` 
code) are allowed, but the semantics of the change depends on the code.
 
-- *Changes in projection / filter / map-like operations**: Some cases are 
allowed. For example:
+- *Changes in projection / filter / map-like operations*: Some cases are 
allowed. For example:
 
   - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3a2e736..96d3f5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2138,9 +2138,9 @@ class SQLConf extends Serializable with Logging {
   def continuousStreamingExecutorPollIntervalMs: Long =
 getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)
 
-  def userV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST)
+  def useV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST)
 
-  def userV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST)
+  def useV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST)
 
   def disabledV2StreamingWriters: String = 
getConf(DISABLED_V2_STREAMING_WRITERS)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3cd48ba..8460c79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -195,7 +195,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 }
 
 val useV1Sources =
-  
sparkSession.sessionState.conf.userV1SourceReaderList.toLowerCase(Locale.ROOT).split(",")
+  
sparkSession.sessionState.conf.useV1SourceReaderList.toLowerCase(Locale.ROOT).split(",")
 val lookupCls = DataSource.lookupDataSource(source, 
sparkSession.sessionState.conf)
 val cls = lookupCls.newInstance() match {
   case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3b84151..18653b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -248,7 +248,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
 
 val session = df.sparkSession
 val useV1Sources =
-  
session.sessionState.conf.userV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
+  
session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
 val lookupCls = DataSource.lookupDataSource(source, 
session.sessionState.conf)
 val cls = lookupCls.newInstance() match {
   case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
 

[spark] branch branch-2.4 updated: [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite

2019-04-26 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ec53a19  [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in 
YarnSchedulerBackendSuite
ec53a19 is described below

commit ec53a19adbffba5a9f3e71cca4898f1bd7804abf
Author: “attilapiros” 
AuthorDate: Fri Apr 26 14:49:49 2019 -0700

[SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in 
YarnSchedulerBackendSuite

## What changes were proposed in this pull request?

The test "RequestExecutors reflects node blacklist and is serializable" is 
flaky because of multi threaded access of the mock task scheduler. For details 
check [Mockito FAQ (occasional exceptions like: 
WrongTypeOfReturnValue)](https://github.com/mockito/mockito/wiki/FAQ#is-mockito-thread-safe).
 So instead of mocking the task scheduler in the test TaskSchedulerImpl is 
simply subclassed.

This multithreaded access of the `nodeBlacklist()` method is coming from:
1) the unit test thread via calling of the method 
`prepareRequestExecutors()`
2) the `DriverEndpoint.onStart` which runs a periodic task that ends up 
calling this method
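
Condensed, the replacement looks like the sketch below (`sc` is the suite's SparkContext; the full change is in the diff that follows):

```
import java.util.concurrent.atomic.AtomicReference

// A real subclass instead of a Mockito mock: the value both threads race on
// lives in an AtomicReference, so concurrent reads never touch Mockito's
// stubbing machinery, which is not safe under multi-threaded access.
val sched = new TaskSchedulerImpl(sc) {
  private val blacklistedNodes = new AtomicReference[Set[String]](Set.empty)
  def setNodeBlacklist(nodes: Set[String]): Unit = blacklistedNodes.set(nodes)
  override def nodeBlacklist(): Set[String] = blacklistedNodes.get()
}
```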

## How was this patch tested?

Existing unittest.

(cherry picked from commit e4e4e2b842bffba6805623f2258b27b162b451ba)

Closes #24474 from attilapiros/SPARK-26891-branch-2.4.

Authored-by: “attilapiros” 
Signed-off-by: Dongjoon Hyun 
---
 .../cluster/YarnSchedulerBackendSuite.scala| 35 +-
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
index 7fac57f..bd2cf97 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.scheduler.cluster
 
+import java.util.concurrent.atomic.AtomicReference
+
 import scala.language.reflectiveCalls
 
 import org.mockito.Mockito.when
@@ -27,15 +29,35 @@ import org.apache.spark.serializer.JavaSerializer
 
 class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with 
LocalSparkContext {
 
+  private var yarnSchedulerBackend: YarnSchedulerBackend = _
+
+  override def afterEach(): Unit = {
+try {
+  if (yarnSchedulerBackend != null) {
+yarnSchedulerBackend.stop()
+  }
+} finally {
+  super.afterEach()
+}
+  }
+
   test("RequestExecutors reflects node blacklist and is serializable") {
 sc = new SparkContext("local", "YarnSchedulerBackendSuite")
-val sched = mock[TaskSchedulerImpl]
-when(sched.sc).thenReturn(sc)
-val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) {
+// Subclassing the TaskSchedulerImpl here instead of using Mockito. For 
details see SPARK-26891.
+val sched = new TaskSchedulerImpl(sc) {
+  val blacklistedNodes = new AtomicReference[Set[String]]()
+
+  def setNodeBlacklist(nodeBlacklist: Set[String]): Unit = 
blacklistedNodes.set(nodeBlacklist)
+
+  override def nodeBlacklist(): Set[String] = blacklistedNodes.get()
+}
+
+val yarnSchedulerBackendExtended = new YarnSchedulerBackend(sched, sc) {
   def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): 
Unit = {
 this.hostToLocalTaskCount = hostToLocalTaskCount
   }
 }
+yarnSchedulerBackend = yarnSchedulerBackendExtended
 val ser = new JavaSerializer(sc.conf).newInstance()
 for {
   blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
@@ -45,16 +67,15 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with 
MockitoSugar with Loc
 Map("a" -> 1, "b" -> 2)
   )
 } {
-  yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount)
-  when(sched.nodeBlacklist()).thenReturn(blacklist)
-  val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested)
+  yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount)
+  sched.setNodeBlacklist(blacklist)
+  val req = 
yarnSchedulerBackendExtended.prepareRequestExecutors(numRequested)
   assert(req.requestedTotal === numRequested)
   assert(req.nodeBlacklist === blacklist)
   assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
   // Serialize to make sure serialization doesn't throw an error
   ser.serialize(req)
 }
-sc.stop()
   }
 
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: 

[spark] branch master updated: [SPARK-27477][BUILD] Kafka token provider should have provided dependency on Spark

2019-04-26 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7b367bf  [SPARK-27477][BUILD] Kafka token provider should have 
provided dependency on Spark
7b367bf is described below

commit 7b367bfc86dbe7f61c0dda4f4811508137bbd0cc
Author: Koert Kuipers 
AuthorDate: Fri Apr 26 11:52:08 2019 -0700

[SPARK-27477][BUILD] Kafka token provider should have provided dependency 
on Spark

## What changes were proposed in this pull request?

Change spark-token-provider-kafka-0-10 dependency on spark-core to be 
provided

## How was this patch tested?

Ran existing unit tests

Closes #24384 from koertkuipers/feat-kafka-token-provider-fix-deps.

Authored-by: Koert Kuipers 
Signed-off-by: Marcelo Vanzin 
---
 external/kafka-0-10-token-provider/pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/external/kafka-0-10-token-provider/pom.xml 
b/external/kafka-0-10-token-provider/pom.xml
index 40ef1f7..01ca96b 100644
--- a/external/kafka-0-10-token-provider/pom.xml
+++ b/external/kafka-0-10-token-provider/pom.xml
@@ -39,6 +39,7 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-27556][BUILD] Exclude com.zaxxer:HikariCP-java7 from hadoop-yarn-server-web-proxy

2019-04-26 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fe99305  [SPARK-27556][BUILD] Exclude com.zaxxer:HikariCP-java7 from 
hadoop-yarn-server-web-proxy
fe99305 is described below

commit fe99305101fa01713021bc0ffae943066c07a0d0
Author: Yuming Wang 
AuthorDate: Fri Apr 26 12:15:39 2019 -0500

[SPARK-27556][BUILD] Exclude com.zaxxer:HikariCP-java7 from 
hadoop-yarn-server-web-proxy

## What changes were proposed in this pull request?

There are two HikariCP packages in classpath when building with `-Phive 
-Pyarn -Phadoop-3.2`.

The HikariCP dependency tree:
```
[INFO] | +- org.apache.hadoop:hadoop-yarn-server-web-proxy:jar:3.2.0:compile
[INFO] | | \- org.apache.hadoop:hadoop-yarn-server-common:jar:3.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-yarn-registry:jar:3.2.0:compile
[INFO] | | | \- commons-daemon:commons-daemon:jar:1.0.13:compile
[INFO] | | +- 
org.apache.geronimo.specs:geronimo-jcache_1.0_spec:jar:1.0-alpha-1:compile
[INFO] | | +- org.ehcache:ehcache:jar:3.3.1:compile
[INFO] | | +- com.zaxxer:HikariCP-java7:jar:2.4.12:compile
```

```
[INFO] +- org.apache.hive:hive-metastore:jar:2.3.4:compile
[INFO] | +- javolution:javolution:jar:5.5.1:compile
[INFO] | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] | +- com.jolbox:bonecp:jar:0.8.0.RELEASE:compile
[INFO] | +- com.zaxxer:HikariCP:jar:2.5.1:compile
```

This pr exclude `com.zaxxer:HikariCP-java7` from 
`hadoop-yarn-server-web-proxy`.

## How was this patch tested?

manual tests

Closes #24450 from wangyum/SPARK-27556.

Authored-by: Yuming Wang 
Signed-off-by: Sean Owen 
---
 dev/deps/spark-deps-hadoop-3.2 | 1 -
 pom.xml| 5 +
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 8b3bd79..5874151 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -1,4 +1,3 @@
-HikariCP-java7-2.4.12.jar
 JavaEWAH-0.3.2.jar
 RoaringBitmap-0.7.45.jar
 ST4-4.0.4.jar
diff --git a/pom.xml b/pom.xml
index 5153957..91661cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1210,6 +1210,11 @@
         <groupId>com.sun.jersey.contribs</groupId>
         <artifactId>*</artifactId>
       </exclusion>
+      <!-- ... -->
+      <exclusion>
+        <groupId>com.zaxxer</groupId>
+        <artifactId>HikariCP-java7</artifactId>
+      </exclusion>
     </exclusions>
   </dependency>
   <dependency>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[GitHub] [spark-website] srowen closed pull request #199: Add note about Scala 2.12 default to 2.4.2 release notes

2019-04-26 Thread GitBox
srowen closed pull request #199: Add note about Scala 2.12 default to 2.4.2 
release notes
URL: https://github.com/apache/spark-website/pull/199
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark-website] branch asf-site updated: Add note about Scala 2.12 default to 2.4.2 release notes

2019-04-26 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new f074f7d  Add note about Scala 2.12 default to 2.4.2 release notes
f074f7d is described below

commit f074f7de8bc8f25826430f3294b8fc9892234f26
Author: Sean Owen 
AuthorDate: Fri Apr 26 12:14:01 2019 -0500

Add note about Scala 2.12 default to 2.4.2 release notes

Author: Sean Owen 

Closes #199 from srowen/242ReleaseNotes.
---
 releases/_posts/2019-04-23-spark-release-2-4-2.md | 8 ++--
 site/releases/spark-release-2-4-2.html| 8 ++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/releases/_posts/2019-04-23-spark-release-2-4-2.md 
b/releases/_posts/2019-04-23-spark-release-2-4-2.md
index cccd823..6f74aba 100644
--- a/releases/_posts/2019-04-23-spark-release-2-4-2.md
+++ b/releases/_posts/2019-04-23-spark-release-2-4-2.md
@@ -13,9 +13,13 @@ meta:
 
 Spark 2.4.2 is a maintenance release containing stability fixes. This release 
is based on the branch-2.4 maintenance branch of Spark. We strongly recommend 
all 2.4 users to upgrade to this stable release.
 
+Note that Scala 2.11 support is deprecated from 2.4.1 onwards.
+As of 2.4.2, the pre-built convenience binaries are compiled for Scala 2.12.
+Spark is still cross-published for 2.11 and 2.12 in Maven Central, and can be 
built for 2.11 from source.
+
 ### Notable changes
-  - SPARK-27419: When setting spark.executor.heartbeatInterval to a value less 
than 1 seconds in branch-2.4, it will always fail because the value will be 
converted to 0 and the heartbeat will always timeout and finally kill the 
executor.
-  - Revert SPARK-25250: It may cause the job to hang forever, and is reverted 
in 2.4.2.
+  - [[SPARK-27419]](https://issues.apache.org/jira/browse/SPARK-27419): When 
setting `spark.executor.heartbeatInterval` to a value less than 1 seconds, it 
will always fail because the value will be converted to 0 and the heartbeat 
will always timeout and finally kill the executor.
+  - Revert [[SPARK-25250]](https://issues.apache.org/jira/browse/SPARK-25250): 
It may cause the job to hang forever, and is reverted in 2.4.2.
 
 You can consult JIRA for the [detailed 
changes](https://s.apache.org/spark-2.4.2).
 
diff --git a/site/releases/spark-release-2-4-2.html 
b/site/releases/spark-release-2-4-2.html
index 56adac3..b6d6cf1 100644
--- a/site/releases/spark-release-2-4-2.html
+++ b/site/releases/spark-release-2-4-2.html
@@ -205,10 +205,14 @@
 
 Spark 2.4.2 is a maintenance release containing stability fixes. This 
release is based on the branch-2.4 maintenance branch of Spark. We strongly 
recommend all 2.4 users to upgrade to this stable release.
 
+Note that Scala 2.11 support is deprecated from 2.4.1 onwards.
+As of 2.4.2, the pre-built convenience binaries are compiled for Scala 2.12.
+Spark is still cross-published for 2.11 and 2.12 in Maven Central, and can be 
built for 2.11 from source.
+
 Notable changes
 
-  SPARK-27419: When setting spark.executor.heartbeatInterval to a value 
less than 1 seconds in branch-2.4, it will always fail because the value will 
be converted to 0 and the heartbeat will always timeout and finally kill the 
executor.
-  Revert SPARK-25250: It may cause the job to hang forever, and is 
reverted in 2.4.2.
+  <a href="https://issues.apache.org/jira/browse/SPARK-27419">[SPARK-27419]</a>: 
When setting spark.executor.heartbeatInterval to a value less than 
1 seconds, it will always fail because the value will be converted to 0 and the 
heartbeat will always timeout and finally kill the executor.
+  Revert <a href="https://issues.apache.org/jira/browse/SPARK-25250">[SPARK-25250]</a>: It 
may cause the job to hang forever, and is reverted in 2.4.2.
 
 
You can consult JIRA for the <a href="https://s.apache.org/spark-2.4.2">detailed changes</a>.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[GitHub] [spark-website] srowen commented on issue #199: Add note about Scala 2.12 default to 2.4.2 release notes

2019-04-26 Thread GitBox
srowen commented on issue #199: Add note about Scala 2.12 default to 2.4.2 
release notes
URL: https://github.com/apache/spark-website/pull/199#issuecomment-487080217
 
 
   Agree, I figure it's best to call extra attention to it for now, either way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch0-2.3 deleted (was a956e9c)

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch branch0-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git.


 was a956e9c  [SPARK-27563][SQL][TEST] automatically get the latest Spark 
versions in HiveExternalCatalogVersionsSuite

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch0-2.3 created (now a956e9c)

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch branch0-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git.


  at a956e9c  [SPARK-27563][SQL][TEST] automatically get the latest Spark 
versions in HiveExternalCatalogVersionsSuite

This branch includes the following new commits:

 new a956e9c  [SPARK-27563][SQL][TEST] automatically get the latest Spark 
versions in HiveExternalCatalogVersionsSuite

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] 01/01: [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch0-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

commit a956e9c765026de0009da4a5867bb768375c22ed
Author: Wenchen Fan 
AuthorDate: Fri Apr 26 16:37:43 2019 +0900

[SPARK-27563][SQL][TEST] automatically get the latest Spark versions in 
HiveExternalCatalogVersionsSuite

We can get the latest downloadable Spark versions from 
https://dist.apache.org/repos/dist/release/spark/

manually.

Closes #24454 from cloud-fan/test.

Authored-by: Wenchen Fan 
Signed-off-by: HyukjinKwon 
---
 .../sql/hive/HiveExternalCatalogVersionsSuite.scala   | 19 ++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 680abb6..916f73f 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 
 import scala.sys.process._
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 
@@ -166,6 +167,10 @@ class HiveExternalCatalogVersionsSuite extends 
SparkSubmitTestUtils {
   """.stripMargin.getBytes("utf8"))
 // scalastyle:on line.size.limit
 
+if (PROCESS_TABLES.testingVersions.isEmpty) {
+  fail("Fail to get the lates Spark versions to test.")
+}
+
 PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, 
index) =>
   val sparkHome = new File(sparkTestingDir, s"spark-$version")
   if (!sparkHome.exists()) {
@@ -203,7 +208,19 @@ class HiveExternalCatalogVersionsSuite extends 
SparkSubmitTestUtils {
 
 object PROCESS_TABLES extends QueryTest with SQLTestUtils {
   // Tests the latest version of every release line.
-  val testingVersions = Seq("2.3.3")
+  val testingVersions: Seq[String] = {
+import scala.io.Source
+try {
+  Source.fromURL("https://dist.apache.org/repos/dist/release/spark/").mkString
+.split("\n")
+.filter(_.contains("""<li><a href="spark-"""))
+.map("""<a href="spark-(\d.\d.\d)/">""".r.findFirstMatchIn(_).get.group(1))
+.filter(_ < org.apache.spark.SPARK_VERSION)
+} catch {
+  // do not throw exception during object initialization.
+  case NonFatal(_) => Nil
+}
+  }
 
   protected var spark: SparkSession = _
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.3 updated (257abc4 -> a956e9c)

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 257abc4  [SPARK-27496][CORE] Fatal errors should also be sent back to 
the sender
 add a956e9c  [SPARK-27563][SQL][TEST] automatically get the latest Spark 
versions in HiveExternalCatalogVersionsSuite

No new revisions were added by this update.

Summary of changes:
 .../sql/hive/HiveExternalCatalogVersionsSuite.scala   | 19 ++-
 1 file changed, 18 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 29a4e04  [SPARK-27563][SQL][TEST] automatically get the latest Spark 
versions in HiveExternalCatalogVersionsSuite
29a4e04 is described below

commit 29a4e048feb459e5121b6d21c741a81f48991f64
Author: Wenchen Fan 
AuthorDate: Fri Apr 26 16:37:43 2019 +0900

[SPARK-27563][SQL][TEST] automatically get the latest Spark versions in 
HiveExternalCatalogVersionsSuite

## What changes were proposed in this pull request?

We can get the latest downloadable Spark versions from 
https://dist.apache.org/repos/dist/release/spark/

## How was this patch tested?

manually.

Closes #24454 from cloud-fan/test.

Authored-by: Wenchen Fan 
Signed-off-by: HyukjinKwon 
---
 .../sql/hive/HiveExternalCatalogVersionsSuite.scala   | 19 ++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index a4d6a69..8828471 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 
 import scala.sys.process._
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 
@@ -166,6 +167,10 @@ class HiveExternalCatalogVersionsSuite extends 
SparkSubmitTestUtils {
   """.stripMargin.getBytes("utf8"))
 // scalastyle:on line.size.limit
 
+if (PROCESS_TABLES.testingVersions.isEmpty) {
+  fail("Fail to get the lates Spark versions to test.")
+}
+
 PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, 
index) =>
   val sparkHome = new File(sparkTestingDir, s"spark-$version")
   if (!sparkHome.exists()) {
@@ -203,7 +208,19 @@ class HiveExternalCatalogVersionsSuite extends 
SparkSubmitTestUtils {
 
 object PROCESS_TABLES extends QueryTest with SQLTestUtils {
   // Tests the latest version of every release line.
-  val testingVersions = Seq("2.3.3", "2.4.2")
+  val testingVersions: Seq[String] = {
+import scala.io.Source
+try {
+  Source.fromURL("https://dist.apache.org/repos/dist/release/spark/").mkString
+.split("\n")
+.filter(_.contains("""<li><a href="spark-"""))
+.map("""<a href="spark-(\d.\d.\d)/">""".r.findFirstMatchIn(_).get.group(1))
+.filter(_ < org.apache.spark.SPARK_VERSION)
+} catch {
+  // do not throw exception during object initialization.
+  case NonFatal(_) => Nil
+}
+  }
 
   protected var spark: SparkSession = _
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-27190][SQL] add table capability for streaming

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 85fd552  [SPARK-27190][SQL] add table capability for streaming
85fd552 is described below

commit 85fd552ed6304967f25574baef3cf9657957bcb1
Author: Wenchen Fan 
AuthorDate: Fri Apr 26 15:44:23 2019 +0800

[SPARK-27190][SQL] add table capability for streaming

## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/24012 , to add 
the corresponding capabilities for streaming.
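
Condensed, the new pattern (taken from the KafkaSourceProvider change below) is that a table declares streaming support as capability values instead of marker interfaces; a REPL-style sketch:

```
import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.TableCapability
import org.apache.spark.sql.sources.v2.TableCapability._

// Replaces the SupportsMicroBatchRead / SupportsContinuousRead /
// SupportsStreamingWrite marker traits with explicit capability values.
def streamingCapabilities(): ju.Set[TableCapability] =
  Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
```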

## How was this patch tested?

existing tests

Closes #24129 from cloud-fan/capability.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  11 +-
 .../sql/sources/v2/SupportsContinuousRead.java |  35 --
 .../sql/sources/v2/SupportsMicroBatchRead.java |  35 --
 .../sql/sources/v2/SupportsStreamingWrite.java |  34 --
 .../spark/sql/sources/v2/TableCapability.java  |  19 +++
 .../apache/spark/sql/sources/v2/reader/Scan.java   |  10 +-
 .../datasources/noop/NoopDataSource.scala  |   7 +-
 .../v2/V2StreamingScanSupportCheck.scala   |  64 ++
 .../execution/streaming/MicroBatchExecution.scala  |  61 +-
 .../sql/execution/streaming/StreamExecution.scala  |   4 +-
 .../spark/sql/execution/streaming/console.scala|   9 +-
 .../streaming/continuous/ContinuousExecution.scala |  22 ++--
 .../spark/sql/execution/streaming/memory.scala |   9 +-
 .../streaming/sources/ForeachWriterTable.scala |  12 +-
 .../streaming/sources/RateStreamProvider.scala |   9 +-
 .../sources/TextSocketSourceProvider.scala |   9 +-
 .../sql/execution/streaming/sources/memoryV2.scala |  10 +-
 .../sql/internal/BaseSessionStateBuilder.scala |   3 +-
 .../spark/sql/streaming/DataStreamReader.scala |   5 +-
 .../spark/sql/streaming/DataStreamWriter.scala |   7 +-
 .../sql/streaming/StreamingQueryManager.scala  |   6 +-
 .../v2/V2StreamingScanSupportCheckSuite.scala  | 130 +
 .../sources/StreamingDataSourceV2Suite.scala   | 106 ++---
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   3 +-
 24 files changed, 389 insertions(+), 231 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index f7a2032..bb76a30 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.{Collections, Locale, UUID}
+import java.util.{Locale, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -29,9 +29,10 @@ import 
org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SQLContext}
-import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, 
Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, 
MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   }
 
   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-with SupportsMicroBatchRead with SupportsContinuousRead with 
SupportsStreamingWrite {
+with SupportsRead with SupportsWrite with BaseStreamingSink {
 
 override def name(): String = s"Kafka $strategy"
 
 override def schema(): StructType = KafkaOffsetReader.kafkaSchema
 
-override def capabilities(): ju.Set[TableCapability] = 
Collections.emptySet()
+override def capabilities(): ju.Set[TableCapability] = {
+  Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
+}
 
 override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder =
   () => new KafkaScan(options)
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
deleted file mode 100644
index 5cc9848..000
--- 

[spark] branch master updated: [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite

2019-04-26 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2234667  [SPARK-27563][SQL][TEST] automatically get the latest Spark 
versions in HiveExternalCatalogVersionsSuite
2234667 is described below

commit 2234667b159bf19a68758da3ff20cfae3c058c25
Author: Wenchen Fan 
AuthorDate: Fri Apr 26 16:37:43 2019 +0900

[SPARK-27563][SQL][TEST] automatically get the latest Spark versions in 
HiveExternalCatalogVersionsSuite

## What changes were proposed in this pull request?

We can get the latest downloadable Spark versions from 
https://dist.apache.org/repos/dist/release/spark/

## How was this patch tested?

manually.

Closes #24454 from cloud-fan/test.

Authored-by: Wenchen Fan 
Signed-off-by: HyukjinKwon 
---
 .../sql/hive/HiveExternalCatalogVersionsSuite.scala   | 19 ++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 0a05ec5..ec10295 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 
 import scala.sys.process._
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 
@@ -169,6 +170,10 @@ class HiveExternalCatalogVersionsSuite extends 
SparkSubmitTestUtils {
   """.stripMargin.getBytes("utf8"))
 // scalastyle:on line.size.limit
 
+if (PROCESS_TABLES.testingVersions.isEmpty) {
+  fail("Fail to get the lates Spark versions to test.")
+}
+
 PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, 
index) =>
   val sparkHome = new File(sparkTestingDir, s"spark-$version")
   if (!sparkHome.exists()) {
@@ -206,7 +211,19 @@ class HiveExternalCatalogVersionsSuite extends 
SparkSubmitTestUtils {
 
 object PROCESS_TABLES extends QueryTest with SQLTestUtils {
   // Tests the latest version of every release line.
-  val testingVersions = Seq("2.3.3", "2.4.2")
+  val testingVersions: Seq[String] = {
+import scala.io.Source
+try {
+  Source.fromURL("https://dist.apache.org/repos/dist/release/spark/").mkString
+.split("\n")
+.filter(_.contains("""<li><a href="spark-"""))
+.map("""<a href="spark-(\d.\d.\d)/">""".r.findFirstMatchIn(_).get.group(1))
+.filter(_ < org.apache.spark.SPARK_VERSION)
+} catch {
+  // do not throw exception during object initialization.
+  case NonFatal(_) => Nil
+}
+  }
 
   protected var spark: SparkSession = _
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: add missing import and fix compilation

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ed0739a  add missing import and fix compilation
ed0739a is described below

commit ed0739a90128b697f598a58b7300dcfd5492fdba
Author: Wenchen Fan 
AuthorDate: Fri Apr 26 15:33:20 2019 +0800

add missing import and fix compilation
---
 .../scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index da92019..36da2f1 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.StructType


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-27494][SS] Null values don't work in Kafka source v2

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 705507f  [SPARK-27494][SS] Null values don't work in Kafka source v2
705507f is described below

commit 705507facda11060f1a0beb04d1dd19bda5fc4f3
Author: uncleGen 
AuthorDate: Fri Apr 26 14:25:31 2019 +0800

[SPARK-27494][SS] Null values don't work in Kafka source v2

## What changes were proposed in this pull request?

Right now Kafka source v2 doesn't support null values. The issue is in 
org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow which 
doesn't handle null values.
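
The null-safe write pattern applied in the converter amounts to this illustrative helper (a sketch, not the patch itself):

```
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter

// A null field must be recorded via setNullAt; passing a null array to
// write(), as the old code effectively did for the value column, fails.
def writeNullableBytes(rowWriter: UnsafeRowWriter, ordinal: Int, bytes: Array[Byte]): Unit = {
  if (bytes == null) {
    rowWriter.setNullAt(ordinal)
  } else {
    rowWriter.write(ordinal, bytes)
  }
}
```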

## How was this patch tested?

add new unit tests

Closes #24441 from uncleGen/SPARK-27494.

Authored-by: uncleGen 
Signed-off-by: Wenchen Fan 
(cherry picked from commit d2656aaecd4a7b5562d8d2065aaa66fdc72d253d)
Signed-off-by: Wenchen Fan 
---
 .../kafka010/KafkaRecordToUnsafeRowConverter.scala |  7 ++-
 .../sql/kafka010/KafkaContinuousSourceSuite.scala  |  4 ++
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 58 ++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index f35a143..306ef10 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow 
= {
 rowWriter.reset()
+rowWriter.zeroOutNullBytes()
 
 if (record.key == null) {
   rowWriter.setNullAt(0)
 } else {
   rowWriter.write(0, record.key)
 }
-rowWriter.write(1, record.value)
+if (record.value == null) {
+  rowWriter.setNullAt(1)
+} else {
+  rowWriter.write(1, record.value)
+}
 rowWriter.write(2, UTF8String.fromString(record.topic))
 rowWriter.write(3, record.partition)
 rowWriter.write(4, record.offset)
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index a0e5818..649cb72 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends 
KafkaSourceSuiteBase with KafkaContinuo
   }
 }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+testNullableKeyValue(ContinuousTrigger(100))
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 34cf335..da92019 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -988,6 +988,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   q.stop()
 }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
 
 
@@ -1461,6 +1465,60 @@ abstract class KafkaSourceSuiteBase extends 
KafkaSourceTest {
   CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
 )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+val table = "kafka_null_key_value_source_test"
+withTable(table) {
+  val topic = newTopic()
+  testUtils.createTopic(topic)
+  testUtils.withTranscationalProducer { producer =>
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.isolation.level", "read_committed")
+  .option("startingOffsets", "earliest")
+  .option("subscribe", topic)
+  .load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+val q = df
+  .writeStream
+  .format("memory")
+  .queryName(table)
+  .trigger(trigger)
+ 

[spark] branch master updated: [SPARK-27494][SS] Null values don't work in Kafka source v2

2019-04-26 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d2656aa  [SPARK-27494][SS] Null values don't work in Kafka source v2
d2656aa is described below

commit d2656aaecd4a7b5562d8d2065aaa66fdc72d253d
Author: uncleGen 
AuthorDate: Fri Apr 26 14:25:31 2019 +0800

[SPARK-27494][SS] Null values don't work in Kafka source v2

## What changes were proposed in this pull request?

Right now Kafka source v2 doesn't support null values. The issue is in 
org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow which 
doesn't handle null values.

## How was this patch tested?

add new unit tests

Closes #24441 from uncleGen/SPARK-27494.

Authored-by: uncleGen 
Signed-off-by: Wenchen Fan 
---
 .../kafka010/KafkaRecordToUnsafeRowConverter.scala |  7 ++-
 .../sql/kafka010/KafkaContinuousSourceSuite.scala  |  4 ++
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 58 ++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index f35a143..306ef10 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow 
= {
 rowWriter.reset()
+rowWriter.zeroOutNullBytes()
 
 if (record.key == null) {
   rowWriter.setNullAt(0)
 } else {
   rowWriter.write(0, record.key)
 }
-rowWriter.write(1, record.value)
+if (record.value == null) {
+  rowWriter.setNullAt(1)
+} else {
+  rowWriter.write(1, record.value)
+}
 rowWriter.write(2, UTF8String.fromString(record.topic))
 rowWriter.write(3, record.partition)
 rowWriter.write(4, record.offset)
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index be0cea2..9b3e78c 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends 
KafkaSourceSuiteBase with KafkaContinuo
   }
 }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+testNullableKeyValue(ContinuousTrigger(100))
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 21634ae..b98f8e9 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   q.stop()
 }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
 
 
@@ -1511,6 +1515,60 @@ abstract class KafkaSourceSuiteBase extends 
KafkaSourceTest {
   CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
 )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+val table = "kafka_null_key_value_source_test"
+withTable(table) {
+  val topic = newTopic()
+  testUtils.createTopic(topic)
+  testUtils.withTranscationalProducer { producer =>
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.isolation.level", "read_committed")
+  .option("startingOffsets", "earliest")
+  .option("subscribe", topic)
+  .load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+val q = df
+  .writeStream
+  .format("memory")
+  .queryName(table)
+  .trigger(trigger)
+  .start()
+try {
+  var idx = 0
+  producer.beginTransaction()
+  val