[spark] branch branch-2.4 updated: [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2019-01-29 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new d5cc890  [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by 
inconsistency between a SQL cache's cached RDD and its physical plan
d5cc890 is described below

commit d5cc8909c72e958ce187df9c75847ad0125991ab
Author: maryannxue 
AuthorDate: Tue Jan 29 21:33:46 2019 +0900

[SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency 
between a SQL cache's cached RDD and its physical plan

## What changes were proposed in this pull request?

When performing non-cascading cache invalidation, `recache` is called on 
the other cache entries which are dependent on the cache being invalidated. It 
leads to the physical plans of those cache entries being re-compiled. For 
those cache entries, if the cache RDD has already been persisted, chances are 
there will be inconsistency between the data and the new plan. It can cause a 
correctness issue if the new plan's `outputPartitioning`  or `outputOrdering` 
is different from the tha [...]

The fix is to keep the cache entry as it is if the data has been loaded, 
otherwise re-build the cache entry, with a new plan and an empty cache buffer.
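
A minimal sketch of the decision described above, using a hypothetical, simplified `Entry` type rather than Spark's real `CachedData` or `CachedRDDBuilder` classes. It only illustrates the rule "rebuild when forced, or when nothing has been loaded yet":

```scala
// Hypothetical model for illustration only; these are not Spark classes.
object RecacheSketch {
  final case class Entry(name: String, bufferLoaded: Boolean)

  // Rebuild the plan when a cascading invalidation forces it (clearCache),
  // or when the buffer is still empty, so no persisted data can disagree
  // with the new plan.
  def shouldBuildNewPlan(clearCache: Boolean, entry: Entry): Boolean =
    clearCache || !entry.bufferLoaded
}
```

Under this rule, an entry whose data is already loaded is left untouched during a non-cascading invalidation, so its data and its plan stay consistent.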

## How was this patch tested?

Added UT.

Closes #23678 from maryannxue/spark-26708-2.4.

Authored-by: maryannxue 
Signed-off-by: Takeshi Yamamuro 
---
 .../apache/spark/sql/execution/CacheManager.scala  | 28 +++---
 .../sql/execution/columnar/InMemoryRelation.scala  | 10 +
 .../org/apache/spark/sql/DatasetCacheSuite.scala   | 44 +-
 3 files changed, 67 insertions(+), 15 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c992993..5b30596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -166,16 +166,34 @@ class CacheManager extends Logging {
 val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
 while (it.hasNext) {
   val cd = it.next()
-  if (condition(cd.plan)) {
-if (clearCache) {
-  cd.cachedRepresentation.cacheBuilder.clearCache()
-}
+  // If `clearCache` is false (which means the recache request comes from 
a non-cascading
+  // cache invalidation) and the cache buffer has already been loaded, we 
do not need to
+  // re-compile a physical plan because the old plan will not be used any 
more by the
+  // CacheManager although it still lives in compiled `Dataset`s and it 
could still work.
+  // Otherwise, it means either `clearCache` is true, then we have to 
clear the cache buffer
+  // and re-compile the physical plan; or it is a non-cascading cache 
invalidation and cache
+  // buffer is still empty, then we could have a more efficient new plan 
by removing
+  // dependency on the previously removed cache entries.
+  // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call 
is a non-locking
+  // status test and may not return the most accurate cache buffer state. 
So the worse case
+  // scenario can be:
+  // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` 
returns false, then we
+  //will clear the buffer and build a new plan. It is inefficient but 
doesn't affect
+  //correctness.
+  // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` 
returns true, then we
+  //will keep it as it is. It means the physical plan has been 
re-compiled already in the
+  //other thread.
+  val buildNewPlan =
+clearCache || 
!cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+  if (condition(cd.plan) && buildNewPlan) {
+cd.cachedRepresentation.cacheBuilder.clearCache()
 // Remove the cache entry before we create a new one, so that we can 
have a different
 // physical plan.
 it.remove()
 val plan = spark.sessionState.executePlan(cd.plan).executedPlan
 val newCache = InMemoryRelation(
-  cacheBuilder = 
cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+  cacheBuilder = cd.cachedRepresentation
+.cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
   logicalPlan = cd.plan)
 needToRecache += cd.copy(cachedRepresentation = newCache)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index b752b77..8eecd7a 100644
--- 
a/sql/core/src/main/scala/o

[spark] branch master updated: [SPARK-26763][SQL] Using fileStatus cache when filterPartitions

2019-01-29 Thread wenchen

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5d672b7  [SPARK-26763][SQL] Using fileStatus cache when 
filterPartitions
5d672b7 is described below

commit 5d672b7f3e07cfd7710df319fc6c7d2b9056a068
Author: Xianyang Liu 
AuthorDate: Tue Jan 29 23:11:11 2019 +0800

[SPARK-26763][SQL] Using fileStatus cache when filterPartitions

## What changes were proposed in this pull request?

We should pass the existing `fileStatusCache` to `InMemoryFileIndex` even 
when there are no partition columns.
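
A rough sketch of why sharing a cache matters, assuming a hypothetical `Index` class and `listFiles` function (neither is Spark's API). Two index instances that receive the same cache trigger only one expensive listing:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; not Spark's InMemoryFileIndex.
object FileStatusCacheSketch {
  var listings = 0

  // Pretend this is an expensive file-system listing.
  def listFiles(path: String): Seq[String] = {
    listings += 1
    Seq(s"$path/part-0")
  }

  // The index consults the cache it was given before listing again.
  final class Index(cache: mutable.Map[String, Seq[String]]) {
    def files(path: String): Seq[String] =
      cache.getOrElseUpdate(path, listFiles(path))
  }
}
```

If each index were created with its own fresh cache instead, every instance would list the path again.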

## How was this patch tested?

Existing tests. Extra tests can be added if required.

Closes #23683 from ConeyLiu/filestatuscache.

Authored-by: Xianyang Liu 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index a66a076..8736d07 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -84,8 +84,8 @@ class CatalogFileIndex(
   new PrunedInMemoryFileIndex(
 sparkSession, new Path(baseLocation.get), fileStatusCache, 
partitionSpec, Option(timeNs))
 } else {
-  new InMemoryFileIndex(
-sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema 
= None)
+  new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties,
+userSpecifiedSchema = None, fileStatusCache = fileStatusCache)
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-11215][ML] Add multiple columns support to StringIndexer

2019-01-29 Thread srowen

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3310789  [SPARK-11215][ML] Add multiple columns support to 
StringIndexer
3310789 is described below

commit 33107897ada29d1ed17f091f93260dfcef11c2e7
Author: Liang-Chi Hsieh 
AuthorDate: Tue Jan 29 09:21:25 2019 -0600

[SPARK-11215][ML] Add multiple columns support to StringIndexer

## What changes were proposed in this pull request?

This takes over #19621 to add multi-column support to StringIndexer:

1. Supports encoding multiple columns.
2. Previously, when specifying `frequencyDesc` or `frequencyAsc` as 
`stringOrderType` param in `StringIndexer`, in case of equal frequency, the 
order of strings is undefined. After this change, the strings with equal 
frequency are further sorted alphabetically.
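
The tie-breaking rule in item 2 can be sketched in plain Scala, without Spark. The helper name `frequencyDesc` is hypothetical, and "alphabetically" is assumed here to mean Scala's default `String` ordering:

```scala
// Illustration only; this is not the StringIndexer implementation.
object FrequencyOrderSketch {
  // Orders distinct labels by descending frequency; labels with equal
  // frequency are ordered by the default String ordering.
  def frequencyDesc(values: Seq[String]): Seq[String] = {
    val counts = values.groupBy(identity).map { case (label, vs) => (label, vs.size) }
    counts.toSeq.sortBy { case (label, count) => (-count, label) }.map(_._1)
  }
}
```

With this ordering, two labels that occur equally often always come out in the same relative order, which makes the resulting indices deterministic.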

## How was this patch tested?

Added tests.

Closes #20146 from viirya/SPARK-11215.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Sean Owen 
---
 R/pkg/tests/fulltests/test_mllib_classification.R  |   6 +-
 R/pkg/tests/fulltests/test_mllib_regression.R  |  42 ++-
 docs/ml-features.md|   6 +-
 docs/ml-guide.md   |   9 +
 .../apache/spark/ml/feature/StringIndexer.scala| 409 +
 ...2980-4c42-b8a7-a5a94265c479-c000.snappy.parquet | Bin 0 -> 478 bytes
 .../test-data/strIndexerModel/metadata/part-0  |   1 +
 .../apache/spark/ml/feature/RFormulaSuite.scala|  28 +-
 .../spark/ml/feature/StringIndexerSuite.scala  | 139 ++-
 project/MimaExcludes.scala |   4 +
 10 files changed, 531 insertions(+), 113 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R 
b/R/pkg/tests/fulltests/test_mllib_classification.R
index 023686e..9fdb0cf 100644
--- a/R/pkg/tests/fulltests/test_mllib_classification.R
+++ b/R/pkg/tests/fulltests/test_mllib_classification.R
@@ -313,7 +313,7 @@ test_that("spark.mlp", {
   # Test predict method
   mlpTestDF <- df
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", 
"0.0", "0.0", "0.0"))
+  expect_equal(head(mlpPredictions$prediction, 6), c("0.0", "1.0", "1.0", 
"1.0", "1.0", "1.0"))
 
   # Test model save/load
   if (windows_with_hadoop()) {
@@ -348,12 +348,12 @@ test_that("spark.mlp", {
 
   # Test random seed
   # default seed
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 
10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 
100)
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
   expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", 
"1.0", "0.0"))
   # seed equals 10
-  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 
10, seed = 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 
100, seed = 10)
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
   expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", 
"1.0", "0.0"))
diff --git a/R/pkg/tests/fulltests/test_mllib_regression.R 
b/R/pkg/tests/fulltests/test_mllib_regression.R
index 23daca7..b40c4cb 100644
--- a/R/pkg/tests/fulltests/test_mllib_regression.R
+++ b/R/pkg/tests/fulltests/test_mllib_regression.R
@@ -102,10 +102,18 @@ test_that("spark.glm and predict", {
 })
 
 test_that("spark.glm summary", {
+  # prepare dataset
+  Sepal.Length <- c(2.0, 1.5, 1.8, 3.4, 5.1, 1.8, 1.0, 2.3)
+  Sepal.Width <- c(2.1, 2.3, 5.4, 4.7, 3.1, 2.1, 3.1, 5.5)
+  Petal.Length <- c(1.8, 2.1, 7.1, 2.5, 3.7, 6.3, 2.2, 7.2)
+  Species <- c("setosa", "versicolor", "versicolor", "versicolor", 
"virginica", "virginica",
+   "versicolor", "virginica")
+  dataset <- data.frame(Sepal.Length, Sepal.Width, Petal.Length, Species, 
stringsAsFactors = TRUE)
+
   # gaussian family
-  training <- suppressWarnings(createDataFrame(iris))
+  training <- suppressWarnings(createDataFrame(dataset))
   stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species))
-  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
+  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset))
 
   # test summary coefficients return matrix type
   expect_true(class(stats$coefficients) == "matrix")
@@ -126,15 +134,15 @@ test_that("spark.glm summary", {
 
   out <- capture.output(print(stats))
   expect_match(out[2], "Deviance Residuals:")
-  expect_true(any(grepl("AIC: 59.22", out)))
+  expect_true(any(grepl("AIC: 35.84", out)))
 
   # binomial family
-  df <- suppress

[spark] branch master updated: [SPARK-26702][SQL][TEST] Create a test trait for Parquet and Orc test

2019-01-29 Thread dongjoon

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 66afd86  [SPARK-26702][SQL][TEST] Create a test trait for Parquet and 
Orc test
66afd86 is described below

commit 66afd869d1287ac0d069c2bf6b73712fd2dec19a
Author: Liang-Chi Hsieh 
AuthorDate: Tue Jan 29 07:31:42 2019 -0800

[SPARK-26702][SQL][TEST] Create a test trait for Parquet and Orc test

## What changes were proposed in this pull request?

To let test suites support both Parquet and Orc by reusing test 
cases, this patch extracts the methods for testing. For example, if we need to 
test a common feature shared by Parquet and Orc, we should be able to write 
test cases once and reuse them to test both formats.

This patch extracts the methods for testing and uses a variable 
`dataSourceName` to set the data format to test against.
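
The pattern can be sketched without Spark as follows. The trait `FileFormatTest`, the method `describeRoundTrip`, and the two objects are hypothetical names for illustration; only `dataSourceName` comes from the patch:

```scala
// Illustration only; the real trait extends SQLTestUtils and reads files.
object SharedFormatTestSketch {
  trait FileFormatTest {
    // Each concrete suite picks the format to test.
    protected val dataSourceName: String

    // A shared "test case", written once against dataSourceName.
    def describeRoundTrip(path: String): String =
      s"write and read $path using format '$dataSourceName'"
  }

  object ParquetLike extends FileFormatTest {
    protected val dataSourceName: String = "parquet"
  }

  object OrcLike extends FileFormatTest {
    protected val dataSourceName: String = "orc"
  }
}
```

The shared logic never names a format directly, so adding another format would only require one more implementation of the abstract member.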

## How was this patch tested?

Existing tests.

Closes #23628 from viirya/datasource-test.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Dongjoon Hyun 
---
 .../datasources/FileBasedDataSourceTest.scala  | 106 +
 .../execution/datasources/orc/OrcQuerySuite.scala  |  12 ++-
 .../sql/execution/datasources/orc/OrcTest.scala|  40 +++-
 .../datasources/parquet/ParquetTest.scala  |  46 +++--
 4 files changed, 142 insertions(+), 62 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
new file mode 100644
index 000..bdb161d
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * A helper trait that provides convenient facilities for file-based data 
source testing.
+ * Specifically, it is used for Parquet and Orc testing. It can be used to 
write tests
+ * that are shared between Parquet and Orc.
+ */
+private[sql] trait FileBasedDataSourceTest extends SQLTestUtils {
+
+  // Defines the data source name to run the test.
+  protected val dataSourceName: String
+  // The SQL config key for enabling vectorized reader.
+  protected val vectorizedReaderEnabledKey: String
+
+  /**
+   * Reads data source file from given `path` as `DataFrame` and passes it to 
given function.
+   *
+   * @param path   The path to file
+   * @param testVectorized Whether to read the file with vectorized reader.
+   * @param f  The given function that takes a `DataFrame` as 
input.
+   */
+  protected def readFile(path: String, testVectorized: Boolean = true)
+  (f: DataFrame => Unit): Unit = {
+withSQLConf(vectorizedReaderEnabledKey -> "false") {
+  f(spark.read.format(dataSourceName).load(path.toString))
+}
+if (testVectorized) {
+  withSQLConf(vectorizedReaderEnabledKey -> "true") {
+f(spark.read.format(dataSourceName).load(path.toString))
+  }
+}
+  }
+
+  /**
+   * Writes `data` to a data source file, which is then passed to `f` and will 
be deleted after `f`
+   * returns.
+   */
+  protected def withDataSourceFile[T <: Product : ClassTag : TypeTag]
+  (data: Seq[T])
+  (f: String => Unit): Unit = {
+withTempPath { file =>
+  
spark.createDataFrame(data).write.format(dataSourceName).save(file.getCanonicalPath)
+  f(file.getCanonicalPath)
+}
+  }
+
+  /**
+   * Writes `data` to a data source file and reads it back as a [[DataFrame]],
+   * which is then passed to `f`. The file will be deleted after `f` returns.
+   */
+  protected def withDataSourceDataFrame[T <: Product : ClassTag : TypeTag]
+  (data: Seq[T], testVectorized: Boolean = 

[spark] branch master updated: [SPARK-26765][SQL] Avro: Validate input and output schema

2019-01-29 Thread wenchen

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1beed0d  [SPARK-26765][SQL] Avro: Validate input and output schema
1beed0d is described below

commit 1beed0d7c253da4fde12bfeea96cc0fbcc1aae25
Author: Gengliang Wang 
AuthorDate: Wed Jan 30 00:17:33 2019 +0800

[SPARK-26765][SQL] Avro: Validate input and output schema

## What changes were proposed in this pull request?

The API `supportDataType` in `FileFormat` helps to validate the 
output/input schema before execution starts, so that we can avoid some invalid 
data source IO and users can see clean error messages.

This PR is to override the validation API in Avro data source.
Also, as per the spec of 
Avro(https://avro.apache.org/docs/1.8.2/spec.html), `NullType` is supported. 
This PR fixes the handling of `NullType`.
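
A self-contained sketch of this kind of recursive schema validation, using a hypothetical miniature type hierarchy (`DType` and its cases) instead of Spark's `DataType` classes. `OpaqueT` is an invented placeholder for any type the format cannot store:

```scala
// Illustration only; these types are not Spark's.
object TypeSupportSketch {
  sealed trait DType
  case object IntT extends DType
  case object StringT extends DType
  case object NullT extends DType
  case object OpaqueT extends DType // placeholder for an unsupported type
  final case class ArrayT(element: DType) extends DType
  final case class MapT(key: DType, value: DType) extends DType
  final case class StructT(fields: Seq[(String, DType)]) extends DType

  // A nested type is supported only if every type inside it is supported.
  def supported(t: DType): Boolean = t match {
    case IntT | StringT | NullT => true
    case ArrayT(e)              => supported(e)
    case MapT(k, v)             => supported(k) && supported(v)
    case StructT(fs)            => fs.forall { case (_, ft) => supported(ft) }
    case OpaqueT                => false
  }
}
```

Running such a check on the whole schema up front is what allows a clear error before any data is read or written.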

## How was this patch tested?

Unit test

Closes #23684 from gengliangwang/avroSupportDataType.

Authored-by: Gengliang Wang 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala | 19 +++-
 .../apache/spark/sql/avro/SchemaConverters.scala   |  5 ++-
 .../org/apache/spark/sql/avro/AvroSuite.scala  | 50 --
 3 files changed, 50 insertions(+), 24 deletions(-)

diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index e60fa88..7391665 100755
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 private[avro] class AvroFileFormat extends FileFormat
@@ -243,6 +243,23 @@ private[avro] class AvroFileFormat extends FileFormat
   }
 }
   }
+
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
+case _: AtomicType => true
+
+case st: StructType => st.forall { f => supportDataType(f.dataType) }
+
+case ArrayType(elementType, _) => supportDataType(elementType)
+
+case MapType(keyType, valueType, _) =>
+  supportDataType(keyType) && supportDataType(valueType)
+
+case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
+
+case _: NullType => true
+
+case _ => false
+  }
 }
 
 private[avro] object AvroFileFormat {
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 64127af..3947d32 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -70,6 +70,8 @@ object SchemaConverters {
 
   case ENUM => SchemaType(StringType, nullable = false)
 
+  case NULL => SchemaType(NullType, nullable = true)
+
   case RECORD =>
 if (existingRecordNames.contains(avroSchema.getFullName)) {
   throw new IncompatibleSchemaException(s"""
@@ -151,6 +153,7 @@ object SchemaConverters {
   case FloatType => builder.floatType()
   case DoubleType => builder.doubleType()
   case StringType => builder.stringType()
+  case NullType => builder.nullType()
   case d: DecimalType =>
 val avroType = LogicalTypes.decimal(d.precision, d.scale)
 val fixedSize = minBytesForPrecision(d.precision)
@@ -181,7 +184,7 @@ object SchemaConverters {
   // This should never happen.
   case other => throw new IncompatibleSchemaException(s"Unexpected type 
$other.")
 }
-if (nullable) {
+if (nullable && catalystType != NullType) {
   Schema.createUnion(schema, nullSchema)
 } else {
   schema
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 207c54c..d803537 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
-import java.util.{TimeZone, UUID}
+import java.util.{Locale, TimeZone, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -35,6 +35,7 @@ import org.apache.commons.io.FileUtils

[spark] branch branch-5.2 created (now 1beed0d)

2019-01-29 Thread lixiao

lixiao pushed a change to branch branch-5.2
in repository https://gitbox.apache.org/repos/asf/spark.git.


  at 1beed0d  [SPARK-26765][SQL] Avro: Validate input and output schema

No new revisions were added by this update.





[spark] branch master updated: [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation

2019-01-29 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fbc3c5e  [SPARK-26718][SS] Fixed integer overflow in SS kafka 
rateLimit calculation
fbc3c5e is described below

commit fbc3c5e8a33c28e46d839a2d1db81d9a89b29327
Author: ryne.yang 
AuthorDate: Tue Jan 29 10:58:10 2019 -0800

[SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation

## What changes were proposed in this pull request?

Fix the integer overflow issue in rateLimit.
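
The guard used in the diff below can be shown in isolation. This is a sketch with a hypothetical helper name, `saturatingAdd`, and it assumes both arguments are non-negative, as Kafka offsets and prorated amounts are:

```scala
// Illustration only; the patch inlines this check rather than using a helper.
object RateLimitSketch {
  // Returns begin + delta, clamped to Long.MaxValue instead of wrapping.
  // Assumes begin >= 0 and delta >= 0.
  def saturatingAdd(begin: Long, delta: Long): Long =
    if (delta > Long.MaxValue - begin) Long.MaxValue else begin + delta
}
```

The comparison is done on `Long.MaxValue - begin`, which cannot overflow for a non-negative `begin`, whereas computing `begin + delta` first and checking afterwards would already have wrapped to a negative value.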

## How was this patch tested?

Passes Jenkins with a newly added UT for the case where the integer 
could overflow.

Closes #23666 from linehrr/master.

Authored-by: ryne.yang 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 10 ++-
 .../apache/spark/sql/kafka010/KafkaSource.scala| 10 ++-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 35 ++
 3 files changed, 53 insertions(+), 2 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 3ae9bd3..337a51e 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -231,7 +231,15 @@ private[kafka010] class KafkaMicroBatchStream(
 val begin = from.get(tp).getOrElse(fromNew(tp))
 val prorate = limit * (size / total)
 // Don't completely starve small topicpartitions
-val off = begin + (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+val prorateLong = (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+// need to be careful of integer overflow
+// therefore added canary checks where to see if off variable 
could be overflowed
+// refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+val off = if (prorateLong > Long.MaxValue - begin) {
+  Long.MaxValue
+} else {
+  begin + prorateLong
+}
 // Paranoia, make sure not to return an offset that's past end
 Math.min(end, off)
   }.getOrElse(end)
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index da55334..624c796 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -191,7 +191,15 @@ private[kafka010] class KafkaSource(
 val prorate = limit * (size / total)
 logDebug(s"rateLimit $tp prorated amount is $prorate")
 // Don't completely starve small topicpartitions
-val off = begin + (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+val prorateLong = (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+// need to be careful of integer overflow
+// therefore added canary checks where to see if off variable 
could be overflowed
+// refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+val off = if (prorateLong > Long.MaxValue - begin) {
+  Long.MaxValue
+} else {
+  begin + prorateLong
+}
 logDebug(s"rateLimit $tp new offset is $off")
 // Paranoia, make sure not to return an offset that's past end
 Math.min(end, off)
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index aa7baac..8fd5790 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -196,6 +196,41 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   StopStream)
   }
 
+  test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " +
+"during end offset calculation") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+// fill in 5 messages to trigger potential integer overflow
+testUtils.sendMessages(topic, (0 until 5).map(_.toString).toArray, Some(0))
+
+val partitionOffsets = 

[spark] branch branch-2.4 updated: [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit calculation

2019-01-29 Thread dongjoon

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new ae0592d  [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS 
kafka rateLimit calculation
ae0592d is described below

commit ae0592ddf7009934e9a5ee05a06a1cf80e354393
Author: ryne.yang 
AuthorDate: Tue Jan 29 12:40:28 2019 -0800

[SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit 
calculation

## What changes were proposed in this pull request?

Fix the integer overflow issue in rateLimit.

## How was this patch tested?

Passes Jenkins with a newly added UT for the case where the integer 
could overflow.

Closes #23652 from linehrr/fix/integer_overflow_rateLimit.

Authored-by: ryne.yang 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/kafka010/KafkaMicroBatchReader.scala | 10 ++-
 .../apache/spark/sql/kafka010/KafkaSource.scala| 10 ++-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 35 ++
 3 files changed, 53 insertions(+), 2 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index b6c8035..1333bc2 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -239,7 +239,15 @@ private[kafka010] class KafkaMicroBatchReader(
 val begin = from.get(tp).getOrElse(fromNew(tp))
 val prorate = limit * (size / total)
 // Don't completely starve small topicpartitions
-val off = begin + (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+val prorateLong = (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+// need to be careful of integer overflow
+// therefore added canary checks where to see if off variable 
could be overflowed
+// refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+val off = if (prorateLong > Long.MaxValue - begin) {
+  Long.MaxValue
+} else {
+  begin + prorateLong
+}
 // Paranoia, make sure not to return an offset that's past end
 Math.min(end, off)
   }.getOrElse(end)
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d65b3ce..464ad64 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -190,7 +190,15 @@ private[kafka010] class KafkaSource(
 val prorate = limit * (size / total)
 logDebug(s"rateLimit $tp prorated amount is $prorate")
 // Don't completely starve small topicpartitions
-val off = begin + (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+val prorateLong = (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+// need to be careful of integer overflow
+// therefore added canary checks where to see if off variable 
could be overflowed
+// refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+val off = if (prorateLong > Long.MaxValue - begin) {
+  Long.MaxValue
+} else {
+  begin + prorateLong
+}
 logDebug(s"rateLimit $tp new offset is $off")
 // Paranoia, make sure not to return an offset that's past end
 Math.min(end, off)
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5f05833..34cf335 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -199,6 +199,41 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   StopStream)
   }
 
+  test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " +
+"during end offset calculation") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+// fill in 5 messages to trigger potential integer overflow
+testUtils.sendMessages(topic, (0 until 5).map(_.t

[spark] branch master updated: [SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check

2019-01-29 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c08021c  [SPARK-26776][PYTHON] Reduce Py4J communication cost in 
PySpark's execution barrier check
c08021c is described below

commit c08021cd8734b3cc183e7f65312d14cdaa8541b7
Author: Hyukjin Kwon 
AuthorDate: Wed Jan 30 12:24:27 2019 +0800

[SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution 
barrier check

## What changes were proposed in this pull request?

I am investigating flaky tests. I realised that:

```
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2512, in __init__
    self.is_barrier = prev._is_barrier() or isFromBarrier
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2412, in _is_barrier
    return self._jrdd.rdd().isBarrier()
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 342, in get_return_value
    return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 2492, in <lambda>
    lambda target_id, gateway_client: JavaObject(target_id, gateway_client))
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1324, in __init__
    ThreadSafeFinalizer.add_finalizer(key, value)
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", line 43, in add_finalizer
    cls.finalizers[id] = weak_ref
  File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in __exit__
    self.release()
  File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in release
    self.__block.release()
error: release unlocked lock
```

I assume it might not be directly related to the test itself, but I 
noticed that `prev._is_barrier()` makes a call via Py4J.

Accessing via Py4J is expensive. Therefore, this PR proposes to avoid Py4J 
access when `isFromBarrier` is `True`.
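
The effect of swapping the operands can be illustrated with a minimal sketch. It is not PySpark code: `FakeRDD` and its `jvm_calls` counter are hypothetical stand-ins for an RDD whose `_is_barrier()` goes through Py4J. Python's `or` short-circuits, so the right-hand operand is evaluated only when the left-hand one is falsy.

```python
class FakeRDD(object):
    """Hypothetical stand-in for an RDD whose barrier check crosses Py4J."""

    def __init__(self, barrier):
        self._barrier = barrier
        self.jvm_calls = 0  # counts simulated Py4J round trips

    def _is_barrier(self):
        # In PySpark this would be self._jrdd.rdd().isBarrier(), a JVM call.
        self.jvm_calls += 1
        return self._barrier


def is_barrier_old(prev, isFromBarrier):
    # Old operand order: the (simulated) JVM call always happens first.
    return prev._is_barrier() or isFromBarrier


def is_barrier_new(prev, isFromBarrier):
    # New operand order: the call happens only when isFromBarrier is False.
    return isFromBarrier or prev._is_barrier()


old_prev = FakeRDD(barrier=False)
new_prev = FakeRDD(barrier=False)
old_result = is_barrier_old(old_prev, True)
new_result = is_barrier_new(new_prev, True)
print(old_result, old_prev.jvm_calls)  # True 1
print(new_result, new_prev.jvm_calls)  # True 0
```

Both orderings give the same result. Only the number of simulated round trips differs, and only when `isFromBarrier` is `True`.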

## How was this patch tested?

Unittests should cover this.

Closes #23690 from HyukjinKwon/minor-barrier.

Authored-by: Hyukjin Kwon 
Signed-off-by: Wenchen Fan 
---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7396930..751df44 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2509,7 +2509,7 @@ class PipelinedRDD(RDD):
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self.partitioner = prev.partitioner if self.preservesPartitioning else None
-        self.is_barrier = prev._is_barrier() or isFromBarrier
+        self.is_barrier = isFromBarrier or prev._is_barrier()
 
     def getNumPartitions(self):
         return self._prev_jrdd.partitions().size()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables

2019-01-29 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7781c6f  [SPARK-26378][SQL] Restore performance of queries against 
wide CSV/JSON tables
7781c6f is described below

commit 7781c6fd7334979b6b4222d2271765219593f08e
Author: Bruce Robbins 
AuthorDate: Wed Jan 30 15:15:29 2019 +0800

[SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON 
tables

## What changes were proposed in this pull request?

After [recent changes](https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2)
to CSV parsing that return partial results for bad CSV records, queries against 
wide CSV tables slowed considerably. That change caused every row to be 
recreated, even when the associated input record had no parsing issues and the 
user specified no corrupt record field in the schema.

The change to FailureSafeParser.scala also affected queries against wide 
JSON tables.

In this PR, I propose that a row should be recreated only if columns need 
to be shifted due to the existence of a corrupt record field in the 
user-supplied schema. Otherwise, the code should use the row as-is (for CSV 
input, it will have values for the columns that could be converted, and 
null values for columns that could not be converted).
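
The proposed rule can be sketched roughly as follows. This is a simplified Python illustration, not the Scala code in FailureSafeParser.scala: the function name `to_result_row`, its parameters, and the list-based rows are all assumptions made for the example.

```python
def to_result_row(parsed_row, schema_fields, corrupt_field, raw_record, is_bad):
    """Return the row to emit for one input record (illustrative only).

    parsed_row    -- values for the data columns only (None where conversion failed)
    schema_fields -- column names of the user-supplied schema, in order
    corrupt_field -- name of the corrupt-record column, or None if not configured
    raw_record    -- original input text
    is_bad        -- True when the parser reported a problem with this record
    """
    if corrupt_field is None or corrupt_field not in schema_fields:
        # No corrupt column in the schema: nothing has to shift, reuse the row.
        return parsed_row

    # A corrupt column exists, so data columns shift around it and a new row is built.
    values = iter(parsed_row)
    return [
        (raw_record if is_bad else None) if name == corrupt_field else next(values)
        for name in schema_fields
    ]


good = ["hello", "1999-08-01", "last"]
bad = ["there", None, "field"]

same = to_result_row(good, ["a", "b", "c"], None, '"hello",1999-08-01,"last"', False)
shifted = to_result_row(bad, ["badRecord", "a", "b", "c"], "badRecord",
                        '"there","bad date","field"', True)
print(same is good)  # True: the parsed row is returned untouched
print(shifted)       # ['"there","bad date","field"', 'there', None, 'field']
```

In the first call no new row is allocated, which is the case the PR aims to make cheap for wide tables.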

See benchmarks below. The CSV benchmark for 1000 columns went from 120144 
ms to 89069 ms, a savings of about 25%. This only brings the cost back down to 
baseline levels.

Similarly, the JSON benchmark for 1000 columns (added in this PR) went from 
109621 ms to 80871 ms, also a savings of 25%.
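
The quoted savings can be checked with a few lines of arithmetic. The helper name `savings_percent` is made up for this sketch; the timings are the ones given in the description.

```python
def savings_percent(before_ms, after_ms):
    """Relative reduction in elapsed time, as a percentage of the earlier figure."""
    return 100.0 * (before_ms - after_ms) / before_ms


csv_savings = savings_percent(120144, 89069)
json_savings = savings_percent(109621, 80871)
print("CSV:  %.1f%%" % csv_savings)   # CSV:  25.9%
print("JSON: %.1f%%" % json_savings)  # JSON: 26.2%
```

Both figures come out slightly above the rounded 25% quoted in the description.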

Still, partial results functionality is preserved:


bash-3.2$ cat test2.csv
"hello",1999-08-01,"last"
"there","bad date","field"
"again","2017-11-22","in file"
bash-3.2$ bin/spark-shell
...etc...
scala> val df = spark.read.schema("a string, b date, c string").csv("test2.csv")
df: org.apache.spark.sql.DataFrame = [a: string, b: date ... 1 more field]
scala> df.show
+-----+----------+-------+
|    a|         b|      c|
+-----+----------+-------+
|hello|1999-08-01|   last|
|there|      null|  field|
|again|2017-11-22|in file|
+-----+----------+-------+
scala> val df = spark.read.schema("badRecord string, a string, b date, c string").
     | option("columnNameOfCorruptRecord", "badRecord").
     | csv("test2.csv")
df: org.apache.spark.sql.DataFrame = [badRecord: string, a: string ... 2 more fields]
scala> df.show
+--------------------+-----+----------+-------+
|           badRecord|    a|         b|      c|
+--------------------+-----+----------+-------+
|                null|hello|1999-08-01|   last|
|"there","bad date...|there|      null|  field|
|                null|again|2017-11-22|in file|
+--------------------+-----+----------+-------+
scala>


### CSVBenchmark Benchmarks:

baseline = commit before partial results change
PR = this PR
master = master branch


[baseline_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697109/baseline_CSVBenchmark-results.txt)

[pr_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697110/pr_CSVBenchmark-results.txt)

[master_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697111/master_CSVBenchmark-results.txt)

### JSONBenchmark Benchmarks:

baseline = commit before partial results change
PR = this PR
master = master branch


[baseline_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711040/baseline_JSONBenchmark-results.txt)

[pr_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711041/pr_JSONBenchmark-results.txt)

[master_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711042/master_JSONBenchmark-results.txt)

## How was this patch tested?

- All SQL unit tests.
- Added 2 CSV benchmarks
- Python core and SQL tests

Closes #23336 from bersprockets/csv-wide-row-opt2.

Authored-by: Bruce Robbins 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/catalyst/util/FailureSafeParser.scala  | 21 +
 sql/core/benchmarks/CSVBenchmark-results.txt   | 31 +++--
 sql/core/benchmarks/JSONBenchmark-results.txt  | 54 +-
 .../execution/datasources/csv/CSVBenchmark.scala   | 16 +++
 .../execution/datasources/json/JsonBenchmark.scala | 48 ++-
 5 files changed, 124 insertions(+), 46 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafe