[spark] branch branch-2.4 updated: [SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new ba9e12d [SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count ba9e12d is described below commit ba9e12d55d043d6331df59a3829d40e41a9e2171 Author: Branden Smith AuthorDate: Wed Feb 6 13:55:19 2019 +0800 [SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count This PR consists of the `test` components of #23665 only, minus the associated patch from that PR. It adds a new unit test to `JsonSuite` which verifies that the `count()` returned from a `DataFrame` loaded from JSON containing empty lines does not include those empty lines in the record count. The test runs `count` prior to otherwise reading data from the `DataFrame`, so as to catch future cases where a pre-parsing optimization might result in `count` results inconsistent with existing behavior. This PR is intended to be deployed alongside #23667; `master` currently causes the test to fail, as described in [SPARK-26745](https://issues.apache.org/jira/browse/SPARK-26745). Manual testing, existing `JsonSuite` unit tests. Closes #23674 from sumitsu/json_emptyline_count_test. Authored-by: Branden Smith Signed-off-by: Hyukjin Kwon (cherry picked from commit 63bced9375ec1ec6ded220d768cd746050861a09) Signed-off-by: Dongjoon Hyun --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 12 1 file changed, 12 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 3e4cc8f..5ca430a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2515,4 +2515,16 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { checkCount(2) countForMalformedJSON(0, Seq("")) } + + test("SPARK-26745: count() for non-multiline input with empty lines") { +withTempPath { tempPath => + val path = tempPath.getCanonicalPath + Seq("""{ "a" : 1 }""", "", """ { "a" : 2 }""", " \t ") +.toDS() +.repartition(1) +.write +.text(path) + assert(spark.read.json(path).count() === 2) +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
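The behaviour the new test pins down can be reproduced outside the suite with a few lines in spark-shell. This is only an illustrative sketch, not the committed test: the `spark` session is the shell's built-in one and the output `path` is an arbitrary scratch directory.

```
// Blank and whitespace-only lines in non-multiline JSON must not be counted.
import spark.implicits._

val path = "/tmp/json_emptyline_count"
Seq("""{ "a" : 1 }""", "", """ { "a" : 2 }""", " \t ")
  .toDS()
  .repartition(1)
  .write
  .mode("overwrite")
  .text(path)

// count() runs before any other action on the DataFrame, mirroring the
// test's intent of catching pre-parsing count optimisations.
assert(spark.read.json(path).count() === 2)
```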
[spark] branch branch-2.4 updated: [SPARK-25535][CORE][BRANCH-2.4] Work around bad error handling in commons-crypto.
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new fce9b2b [SPARK-25535][CORE][BRANCH-2.4] Work around bad error handling in commons-crypto. fce9b2b is described below commit fce9b2bce647f7554cacd1245cf670ff938f84f7 Author: Marcelo Vanzin AuthorDate: Fri Apr 26 21:23:17 2019 -0700 [SPARK-25535][CORE][BRANCH-2.4] Work around bad error handling in commons-crypto. The commons-crypto library does some questionable error handling internally, which can lead to JVM crashes if some call into native code fails and cleans up state it should not. While the library is not fixed, this change adds some workarounds in Spark code so that when an error is detected in the commons-crypto side, Spark avoids calling into the library further. Tested with existing and added unit tests. Closes #24476 from vanzin/SPARK-25535-2.4. Authored-by: Marcelo Vanzin Signed-off-by: Dongjoon Hyun --- .../apache/spark/network/crypto/AuthEngine.java| 95 +- .../spark/network/crypto/TransportCipher.java | 60 +++-- .../spark/network/crypto/AuthEngineSuite.java | 17 +++ .../apache/spark/security/CryptoStreamUtils.scala | 137 +++-- .../spark/security/CryptoStreamUtilsSuite.scala| 37 +- 5 files changed, 295 insertions(+), 51 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java index 056505e..64fdb32 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java @@ -159,15 +159,21 @@ class AuthEngine implements Closeable { // accurately report the errors when they happen. 
RuntimeException error = null; byte[] dummy = new byte[8]; -try { - doCipherOp(encryptor, dummy, true); -} catch (Exception e) { - error = new RuntimeException(e); +if (encryptor != null) { + try { +doCipherOp(Cipher.ENCRYPT_MODE, dummy, true); + } catch (Exception e) { +error = new RuntimeException(e); + } + encryptor = null; } -try { - doCipherOp(decryptor, dummy, true); -} catch (Exception e) { - error = new RuntimeException(e); +if (decryptor != null) { + try { +doCipherOp(Cipher.DECRYPT_MODE, dummy, true); + } catch (Exception e) { +error = new RuntimeException(e); + } + decryptor = null; } random.close(); @@ -189,11 +195,11 @@ class AuthEngine implements Closeable { } private byte[] decrypt(byte[] in) throws GeneralSecurityException { -return doCipherOp(decryptor, in, false); +return doCipherOp(Cipher.DECRYPT_MODE, in, false); } private byte[] encrypt(byte[] in) throws GeneralSecurityException { -return doCipherOp(encryptor, in, false); +return doCipherOp(Cipher.ENCRYPT_MODE, in, false); } private void initializeForAuth(String cipher, byte[] nonce, SecretKeySpec key) @@ -205,11 +211,13 @@ class AuthEngine implements Closeable { byte[] iv = new byte[conf.ivLength()]; System.arraycopy(nonce, 0, iv, 0, Math.min(nonce.length, iv.length)); -encryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf); -encryptor.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv)); +CryptoCipher _encryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf); +_encryptor.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv)); +this.encryptor = _encryptor; -decryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf); -decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv)); +CryptoCipher _decryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf); +_decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv)); +this.decryptor = _decryptor; } /** @@ -241,29 +249,52 @@ class AuthEngine implements Closeable { return new SecretKeySpec(key.getEncoded(), conf.keyAlgorithm()); } - private byte[] doCipherOp(CryptoCipher cipher, byte[] in, boolean isFinal) + private byte[] doCipherOp(int mode, byte[] in, boolean isFinal) throws GeneralSecurityException { -Preconditions.checkState(cipher != null); +CryptoCipher cipher; +switch (mode) { + case Cipher.ENCRYPT_MODE: +cipher = encryptor; +break; + case Cipher.DECRYPT_MODE: +cipher = decryptor; +break; + default: +throw new IllegalArgumentException(String.valueOf(mode)); +} -int scale = 1; -while (true) { - int size = in.length * scale; - byte[]
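The shape of the workaround is easier to see outside the diff. The sketch below is a generic illustration of the same guard pattern, not Spark or commons-crypto code: `NativeCipher` and every other name here are invented. The point is that once a native call fails (or the object is closed), the reference is dropped so no later call can reach the library's broken internal state and crash the JVM.

```
trait NativeCipher {
  def update(data: Array[Byte]): Array[Byte]
  def close(): Unit
}

class GuardedCipher(private var cipher: NativeCipher) {

  def encrypt(data: Array[Byte]): Array[Byte] = {
    // Refuse to operate once the underlying cipher has failed or been closed.
    require(cipher != null, "cipher is closed or previously failed")
    try {
      cipher.update(data)
    } catch {
      case e: Exception =>
        cipher = null // never call into the native handle again
        throw e
    }
  }

  def close(): Unit = {
    if (cipher != null) {
      try cipher.close() catch { case _: Exception => () }
      cipher = null
    }
  }
}
```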
[spark] branch master updated: [SPARK-23619][DOCS] Add output description for some generator expressions / functions
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 90085a1 [SPARK-23619][DOCS] Add output description for some generator expressions / functions 90085a1 is described below commit 90085a184797f8bddbff8ca6ec7a60f3899c1a86 Author: Jash Gala AuthorDate: Sat Apr 27 10:30:12 2019 +0900 [SPARK-23619][DOCS] Add output description for some generator expressions / functions ## What changes were proposed in this pull request? This PR addresses SPARK-23619: https://issues.apache.org/jira/browse/SPARK-23619 It adds additional comments indicating the default column names for the `explode` and `posexplode` functions in Spark-SQL. Functions for which comments have been updated so far: * stack * inline * explode * posexplode * explode_outer * posexplode_outer ## How was this patch tested? This is just a change in the comments. The package builds and tests successfullly after the change. Closes #23748 from jashgala/SPARK-23619. Authored-by: Jash Gala Signed-off-by: HyukjinKwon --- R/pkg/R/functions.R | 12 ++-- python/pyspark/sql/functions.py | 20 .../spark/sql/catalyst/expressions/generators.scala | 12 .../main/scala/org/apache/spark/sql/functions.scala | 8 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 0566a47..3bd1f54 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3589,6 +3589,8 @@ setMethod("element_at", #' @details #' \code{explode}: Creates a new row for each element in the given array or map column. +#' Uses the default column name \code{col} for elements in the array and +#' \code{key} and \code{value} for elements in the map unless specified otherwise. #' #' @rdname column_collection_functions #' @aliases explode explode,Column-method @@ -3649,7 +3651,9 @@ setMethod("sort_array", #' @details #' \code{posexplode}: Creates a new row for each element with position in the given array -#' or map column. +#' or map column. Uses the default column name \code{pos} for position, and \code{col} +#' for elements in the array and \code{key} and \code{value} for elements in the map +#' unless specified otherwise. #' #' @rdname column_collection_functions #' @aliases posexplode posexplode,Column-method @@ -3790,7 +3794,8 @@ setMethod("repeat_string", #' \code{explode}: Creates a new row for each element in the given array or map column. #' Unlike \code{explode}, if the array/map is \code{null} or empty #' then \code{null} is produced. -#' +#' Uses the default column name \code{col} for elements in the array and +#' \code{key} and \code{value} for elements in the map unless specified otherwise. #' #' @rdname column_collection_functions #' @aliases explode_outer explode_outer,Column-method @@ -3815,6 +3820,9 @@ setMethod("explode_outer", #' \code{posexplode_outer}: Creates a new row for each element with position in the given #' array or map column. Unlike \code{posexplode}, if the array/map is \code{null} or empty #' then the row (\code{null}, \code{null}) is produced. +#' Uses the default column name \code{pos} for position, and \code{col} +#' for elements in the array and \code{key} and \code{value} for elements in the map +#' unless specified otherwise. 
#' #' @rdname column_collection_functions #' @aliases posexplode_outer posexplode_outer,Column-method diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 22163f5..613822b 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2142,7 +2142,10 @@ def array_except(col1, col2): @since(1.4) def explode(col): -"""Returns a new row for each element in the given array or map. +""" +Returns a new row for each element in the given array or map. +Uses the default column name `col` for elements in the array and +`key` and `value` for elements in the map unless specified otherwise. >>> from pyspark.sql import Row >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})]) @@ -2163,7 +2166,10 @@ def explode(col): @since(2.1) def posexplode(col): -"""Returns a new row for each element with position in the given array or map. +""" +Returns a new row for each element with position in the given array or map. +Uses the default column name `pos` for position, and `col` for elements in the +array and `key` and `value` for elements in the map unless specified otherwise. >>> from pyspark.sql import Row >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})]) @@ -2184,8
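The default column names the updated docs spell out are easy to check interactively. Assuming a spark-shell session (so `spark` and its implicits are available), something like the following shows them:

```
import org.apache.spark.sql.functions.{explode, posexplode}
import spark.implicits._

val df = Seq((1, Seq("x", "y"), Map("k" -> "v"))).toDF("a", "arr", "m")

// Array elements come back in a column named "col" by default.
df.select(explode($"arr")).printSchema()

// posexplode adds a "pos" column for the element position.
df.select(posexplode($"arr")).printSchema()

// Map entries come back as "key" and "value" columns by default.
df.select(explode($"m")).printSchema()
```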
[spark] branch master updated: [MINOR][TEST][DOC] Execute action miss name message
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6328be7 [MINOR][TEST][DOC] Execute action miss name message 6328be7 is described below commit 6328be78f96ef9c90697318bea83bd2033dae471 Author: uncleGen AuthorDate: Sat Apr 27 09:28:31 2019 +0800 [MINOR][TEST][DOC] Execute action miss name message ## What changes were proposed in this pull request? some minor updates: - `Execute` action miss `name` message - typo in SS document - typo in SQLConf ## How was this patch tested? N/A Closes #24466 from uncleGen/minor-fix. Authored-by: uncleGen Signed-off-by: Wenchen Fan --- docs/structured-streaming-programming-guide.md| 2 +- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala| 4 ++-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala| 2 +- sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala| 2 +- .../apache/spark/sql/execution/datasources/DataSourceResolution.scala | 2 +- .../src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala| 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 2b6940d..2c4169d 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2995,7 +2995,7 @@ the effect of the change is not well-defined. For all of them: - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depends on the code. -- *Changes in projection / filter / map-like operations**: Some cases are allowed. For example: +- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example: - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3a2e736..96d3f5c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2138,9 +2138,9 @@ class SQLConf extends Serializable with Logging { def continuousStreamingExecutorPollIntervalMs: Long = getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS) - def userV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST) + def useV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST) - def userV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST) + def useV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST) def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 3cd48ba..8460c79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -195,7 +195,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } val useV1Sources = - sparkSession.sessionState.conf.userV1SourceReaderList.toLowerCase(Locale.ROOT).split(",") + sparkSession.sessionState.conf.useV1SourceReaderList.toLowerCase(Locale.ROOT).split(",") val lookupCls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) val cls = lookupCls.newInstance() match { case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) || diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 3b84151..18653b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -248,7 +248,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val session = df.sparkSession val useV1Sources = - session.sessionState.conf.userV1SourceWriterList.toLowerCase(Locale.ROOT).split(",") + session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",") val lookupCls = DataSource.lookupDataSource(source, session.sessionState.conf) val cls = lookupCls.newInstance() match { case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) || diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
[spark] branch branch-2.4 updated: [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new ec53a19 [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite ec53a19 is described below commit ec53a19adbffba5a9f3e71cca4898f1bd7804abf Author: “attilapiros” AuthorDate: Fri Apr 26 14:49:49 2019 -0700 [SPARK-26891][BACKPORT-2.4][YARN] Fixing flaky test in YarnSchedulerBackendSuite ## What changes were proposed in this pull request? The test "RequestExecutors reflects node blacklist and is serializable" is flaky because of multi threaded access of the mock task scheduler. For details check [Mockito FAQ (occasional exceptions like: WrongTypeOfReturnValue)](https://github.com/mockito/mockito/wiki/FAQ#is-mockito-thread-safe). So instead of mocking the task scheduler in the test TaskSchedulerImpl is simply subclassed. This multithreaded access of the `nodeBlacklist()` method is coming from: 1) the unit test thread via calling of the method `prepareRequestExecutors()` 2) the `DriverEndpoint.onStart` which runs a periodic task that ends up calling this method ## How was this patch tested? Existing unittest. (cherry picked from commit e4e4e2b842bffba6805623f2258b27b162b451ba) Closes #24474 from attilapiros/SPARK-26891-branch-2.4. Authored-by: “attilapiros” Signed-off-by: Dongjoon Hyun --- .../cluster/YarnSchedulerBackendSuite.scala| 35 +- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala index 7fac57f..bd2cf97 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.scheduler.cluster +import java.util.concurrent.atomic.AtomicReference + import scala.language.reflectiveCalls import org.mockito.Mockito.when @@ -27,15 +29,35 @@ import org.apache.spark.serializer.JavaSerializer class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with LocalSparkContext { + private var yarnSchedulerBackend: YarnSchedulerBackend = _ + + override def afterEach(): Unit = { +try { + if (yarnSchedulerBackend != null) { +yarnSchedulerBackend.stop() + } +} finally { + super.afterEach() +} + } + test("RequestExecutors reflects node blacklist and is serializable") { sc = new SparkContext("local", "YarnSchedulerBackendSuite") -val sched = mock[TaskSchedulerImpl] -when(sched.sc).thenReturn(sc) -val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) { +// Subclassing the TaskSchedulerImpl here instead of using Mockito. For details see SPARK-26891. 
+val sched = new TaskSchedulerImpl(sc) { + val blacklistedNodes = new AtomicReference[Set[String]]() + + def setNodeBlacklist(nodeBlacklist: Set[String]): Unit = blacklistedNodes.set(nodeBlacklist) + + override def nodeBlacklist(): Set[String] = blacklistedNodes.get() +} + +val yarnSchedulerBackendExtended = new YarnSchedulerBackend(sched, sc) { def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = { this.hostToLocalTaskCount = hostToLocalTaskCount } } +yarnSchedulerBackend = yarnSchedulerBackendExtended val ser = new JavaSerializer(sc.conf).newInstance() for { blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c")) @@ -45,16 +67,15 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc Map("a" -> 1, "b" -> 2) ) } { - yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount) - when(sched.nodeBlacklist()).thenReturn(blacklist) - val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested) + yarnSchedulerBackendExtended.setHostToLocalTaskCount(hostToLocalCount) + sched.setNodeBlacklist(blacklist) + val req = yarnSchedulerBackendExtended.prepareRequestExecutors(numRequested) assert(req.requestedTotal === numRequested) assert(req.nodeBlacklist === blacklist) assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty) // Serialize to make sure serialization doesn't throw an error ser.serialize(req) } -sc.stop() } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail:
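The same pattern applies to any test whose stubbed value is read by a background thread while the test rewrites it. A generic sketch follows; the class and method names are invented, not the Spark ones. The mutable value lives in an AtomicReference on a real subclass instead of being re-stubbed on a Mockito mock.

```
import java.util.concurrent.atomic.AtomicReference

class Scheduler {
  def blacklistedNodes(): Set[String] = Set.empty
}

class TestScheduler extends Scheduler {
  private val nodes = new AtomicReference[Set[String]](Set.empty)

  def setBlacklistedNodes(ns: Set[String]): Unit = nodes.set(ns)

  // Safe to read from any thread at any time, unlike a mock whose stubbing
  // is being swapped while a driver thread polls it.
  override def blacklistedNodes(): Set[String] = nodes.get()
}
```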
[spark] branch master updated: [SPARK-27477][BUILD] Kafka token provider should have provided dependency on Spark
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7b367bf [SPARK-27477][BUILD] Kafka token provider should have provided dependency on Spark 7b367bf is described below commit 7b367bfc86dbe7f61c0dda4f4811508137bbd0cc Author: Koert Kuipers AuthorDate: Fri Apr 26 11:52:08 2019 -0700 [SPARK-27477][BUILD] Kafka token provider should have provided dependency on Spark ## What changes were proposed in this pull request? Change spark-token-provider-kafka-0-10 dependency on spark-core to be provided ## How was this patch tested? Ran existing unit tests Closes #24384 from koertkuipers/feat-kafka-token-provider-fix-deps. Authored-by: Koert Kuipers Signed-off-by: Marcelo Vanzin --- external/kafka-0-10-token-provider/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/external/kafka-0-10-token-provider/pom.xml b/external/kafka-0-10-token-provider/pom.xml index 40ef1f7..01ca96b 100644 --- a/external/kafka-0-10-token-provider/pom.xml +++ b/external/kafka-0-10-token-provider/pom.xml @@ -39,6 +39,7 @@ org.apache.spark spark-core_${scala.binary.version} ${project.version} + provided org.apache.spark - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
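The patch itself only touches the Maven pom, but the intent — compile against spark-core without forcing it onto downstream runtime classpaths — has a familiar sbt analogue. The fragment below is illustrative only, and the version value is a placeholder.

```
// build.sbt sketch (the real change is in
// external/kafka-0-10-token-provider/pom.xml)
val sparkVersion = "3.0.0-SNAPSHOT" // placeholder

libraryDependencies +=
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
```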
[spark] branch master updated: [SPARK-27556][BUILD] Exclude com.zaxxer:HikariCP-java7 from hadoop-yarn-server-web-proxy
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fe99305 [SPARK-27556][BUILD] Exclude com.zaxxer:HikariCP-java7 from hadoop-yarn-server-web-proxy fe99305 is described below commit fe99305101fa01713021bc0ffae943066c07a0d0 Author: Yuming Wang AuthorDate: Fri Apr 26 12:15:39 2019 -0500 [SPARK-27556][BUILD] Exclude com.zaxxer:HikariCP-java7 from hadoop-yarn-server-web-proxy ## What changes were proposed in this pull request? There are two HikariCP packages in classpath when building with `-Phive -Pyarn -Phadoop-3.2`. The HikariCP dependency tree: ``` [INFO] | +- org.apache.hadoop:hadoop-yarn-server-web-proxy:jar:3.2.0:compile [INFO] | | \- org.apache.hadoop:hadoop-yarn-server-common:jar:3.2.0:compile [INFO] | | +- org.apache.hadoop:hadoop-yarn-registry:jar:3.2.0:compile [INFO] | | | \- commons-daemon:commons-daemon:jar:1.0.13:compile [INFO] | | +- org.apache.geronimo.specs:geronimo-jcache_1.0_spec:jar:1.0-alpha-1:compile [INFO] | | +- org.ehcache:ehcache:jar:3.3.1:compile [INFO] | | +- com.zaxxer:HikariCP-java7:jar:2.4.12:compile ``` ``` [INFO] +- org.apache.hive:hive-metastore:jar:2.3.4:compile [INFO] | +- javolution:javolution:jar:5.5.1:compile [INFO] | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile [INFO] | +- com.jolbox:bonecp:jar:0.8.0.RELEASE:compile [INFO] | +- com.zaxxer:HikariCP:jar:2.5.1:compile ``` This pr exclude `com.zaxxer:HikariCP-java7` from `hadoop-yarn-server-web-proxy`. ## How was this patch tested? manual tests Closes #24450 from wangyum/SPARK-27556. Authored-by: Yuming Wang Signed-off-by: Sean Owen --- dev/deps/spark-deps-hadoop-3.2 | 1 - pom.xml| 5 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2 index 8b3bd79..5874151 100644 --- a/dev/deps/spark-deps-hadoop-3.2 +++ b/dev/deps/spark-deps-hadoop-3.2 @@ -1,4 +1,3 @@ -HikariCP-java7-2.4.12.jar JavaEWAH-0.3.2.jar RoaringBitmap-0.7.45.jar ST4-4.0.4.jar diff --git a/pom.xml b/pom.xml index 5153957..91661cd 100644 --- a/pom.xml +++ b/pom.xml @@ -1210,6 +1210,11 @@ com.sun.jersey.contribs * + + +com.zaxxer +HikariCP-java7 + - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
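For readers resolving the same Hadoop artifact from an sbt build, the equivalent exclusion would look roughly like the sketch below. This is an analogue only: Spark's own build applies the exclusion in the root pom, and the Hadoop version here is a placeholder.

```
libraryDependencies +=
  ("org.apache.hadoop" % "hadoop-yarn-server-web-proxy" % "3.2.0")
    .exclude("com.zaxxer", "HikariCP-java7")
```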
[GitHub] [spark-website] srowen closed pull request #199: Add note about Scala 2.12 default to 2.4.2 release notes
srowen closed pull request #199: Add note about Scala 2.12 default to 2.4.2 release notes URL: https://github.com/apache/spark-website/pull/199
[spark-website] branch asf-site updated: Add note about Scala 2.12 default to 2.4.2 release notes
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/spark-website.git The following commit(s) were added to refs/heads/asf-site by this push: new f074f7d Add note about Scala 2.12 default to 2.4.2 release notes f074f7d is described below commit f074f7de8bc8f25826430f3294b8fc9892234f26 Author: Sean Owen AuthorDate: Fri Apr 26 12:14:01 2019 -0500 Add note about Scala 2.12 default to 2.4.2 release notes Author: Sean Owen Closes #199 from srowen/242ReleaseNotes. --- releases/_posts/2019-04-23-spark-release-2-4-2.md | 8 ++-- site/releases/spark-release-2-4-2.html| 8 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/releases/_posts/2019-04-23-spark-release-2-4-2.md b/releases/_posts/2019-04-23-spark-release-2-4-2.md index cccd823..6f74aba 100644 --- a/releases/_posts/2019-04-23-spark-release-2-4-2.md +++ b/releases/_posts/2019-04-23-spark-release-2-4-2.md @@ -13,9 +13,13 @@ meta: Spark 2.4.2 is a maintenance release containing stability fixes. This release is based on the branch-2.4 maintenance branch of Spark. We strongly recommend all 2.4 users to upgrade to this stable release. +Note that Scala 2.11 support is deprecated from 2.4.1 onwards. +As of 2.4.2, the pre-built convenience binaries are compiled for Scala 2.12. +Spark is still cross-published for 2.11 and 2.12 in Maven Central, and can be built for 2.11 from source. + ### Notable changes - - SPARK-27419: When setting spark.executor.heartbeatInterval to a value less than 1 seconds in branch-2.4, it will always fail because the value will be converted to 0 and the heartbeat will always timeout and finally kill the executor. - - Revert SPARK-25250: It may cause the job to hang forever, and is reverted in 2.4.2. + - [[SPARK-27419]](https://issues.apache.org/jira/browse/SPARK-27419): When setting `spark.executor.heartbeatInterval` to a value less than 1 seconds, it will always fail because the value will be converted to 0 and the heartbeat will always timeout and finally kill the executor. + - Revert [[SPARK-25250]](https://issues.apache.org/jira/browse/SPARK-25250): It may cause the job to hang forever, and is reverted in 2.4.2. You can consult JIRA for the [detailed changes](https://s.apache.org/spark-2.4.2). diff --git a/site/releases/spark-release-2-4-2.html b/site/releases/spark-release-2-4-2.html index 56adac3..b6d6cf1 100644 --- a/site/releases/spark-release-2-4-2.html +++ b/site/releases/spark-release-2-4-2.html @@ -205,10 +205,14 @@ Spark 2.4.2 is a maintenance release containing stability fixes. This release is based on the branch-2.4 maintenance branch of Spark. We strongly recommend all 2.4 users to upgrade to this stable release. +Note that Scala 2.11 support is deprecated from 2.4.1 onwards. +As of 2.4.2, the pre-built convenience binaries are compiled for Scala 2.12. +Spark is still cross-published for 2.11 and 2.12 in Maven Central, and can be built for 2.11 from source. + Notable changes - SPARK-27419: When setting spark.executor.heartbeatInterval to a value less than 1 seconds in branch-2.4, it will always fail because the value will be converted to 0 and the heartbeat will always timeout and finally kill the executor. - Revert SPARK-25250: It may cause the job to hang forever, and is reverted in 2.4.2. 
+ https://issues.apache.org/jira/browse/SPARK-27419;>[SPARK-27419]: When setting spark.executor.heartbeatInterval to a value less than 1 seconds, it will always fail because the value will be converted to 0 and the heartbeat will always timeout and finally kill the executor. + Revert https://issues.apache.org/jira/browse/SPARK-25250;>[SPARK-25250]: It may cause the job to hang forever, and is reverted in 2.4.2. You can consult JIRA for the https://s.apache.org/spark-2.4.2;>detailed changes. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
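Since 2.4.2 is cross-published for both Scala versions in Maven Central, build-tool users select the binary version through their own Scala setting. An illustrative build.sbt fragment, assuming an sbt project rather than Spark's own Maven build:

```
scalaVersion := "2.12.8" // or "2.11.12" for a Scala 2.11 build

// %% appends the Scala binary version, resolving to spark-sql_2.12 or _2.11.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.2"
```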
[GitHub] [spark-website] srowen commented on issue #199: Add note about Scala 2.12 default to 2.4.2 release notes
srowen commented on issue #199: Add note about Scala 2.12 default to 2.4.2 release notes URL: https://github.com/apache/spark-website/pull/199#issuecomment-487080217 Agree, I figure it's best to call extra attention to it for now, either way.
[spark] branch branch0-2.3 deleted (was a956e9c)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch branch0-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git. was a956e9c [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[spark] branch branch0-2.3 created (now a956e9c)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch branch0-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git. at a956e9c [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite This branch includes the following new commits: new a956e9c [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[spark] 01/01: [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch0-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git commit a956e9c765026de0009da4a5867bb768375c22ed Author: Wenchen Fan AuthorDate: Fri Apr 26 16:37:43 2019 +0900 [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite We can get the latest downloadable Spark versions from https://dist.apache.org/repos/dist/release/spark/ manually. Closes #24454 from cloud-fan/test. Authored-by: Wenchen Fan Signed-off-by: HyukjinKwon --- .../sql/hive/HiveExternalCatalogVersionsSuite.scala | 19 ++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 680abb6..916f73f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import scala.sys.process._ +import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -166,6 +167,10 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { """.stripMargin.getBytes("utf8")) // scalastyle:on line.size.limit +if (PROCESS_TABLES.testingVersions.isEmpty) { + fail("Fail to get the lates Spark versions to test.") +} + PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) => val sparkHome = new File(sparkTestingDir, s"spark-$version") if (!sparkHome.exists()) { @@ -203,7 +208,19 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { object PROCESS_TABLES extends QueryTest with SQLTestUtils { // Tests the latest version of every release line. - val testingVersions = Seq("2.3.3") + val testingVersions: Seq[String] = { +import scala.io.Source +try { + Source.fromURL("https://dist.apache.org/repos/dist/release/spark/;).mkString +.split("\n") +.filter(_.contains("".r.findFirstMatchIn(_).get.group(1)) +.filter(_ < org.apache.spark.SPARK_VERSION) +} catch { + // do not throw exception during object initialization. + case NonFatal(_) => Nil +} + } protected var spark: SparkSession = _ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.3 updated (257abc4 -> a956e9c)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git. from 257abc4 [SPARK-27496][CORE] Fatal errors should also be sent back to the sender add a956e9c [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite No new revisions were added by this update. Summary of changes: .../sql/hive/HiveExternalCatalogVersionsSuite.scala | 19 ++- 1 file changed, 18 insertions(+), 1 deletion(-)
[spark] branch branch-2.4 updated: [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 29a4e04 [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite 29a4e04 is described below commit 29a4e048feb459e5121b6d21c741a81f48991f64 Author: Wenchen Fan AuthorDate: Fri Apr 26 16:37:43 2019 +0900 [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite ## What changes were proposed in this pull request? We can get the latest downloadable Spark versions from https://dist.apache.org/repos/dist/release/spark/ ## How was this patch tested? manually. Closes #24454 from cloud-fan/test. Authored-by: Wenchen Fan Signed-off-by: HyukjinKwon --- .../sql/hive/HiveExternalCatalogVersionsSuite.scala | 19 ++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index a4d6a69..8828471 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import scala.sys.process._ +import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -166,6 +167,10 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { """.stripMargin.getBytes("utf8")) // scalastyle:on line.size.limit +if (PROCESS_TABLES.testingVersions.isEmpty) { + fail("Fail to get the lates Spark versions to test.") +} + PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) => val sparkHome = new File(sparkTestingDir, s"spark-$version") if (!sparkHome.exists()) { @@ -203,7 +208,19 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { object PROCESS_TABLES extends QueryTest with SQLTestUtils { // Tests the latest version of every release line. - val testingVersions = Seq("2.3.3", "2.4.2") + val testingVersions: Seq[String] = { +import scala.io.Source +try { + Source.fromURL("https://dist.apache.org/repos/dist/release/spark/;).mkString +.split("\n") +.filter(_.contains("".r.findFirstMatchIn(_).get.group(1)) +.filter(_ < org.apache.spark.SPARK_VERSION) +} catch { + // do not throw exception during object initialization. + case NonFatal(_) => Nil +} + } protected var spark: SparkSession = _ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27190][SQL] add table capability for streaming
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 85fd552 [SPARK-27190][SQL] add table capability for streaming 85fd552 is described below commit 85fd552ed6304967f25574baef3cf9657957bcb1 Author: Wenchen Fan AuthorDate: Fri Apr 26 15:44:23 2019 +0800 [SPARK-27190][SQL] add table capability for streaming ## What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/24012 , to add the corresponding capabilities for streaming. ## How was this patch tested? existing tests Closes #24129 from cloud-fan/capability. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../spark/sql/kafka010/KafkaSourceProvider.scala | 11 +- .../sql/sources/v2/SupportsContinuousRead.java | 35 -- .../sql/sources/v2/SupportsMicroBatchRead.java | 35 -- .../sql/sources/v2/SupportsStreamingWrite.java | 34 -- .../spark/sql/sources/v2/TableCapability.java | 19 +++ .../apache/spark/sql/sources/v2/reader/Scan.java | 10 +- .../datasources/noop/NoopDataSource.scala | 7 +- .../v2/V2StreamingScanSupportCheck.scala | 64 ++ .../execution/streaming/MicroBatchExecution.scala | 61 +- .../sql/execution/streaming/StreamExecution.scala | 4 +- .../spark/sql/execution/streaming/console.scala| 9 +- .../streaming/continuous/ContinuousExecution.scala | 22 ++-- .../spark/sql/execution/streaming/memory.scala | 9 +- .../streaming/sources/ForeachWriterTable.scala | 12 +- .../streaming/sources/RateStreamProvider.scala | 9 +- .../sources/TextSocketSourceProvider.scala | 9 +- .../sql/execution/streaming/sources/memoryV2.scala | 10 +- .../sql/internal/BaseSessionStateBuilder.scala | 3 +- .../spark/sql/streaming/DataStreamReader.scala | 5 +- .../spark/sql/streaming/DataStreamWriter.scala | 7 +- .../sql/streaming/StreamingQueryManager.scala | 6 +- .../v2/V2StreamingScanSupportCheckSuite.scala | 130 + .../sources/StreamingDataSourceV2Suite.scala | 106 ++--- .../spark/sql/hive/HiveSessionStateBuilder.scala | 3 +- 24 files changed, 389 insertions(+), 231 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index f7a2032..bb76a30 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.{Collections, Locale, UUID} +import java.util.{Locale, UUID} import scala.collection.JavaConverters._ @@ -29,9 +29,10 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.spark.internal.Logging import org.apache.spark.kafka010.KafkaConfigUpdater import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.execution.streaming.{Sink, Source} +import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.TableCapability._ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} import 
org.apache.spark.sql.sources.v2.writer.WriteBuilder @@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } class KafkaTable(strategy: => ConsumerStrategy) extends Table -with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite { +with SupportsRead with SupportsWrite with BaseStreamingSink { override def name(): String = s"Kafka $strategy" override def schema(): StructType = KafkaOffsetReader.kafkaSchema -override def capabilities(): ju.Set[TableCapability] = Collections.emptySet() +override def capabilities(): ju.Set[TableCapability] = { + Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava +} override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new KafkaScan(options) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java deleted file mode 100644 index 5cc9848..000 ---
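After this change a custom source advertises streaming support through capability values rather than marker interfaces. A minimal sketch, modelled on the KafkaTable hunk above and using the pre-3.0 `sources.v2` package layout shown in this diff; the table name and schema are invented, and a real source would also mix in SupportsRead/SupportsWrite and supply scan/write builders, which are omitted here.

```
import java.{util => ju}
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType

// A table now declares what it supports instead of mixing in marker traits.
class MyStreamingTable extends Table {
  override def name(): String = "my-streaming-table"

  override def schema(): StructType = new StructType().add("value", "string")

  override def capabilities(): ju.Set[TableCapability] =
    Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
}
```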
[spark] branch master updated: [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2234667 [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite 2234667 is described below commit 2234667b159bf19a68758da3ff20cfae3c058c25 Author: Wenchen Fan AuthorDate: Fri Apr 26 16:37:43 2019 +0900 [SPARK-27563][SQL][TEST] automatically get the latest Spark versions in HiveExternalCatalogVersionsSuite ## What changes were proposed in this pull request? We can get the latest downloadable Spark versions from https://dist.apache.org/repos/dist/release/spark/ ## How was this patch tested? manually. Closes #24454 from cloud-fan/test. Authored-by: Wenchen Fan Signed-off-by: HyukjinKwon --- .../sql/hive/HiveExternalCatalogVersionsSuite.scala | 19 ++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 0a05ec5..ec10295 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import scala.sys.process._ +import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -169,6 +170,10 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { """.stripMargin.getBytes("utf8")) // scalastyle:on line.size.limit +if (PROCESS_TABLES.testingVersions.isEmpty) { + fail("Fail to get the lates Spark versions to test.") +} + PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) => val sparkHome = new File(sparkTestingDir, s"spark-$version") if (!sparkHome.exists()) { @@ -206,7 +211,19 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { object PROCESS_TABLES extends QueryTest with SQLTestUtils { // Tests the latest version of every release line. - val testingVersions = Seq("2.3.3", "2.4.2") + val testingVersions: Seq[String] = { +import scala.io.Source +try { + Source.fromURL("https://dist.apache.org/repos/dist/release/spark/;).mkString +.split("\n") +.filter(_.contains("".r.findFirstMatchIn(_).get.group(1)) +.filter(_ < org.apache.spark.SPARK_VERSION) +} catch { + // do not throw exception during object initialization. + case NonFatal(_) => Nil +} + } protected var spark: SparkSession = _ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
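The mail archive has mangled the HTML-matching strings inside the new `testingVersions` hunk above (the angle-bracket patterns were stripped), so here is a hedged sketch of the approach the commit message describes; the exact regex and link format are assumptions, not the committed code.

```
import scala.io.Source
import scala.util.control.NonFatal

def latestReleasedVersions(currentVersion: String): Seq[String] =
  try {
    // List the release directories published under dist.apache.org and keep
    // the version numbers older than the version under test.
    Source.fromURL("https://dist.apache.org/repos/dist/release/spark/").mkString
      .split("\n").toSeq
      .flatMap("""spark-(\d+\.\d+\.\d+)/""".r.findFirstMatchIn(_).map(_.group(1)))
      .filter(_ < currentVersion)
  } catch {
    // Mirror the committed behaviour: never fail object initialisation.
    case NonFatal(_) => Nil
  }
```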
[spark] branch branch-2.4 updated: add missing import and fix compilation
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new ed0739a add missing import and fix compilation ed0739a is described below commit ed0739a90128b697f598a58b7300dcfd5492fdba Author: Wenchen Fan AuthorDate: Fri Apr 26 15:33:20 2019 +0800 add missing import and fix compilation --- .../scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index da92019..36da2f1 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2} import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.StructType - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-27494][SS] Null values don't work in Kafka source v2
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 705507f [SPARK-27494][SS] Null values don't work in Kafka source v2 705507f is described below commit 705507facda11060f1a0beb04d1dd19bda5fc4f3 Author: uncleGen AuthorDate: Fri Apr 26 14:25:31 2019 +0800 [SPARK-27494][SS] Null values don't work in Kafka source v2 ## What changes were proposed in this pull request? Right now Kafka source v2 doesn't support null values. The issue is in org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow which doesn't handle null values. ## How was this patch tested? add new unit tests Closes #24441 from uncleGen/SPARK-27494. Authored-by: uncleGen Signed-off-by: Wenchen Fan (cherry picked from commit d2656aaecd4a7b5562d8d2065aaa66fdc72d253d) Signed-off-by: Wenchen Fan --- .../kafka010/KafkaRecordToUnsafeRowConverter.scala | 7 ++- .../sql/kafka010/KafkaContinuousSourceSuite.scala | 4 ++ .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 58 ++ 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala index f35a143..306ef10 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala @@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter { def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = { rowWriter.reset() +rowWriter.zeroOutNullBytes() if (record.key == null) { rowWriter.setNullAt(0) } else { rowWriter.write(0, record.key) } -rowWriter.write(1, record.value) +if (record.value == null) { + rowWriter.setNullAt(1) +} else { + rowWriter.write(1, record.value) +} rowWriter.write(2, UTF8String.fromString(record.topic)) rowWriter.write(3, record.partition) rowWriter.write(4, record.offset) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index a0e5818..649cb72 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo } } } + + test("SPARK-27494: read kafka record containing null key/values.") { +testNullableKeyValue(ContinuousTrigger(100)) + } } class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 34cf335..da92019 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -988,6 +988,10 @@ abstract class 
KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { q.stop() } } + + test("SPARK-27494: read kafka record containing null key/values.") { +testNullableKeyValue(Trigger.ProcessingTime(100)) + } } @@ -1461,6 +1465,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17) ) } + + protected def testNullableKeyValue(trigger: Trigger): Unit = { +val table = "kafka_null_key_value_source_test" +withTable(table) { + val topic = newTopic() + testUtils.createTopic(topic) + testUtils.withTranscationalProducer { producer => +val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_committed") + .option("startingOffsets", "earliest") + .option("subscribe", topic) + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + +val q = df + .writeStream + .format("memory") + .queryName(table) + .trigger(trigger) +
[spark] branch master updated: [SPARK-27494][SS] Null values don't work in Kafka source v2
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d2656aa [SPARK-27494][SS] Null values don't work in Kafka source v2 d2656aa is described below commit d2656aaecd4a7b5562d8d2065aaa66fdc72d253d Author: uncleGen AuthorDate: Fri Apr 26 14:25:31 2019 +0800 [SPARK-27494][SS] Null values don't work in Kafka source v2 ## What changes were proposed in this pull request? Right now Kafka source v2 doesn't support null values. The issue is in org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow which doesn't handle null values. ## How was this patch tested? add new unit tests Closes #24441 from uncleGen/SPARK-27494. Authored-by: uncleGen Signed-off-by: Wenchen Fan --- .../kafka010/KafkaRecordToUnsafeRowConverter.scala | 7 ++- .../sql/kafka010/KafkaContinuousSourceSuite.scala | 4 ++ .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 58 ++ 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala index f35a143..306ef10 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala @@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter { def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = { rowWriter.reset() +rowWriter.zeroOutNullBytes() if (record.key == null) { rowWriter.setNullAt(0) } else { rowWriter.write(0, record.key) } -rowWriter.write(1, record.value) +if (record.value == null) { + rowWriter.setNullAt(1) +} else { + rowWriter.write(1, record.value) +} rowWriter.write(2, UTF8String.fromString(record.topic)) rowWriter.write(3, record.partition) rowWriter.write(4, record.offset) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index be0cea2..9b3e78c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo } } } + + test("SPARK-27494: read kafka record containing null key/values.") { +testNullableKeyValue(ContinuousTrigger(100)) + } } class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 21634ae..b98f8e9 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { q.stop() } } + + test("SPARK-27494: read kafka record containing 
null key/values.") { +testNullableKeyValue(Trigger.ProcessingTime(100)) + } } @@ -1511,6 +1515,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17) ) } + + protected def testNullableKeyValue(trigger: Trigger): Unit = { +val table = "kafka_null_key_value_source_test" +withTable(table) { + val topic = newTopic() + testUtils.createTopic(topic) + testUtils.withTranscationalProducer { producer => +val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_committed") + .option("startingOffsets", "earliest") + .option("subscribe", topic) + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + +val q = df + .writeStream + .format("memory") + .queryName(table) + .trigger(trigger) + .start() +try { + var idx = 0 + producer.beginTransaction() + val