[spark] branch branch-2.4 updated: [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new d5cc890 [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan d5cc890 is described below commit d5cc8909c72e958ce187df9c75847ad0125991ab Author: maryannxue AuthorDate: Tue Jan 29 21:33:46 2019 +0900 [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan ## What changes were proposed in this pull request? When performing non-cascading cache invalidation, `recache` is called on the other cache entries which are dependent on the cache being invalidated. It leads to the physical plans of those cache entries being re-compiled. For those cache entries, if the cached RDD has already been persisted, chances are there will be inconsistency between the data and the new plan. It can cause a correctness issue if the new plan's `outputPartitioning` or `outputOrdering` is different from the tha [...] The fix is to keep the cache entry as it is if the data has been loaded, otherwise re-build the cache entry, with a new plan and an empty cache buffer. ## How was this patch tested? Added UT. Closes #23678 from maryannxue/spark-26708-2.4. Authored-by: maryannxue Signed-off-by: Takeshi Yamamuro --- .../apache/spark/sql/execution/CacheManager.scala | 28 +++--- .../sql/execution/columnar/InMemoryRelation.scala | 10 + .../org/apache/spark/sql/DatasetCacheSuite.scala | 44 +- 3 files changed, 67 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index c992993..5b30596 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -166,16 +166,34 @@ class CacheManager extends Logging { val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData] while (it.hasNext) { val cd = it.next() - if (condition(cd.plan)) { -if (clearCache) { - cd.cachedRepresentation.cacheBuilder.clearCache() -} + // If `clearCache` is false (which means the recache request comes from a non-cascading + // cache invalidation) and the cache buffer has already been loaded, we do not need to + // re-compile a physical plan because the old plan will not be used any more by the + // CacheManager although it still lives in compiled `Dataset`s and it could still work. + // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer + // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache + // buffer is still empty, then we could have a more efficient new plan by removing + // dependency on the previously removed cache entries. + // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking + // status test and may not return the most accurate cache buffer state. So the worse case + // scenario can be: + // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we + //will clear the buffer and build a new plan. It is inefficient but doesn't affect + //correctness. + // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we + //will keep it as it is.
It means the physical plan has been re-compiled already in the + //other thread. + val buildNewPlan = +clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded + if (condition(cd.plan) && buildNewPlan) { +cd.cachedRepresentation.cacheBuilder.clearCache() // Remove the cache entry before we create a new one, so that we can have a different // physical plan. it.remove() val plan = spark.sessionState.executePlan(cd.plan).executedPlan val newCache = InMemoryRelation( - cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan), + cacheBuilder = cd.cachedRepresentation +.cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null), logicalPlan = cd.plan) needToRecache += cd.copy(cachedRepresentation = newCache) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index b752b77..8eecd7a 100644 --- a/sql/core/src/main/scala/o
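A hedged, user-level sketch of the scenario this fix targets; the table shapes, names, and the exact point of failure below are illustrative assumptions, not the PR's own unit test:

```scala
// t2's cache depends on t1's cache. A non-cascading unpersist of t1 triggers a recache
// of t2; before this fix, t2's already-persisted column buffers could end up paired with
// a freshly re-compiled plan whose outputPartitioning/outputOrdering no longer describes
// the persisted data, which is how wrong results could appear downstream.
val t1 = spark.range(0, 100).selectExpr("id % 10 AS key", "id AS value")
t1.cache()

val t2 = t1.groupBy("key").count()
t2.cache()
t2.count()                        // materialize t2's cached column buffers

t1.unpersist(blocking = true)     // non-cascading invalidation; dependent caches are recached
t2.join(t1, "key").count()        // a plan/data mismatch could surface in a query like this
```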
[spark] branch master updated: [SPARK-26763][SQL] Using fileStatus cache when filterPartitions
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5d672b7 [SPARK-26763][SQL] Using fileStatus cache when filterPartitions 5d672b7 is described below commit 5d672b7f3e07cfd7710df319fc6c7d2b9056a068 Author: Xianyang Liu AuthorDate: Tue Jan 29 23:11:11 2019 +0800 [SPARK-26763][SQL] Using fileStatus cache when filterPartitions ## What changes were proposed in this pull request? We should pass the existing `fileStatusCache` to `InMemoryFileIndex` even though there aren't partition columns. ## How was this patch tested? Existing tests. Extra tests can be added if there is a requirement. Closes #23683 from ConeyLiu/filestatuscache. Authored-by: Xianyang Liu Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index a66a076..8736d07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -84,8 +84,8 @@ class CatalogFileIndex( new PrunedInMemoryFileIndex( sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs)) } else { - new InMemoryFileIndex( -sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None) + new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, +userSpecifiedSchema = None, fileStatusCache = fileStatusCache) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
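As background, a minimal sketch of what the shared cache buys; `FileStatusCache` and `InMemoryFileIndex` are internal (`private[sql]`) APIs and the path below is a placeholder, so this illustrates the mechanism rather than recommended user code:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}

// Two indexes over the same root paths: with a shared FileStatusCache, the second
// one can reuse the first one's file listing instead of hitting the file system again.
val cache = FileStatusCache.getOrCreate(spark)
val roots = Seq(new Path("/warehouse/db/tbl"))   // illustrative location

val index1 = new InMemoryFileIndex(
  spark, roots, Map.empty[String, String], userSpecifiedSchema = None, fileStatusCache = cache)
val index2 = new InMemoryFileIndex(
  spark, roots, Map.empty[String, String], userSpecifiedSchema = None, fileStatusCache = cache)
```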
[spark] branch master updated: [SPARK-11215][ML] Add multiple columns support to StringIndexer
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3310789 [SPARK-11215][ML] Add multiple columns support to StringIndexer 3310789 is described below commit 33107897ada29d1ed17f091f93260dfcef11c2e7 Author: Liang-Chi Hsieh AuthorDate: Tue Jan 29 09:21:25 2019 -0600 [SPARK-11215][ML] Add multiple columns support to StringIndexer ## What changes were proposed in this pull request? This takes over #19621 to add multi-column support to StringIndexer: 1. Supports encoding multiple columns. 2. Previously, when specifying `frequencyDesc` or `frequencyAsc` as `stringOrderType` param in `StringIndexer`, in case of equal frequency, the order of strings is undefined. After this change, the strings with equal frequency are further sorted alphabetically. ## How was this patch tested? Added tests. Closes #20146 from viirya/SPARK-11215. Authored-by: Liang-Chi Hsieh Signed-off-by: Sean Owen --- R/pkg/tests/fulltests/test_mllib_classification.R | 6 +- R/pkg/tests/fulltests/test_mllib_regression.R | 42 ++- docs/ml-features.md| 6 +- docs/ml-guide.md | 9 + .../apache/spark/ml/feature/StringIndexer.scala| 409 + ...2980-4c42-b8a7-a5a94265c479-c000.snappy.parquet | Bin 0 -> 478 bytes .../test-data/strIndexerModel/metadata/part-0 | 1 + .../apache/spark/ml/feature/RFormulaSuite.scala| 28 +- .../spark/ml/feature/StringIndexerSuite.scala | 139 ++- project/MimaExcludes.scala | 4 + 10 files changed, 531 insertions(+), 113 deletions(-) diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R index 023686e..9fdb0cf 100644 --- a/R/pkg/tests/fulltests/test_mllib_classification.R +++ b/R/pkg/tests/fulltests/test_mllib_classification.R @@ -313,7 +313,7 @@ test_that("spark.mlp", { # Test predict method mlpTestDF <- df mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0")) + expect_equal(head(mlpPredictions$prediction, 6), c("0.0", "1.0", "1.0", "1.0", "1.0", "1.0")) # Test model save/load if (windows_with_hadoop()) { @@ -348,12 +348,12 @@ test_that("spark.mlp", { # Test random seed # default seed - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) + model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100) mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) expect_equal(head(mlpPredictions$prediction, 10), c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0")) # seed equals 10 - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10) + model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 100, seed = 10) mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) expect_equal(head(mlpPredictions$prediction, 10), c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0")) diff --git a/R/pkg/tests/fulltests/test_mllib_regression.R b/R/pkg/tests/fulltests/test_mllib_regression.R index 23daca7..b40c4cb 100644 --- a/R/pkg/tests/fulltests/test_mllib_regression.R +++ b/R/pkg/tests/fulltests/test_mllib_regression.R @@ -102,10 +102,18 @@ test_that("spark.glm and predict", { }) test_that("spark.glm summary", { + # prepare dataset + Sepal.Length <- c(2.0, 1.5, 1.8, 3.4, 5.1, 1.8, 1.0, 2.3) + Sepal.Width <- 
c(2.1, 2.3, 5.4, 4.7, 3.1, 2.1, 3.1, 5.5) + Petal.Length <- c(1.8, 2.1, 7.1, 2.5, 3.7, 6.3, 2.2, 7.2) + Species <- c("setosa", "versicolor", "versicolor", "versicolor", "virginica", "virginica", + "versicolor", "virginica") + dataset <- data.frame(Sepal.Length, Sepal.Width, Petal.Length, Species, stringsAsFactors = TRUE) + # gaussian family - training <- suppressWarnings(createDataFrame(iris)) + training <- suppressWarnings(createDataFrame(dataset)) stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species)) - rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) + rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset)) # test summary coefficients return matrix type expect_true(class(stats$coefficients) == "matrix") @@ -126,15 +134,15 @@ test_that("spark.glm summary", { out <- capture.output(print(stats)) expect_match(out[2], "Deviance Residuals:") - expect_true(any(grepl("AIC: 59.22", out))) + expect_true(any(grepl("AIC: 35.84", out))) # binomial family - df <- suppress
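A short usage sketch of the multi-column API this commit adds, assuming the standard Spark ML `setInputCols`/`setOutputCols` pattern; the data and column names are made up for illustration:

```scala
import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(Seq(
  ("a", "x"), ("b", "y"), ("a", "y"), ("c", "x")
)).toDF("col1", "col2")

// One indexer now encodes several columns; with frequencyDesc/frequencyAsc ordering,
// strings of equal frequency are further sorted alphabetically after this change.
val indexer = new StringIndexer()
  .setInputCols(Array("col1", "col2"))
  .setOutputCols(Array("col1Idx", "col2Idx"))
  .setStringOrderType("frequencyDesc")

indexer.fit(df).transform(df).show()
```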
[spark] branch master updated: [SPARK-26702][SQL][TEST] Create a test trait for Parquet and Orc test
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 66afd86 [SPARK-26702][SQL][TEST] Create a test trait for Parquet and Orc test 66afd86 is described below commit 66afd869d1287ac0d069c2bf6b73712fd2dec19a Author: Liang-Chi Hsieh AuthorDate: Tue Jan 29 07:31:42 2019 -0800 [SPARK-26702][SQL][TEST] Create a test trait for Parquet and Orc test ## What changes were proposed in this pull request? For making test suite supporting both Parquet and Orc by reusing test cases, this patch extracts the methods for testing. For example, if we need to test a common feature shared by Parquet and Orc, we should be able to write test cases once and reuse them to test both formats. This patch extracts the methods for testing and uses a variable `dataSourceName` to set up data format to test against with. ## How was this patch tested? Existing tests. Closes #23628 from viirya/datasource-test. Authored-by: Liang-Chi Hsieh Signed-off-by: Dongjoon Hyun --- .../datasources/FileBasedDataSourceTest.scala | 106 + .../execution/datasources/orc/OrcQuerySuite.scala | 12 ++- .../sql/execution/datasources/orc/OrcTest.scala| 40 +++- .../datasources/parquet/ParquetTest.scala | 46 +++-- 4 files changed, 142 insertions(+), 62 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala new file mode 100644 index 000..bdb161d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File + +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.apache.spark.sql.test.SQLTestUtils + +/** + * A helper trait that provides convenient facilities for file-based data source testing. + * Specifically, it is used for Parquet and Orc testing. It can be used to write tests + * that are shared between Parquet and Orc. + */ +private[sql] trait FileBasedDataSourceTest extends SQLTestUtils { + + // Defines the data source name to run the test. + protected val dataSourceName: String + // The SQL config key for enabling vectorized reader. + protected val vectorizedReaderEnabledKey: String + + /** + * Reads data source file from given `path` as `DataFrame` and passes it to given function. 
+ * + * @param path The path to file + * @param testVectorized Whether to read the file with vectorized reader. + * @param f The given function that takes a `DataFrame` as input. + */ + protected def readFile(path: String, testVectorized: Boolean = true) + (f: DataFrame => Unit): Unit = { +withSQLConf(vectorizedReaderEnabledKey -> "false") { + f(spark.read.format(dataSourceName).load(path.toString)) +} +if (testVectorized) { + withSQLConf(vectorizedReaderEnabledKey -> "true") { +f(spark.read.format(dataSourceName).load(path.toString)) + } +} + } + + /** + * Writes `data` to a data source file, which is then passed to `f` and will be deleted after `f` + * returns. + */ + protected def withDataSourceFile[T <: Product : ClassTag : TypeTag] + (data: Seq[T]) + (f: String => Unit): Unit = { +withTempPath { file => + spark.createDataFrame(data).write.format(dataSourceName).save(file.getCanonicalPath) + f(file.getCanonicalPath) +} + } + + /** + * Writes `data` to a data source file and reads it back as a [[DataFrame]], + * which is then passed to `f`. The file will be deleted after `f` returns. + */ + protected def withDataSourceDataFrame[T <: Product : ClassTag : TypeTag] + (data: Seq[T], testVectorized: Boolean =
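A hedged sketch of how a concrete suite could reuse the trait; the suite name, test body, and the vectorized-reader config key below are assumptions, not part of the patch:

```scala
package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
import org.apache.spark.sql.test.SharedSQLContext

// The trait is private[sql], so a real suite has to live under the org.apache.spark.sql tree.
class SimpleOrcRoundTripSuite extends FileBasedDataSourceTest with SharedSQLContext {
  override protected val dataSourceName: String = "orc"
  override protected val vectorizedReaderEnabledKey: String =
    "spark.sql.orc.enableVectorizedReader"

  test("round trip through the configured format") {
    // Writes the rows in the configured format, reads them back with and without
    // the vectorized reader, and hands each resulting DataFrame to the assertion.
    withDataSourceDataFrame(Seq((1, "a"), (2, "b"))) { df =>
      assert(df.count() === 2)
    }
  }
}
```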
[spark] branch master updated: [SPARK-26765][SQL] Avro: Validate input and output schema
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1beed0d [SPARK-26765][SQL] Avro: Validate input and output schema 1beed0d is described below commit 1beed0d7c253da4fde12bfeea96cc0fbcc1aae25 Author: Gengliang Wang AuthorDate: Wed Jan 30 00:17:33 2019 +0800 [SPARK-26765][SQL] Avro: Validate input and output schema ## What changes were proposed in this pull request? The API `supportDataType` in `FileFormat` helps to validate the output/input schema before execution starts, so that we can avoid some invalid data source IO and users can see clean error messages. This PR is to override the validation API in Avro data source. Also, as per the spec of Avro (https://avro.apache.org/docs/1.8.2/spec.html), `NullType` is supported. This PR fixes the handling of `NullType`. ## How was this patch tested? Unit test. Closes #23684 from gengliangwang/avroSupportDataType. Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/avro/AvroFileFormat.scala | 19 +++- .../apache/spark/sql/avro/SchemaConverters.scala | 5 ++- .../org/apache/spark/sql/avro/AvroSuite.scala | 50 -- 3 files changed, 50 insertions(+), 24 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index e60fa88..7391665 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile} import org.apache.spark.sql.sources.{DataSourceRegister, Filter} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ import org.apache.spark.util.{SerializableConfiguration, Utils} private[avro] class AvroFileFormat extends FileFormat @@ -243,6 +243,23 @@ private[avro] class AvroFileFormat extends FileFormat } } } + + override def supportDataType(dataType: DataType): Boolean = dataType match { +case _: AtomicType => true + +case st: StructType => st.forall { f => supportDataType(f.dataType) } + +case ArrayType(elementType, _) => supportDataType(elementType) + +case MapType(keyType, valueType, _) => + supportDataType(keyType) && supportDataType(valueType) + +case udt: UserDefinedType[_] => supportDataType(udt.sqlType) + +case _: NullType => true + +case _ => false + } } private[avro] object AvroFileFormat { diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 64127af..3947d32 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -70,6 +70,8 @@ object SchemaConverters { case ENUM => SchemaType(StringType, nullable = false) + case NULL => SchemaType(NullType, nullable = true) + case RECORD => if (existingRecordNames.contains(avroSchema.getFullName)) { throw new IncompatibleSchemaException(s""" @@ -151,6 +153,7 @@ object SchemaConverters { case FloatType => builder.floatType() case DoubleType => builder.doubleType() case StringType =>
builder.stringType() + case NullType => builder.nullType() case d: DecimalType => val avroType = LogicalTypes.decimal(d.precision, d.scale) val fixedSize = minBytesForPrecision(d.precision) @@ -181,7 +184,7 @@ object SchemaConverters { // This should never happen. case other => throw new IncompatibleSchemaException(s"Unexpected type $other.") } -if (nullable) { +if (nullable && catalystType != NullType) { Schema.createUnion(schema, nullSchema) } else { schema diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 207c54c..d803537 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -21,7 +21,7 @@ import java.io._ import java.net.URL import java.nio.file.{Files, Paths} import java.sql.{Date, Timestamp} -import java.util.{TimeZone, UUID} +import java.util.{Locale, TimeZone, UUID} import scala.collection.JavaConverters._ @@ -35,6 +35,7 @@ import org.apache.commons.io.FileUtils
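A hedged illustration of the user-visible effect; the output paths are placeholders, and the exact exception type and message for a rejected type are assumptions based on how `supportDataType` failures surface for other file formats:

```scala
import org.apache.spark.sql.AnalysisException

// NullType now maps to Avro's null type, so a null-typed column can be written.
spark.sql("SELECT 'id1' AS id, null AS n")
  .write.format("avro").save("/tmp/avro_null_ok")

// A type Avro cannot represent is rejected before any data is written,
// instead of failing somewhere in the middle of the job.
try {
  spark.sql("SELECT INTERVAL 1 DAY AS i").write.format("avro").save("/tmp/avro_interval")
} catch {
  case e: AnalysisException => println(e.getMessage)   // e.g. an unsupported-data-type error
}
```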
[spark] branch branch-5.2 created (now 1beed0d)
This is an automated email from the ASF dual-hosted git repository. lixiao pushed a change to branch branch-5.2 in repository https://gitbox.apache.org/repos/asf/spark.git. at 1beed0d [SPARK-26765][SQL] Avro: Validate input and output schema No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fbc3c5e [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation fbc3c5e is described below commit fbc3c5e8a33c28e46d839a2d1db81d9a89b29327 Author: ryne.yang AuthorDate: Tue Jan 29 10:58:10 2019 -0800 [SPARK-26718][SS] Fixed integer overflow in SS kafka rateLimit calculation ## What changes were proposed in this pull request? Fix the integer overflow issue in rateLimit. ## How was this patch tested? Pass the Jenkins with newly added UT for the possible case where integer could be overflowed. Closes #23666 from linehrr/master. Authored-by: ryne.yang Signed-off-by: Dongjoon Hyun --- .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 10 ++- .../apache/spark/sql/kafka010/KafkaSource.scala| 10 ++- .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 35 ++ 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 3ae9bd3..337a51e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -231,7 +231,15 @@ private[kafka010] class KafkaMicroBatchStream( val begin = from.get(tp).getOrElse(fromNew(tp)) val prorate = limit * (size / total) // Don't completely starve small topicpartitions -val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong +val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong +// need to be careful of integer overflow +// therefore added canary checks where to see if off variable could be overflowed +// refer to [https://issues.apache.org/jira/browse/SPARK-26718] +val off = if (prorateLong > Long.MaxValue - begin) { + Long.MaxValue +} else { + begin + prorateLong +} // Paranoia, make sure not to return an offset that's past end Math.min(end, off) }.getOrElse(end) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index da55334..624c796 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -191,7 +191,15 @@ private[kafka010] class KafkaSource( val prorate = limit * (size / total) logDebug(s"rateLimit $tp prorated amount is $prorate") // Don't completely starve small topicpartitions -val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong +val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong +// need to be careful of integer overflow +// therefore added canary checks where to see if off variable could be overflowed +// refer to [https://issues.apache.org/jira/browse/SPARK-26718] +val off = if (prorateLong > Long.MaxValue - begin) { + Long.MaxValue +} else { + begin + prorateLong +} logDebug(s"rateLimit $tp new offset is $off") // Paranoia, make sure not to return an offset that's past end Math.min(end, off) diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index aa7baac..8fd5790 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -196,6 +196,41 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { StopStream) } + test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " + +"during end offset calculation") { +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) +// fill in 5 messages to trigger potential integer overflow +testUtils.sendMessages(topic, (0 until 5).map(_.toString).toArray, Some(0)) + +val partitionOffsets =
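The guarded addition is easy to check in isolation; a minimal standalone sketch of the clamping logic the patch introduces (the helper name is illustrative):

```scala
// Clamp instead of wrapping: an unguarded begin + prorateLong overflows to a negative
// offset when the rate limit is set close to Long.MaxValue.
def boundedAdd(begin: Long, prorateLong: Long): Long =
  if (prorateLong > Long.MaxValue - begin) Long.MaxValue else begin + prorateLong

assert(boundedAdd(100L, Long.MaxValue) == Long.MaxValue)  // plain addition would wrap negative
assert(boundedAdd(100L, 5L) == 105L)                      // normal case is unchanged
```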
[spark] branch branch-2.4 updated: [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit calculation
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new ae0592d [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit calculation ae0592d is described below commit ae0592ddf7009934e9a5ee05a06a1cf80e354393 Author: ryne.yang AuthorDate: Tue Jan 29 12:40:28 2019 -0800 [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit calculation ## What changes were proposed in this pull request? Fix the integer overflow issue in rateLimit. ## How was this patch tested? Pass the Jenkins with newly added UT for the possible case where integer could be overflowed. Closes #23652 from linehrr/fix/integer_overflow_rateLimit. Authored-by: ryne.yang Signed-off-by: Dongjoon Hyun --- .../spark/sql/kafka010/KafkaMicroBatchReader.scala | 10 ++- .../apache/spark/sql/kafka010/KafkaSource.scala| 10 ++- .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 35 ++ 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala index b6c8035..1333bc2 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala @@ -239,7 +239,15 @@ private[kafka010] class KafkaMicroBatchReader( val begin = from.get(tp).getOrElse(fromNew(tp)) val prorate = limit * (size / total) // Don't completely starve small topicpartitions -val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong +val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong +// need to be careful of integer overflow +// therefore added canary checks where to see if off variable could be overflowed +// refer to [https://issues.apache.org/jira/browse/SPARK-26718] +val off = if (prorateLong > Long.MaxValue - begin) { + Long.MaxValue +} else { + begin + prorateLong +} // Paranoia, make sure not to return an offset that's past end Math.min(end, off) }.getOrElse(end) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index d65b3ce..464ad64 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -190,7 +190,15 @@ private[kafka010] class KafkaSource( val prorate = limit * (size / total) logDebug(s"rateLimit $tp prorated amount is $prorate") // Don't completely starve small topicpartitions -val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong +val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong +// need to be careful of integer overflow +// therefore added canary checks where to see if off variable could be overflowed +// refer to [https://issues.apache.org/jira/browse/SPARK-26718] +val off = if (prorateLong > Long.MaxValue - begin) { + Long.MaxValue +} else { + begin + prorateLong +} logDebug(s"rateLimit $tp new offset is $off") // Paranoia, make sure not to return an offset that's past end Math.min(end, 
off) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 5f05833..34cf335 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -199,6 +199,41 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { StopStream) } + test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " + +"during end offset calculation") { +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) +// fill in 5 messages to trigger potential integer overflow +testUtils.sendMessages(topic, (0 until 5).map(_.t
[spark] branch master updated: [SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c08021c [SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check c08021c is described below commit c08021cd8734b3cc183e7f65312d14cdaa8541b7 Author: Hyukjin Kwon AuthorDate: Wed Jan 30 12:24:27 2019 +0800 [SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check ## What changes were proposed in this pull request? I am investigating flaky tests. I realised that: ``` File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2512, in __init__ self.is_barrier = prev._is_barrier() or isFromBarrier File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2412, in _is_barrier return self._jrdd.rdd().isBarrier() File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 342, in get_return_value return OUTPUT_CONVERTER[type](answer[2:], gateway_client) File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 2492, in lambda target_id, gateway_client: JavaObject(target_id, gateway_client)) File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1324, in __init__ ThreadSafeFinalizer.add_finalizer(key, value) File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", line 43, in add_finalizer cls.finalizers[id] = weak_ref File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in __exit__ self.release() File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in release self.__block.release() error: release unlocked lock ``` I assume it might not be directly related with the test itself but I noticed that it `prev._is_barrier()` attempts to access via Py4J. Accessing via Py4J is expensive. Therefore, this PR proposes to avoid Py4J access when `isFromBarrier` is `True`. ## How was this patch tested? Unittests should cover this. Closes #23690 from HyukjinKwon/minor-barrier. Authored-by: Hyukjin Kwon Signed-off-by: Wenchen Fan --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 7396930..751df44 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2509,7 +2509,7 @@ class PipelinedRDD(RDD): self._jrdd_deserializer = self.ctx.serializer self._bypass_serializer = False self.partitioner = prev.partitioner if self.preservesPartitioning else None -self.is_barrier = prev._is_barrier() or isFromBarrier +self.is_barrier = isFromBarrier or prev._is_barrier() def getNumPartitions(self): return self._prev_jrdd.partitions().size() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
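The fix relies only on short-circuit evaluation; a generic sketch of the reasoning (plain Scala, not the Spark code itself) showing why operand order matters:

```scala
// With `||` (or Python's `or`), the right operand is evaluated only when the left one is
// false, so putting the cheap local flag first skips the expensive cross-process call.
def expensiveRemoteCheck(): Boolean = { println("crossed the JVM boundary"); false }

val isFromBarrier = true
val isBarrier = isFromBarrier || expensiveRemoteCheck()   // remote check never runs here
```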
[spark] branch master updated: [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7781c6f [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables 7781c6f is described below commit 7781c6fd7334979b6b4222d2271765219593f08e Author: Bruce Robbins AuthorDate: Wed Jan 30 15:15:29 2019 +0800 [SPARK-26378][SQL] Restore performance of queries against wide CSV/JSON tables ## What changes were proposed in this pull request? After [recent changes](https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2) to CSV parsing to return partial results for bad CSV records, queries of wide CSV tables slowed considerably. That recent change resulted in every row being recreated, even when the associated input record had no parsing issues and the user specified no corrupt record field in his/her schema. The change to FailureSafeParser.scala also impacted queries against wide JSON tables as well. In this PR, I propose that a row should be recreated only if columns need to be shifted due to the existence of a corrupt column field in the user-supplied schema. Otherwise, the code should use the row as-is (For CSV input, it will have values for the columns that could be converted, and also null values for columns that could not be converted). See benchmarks below. The CSV benchmark for 1000 columns went from 120144 ms to 89069 ms, a savings of 25% (this only brings the cost down to baseline levels. Again, see benchmarks below). Similarly, the JSON benchmark for 1000 columns (added in this PR) went from 109621 ms to 80871 ms, also a savings of 25%. Still, partial results functionality is preserved: bash-3.2$ cat test2.csv "hello",1999-08-01,"last" "there","bad date","field" "again","2017-11-22","in file" bash-3.2$ bin/spark-shell ...etc... scala> val df = spark.read.schema("a string, b date, c string").csv("test2.csv") df: org.apache.spark.sql.DataFrame = [a: string, b: date ... 1 more field] scala> df.show +-+--+---+ |a| b| c| +-+--+---+ |hello|1999-08-01| last| |there| null| field| |again|2017-11-22|in file| +-+--+---+ scala> val df = spark.read.schema("badRecord string, a string, b date, c string"). | option("columnNameOfCorruptRecord", "badRecord"). | csv("test2.csv") df: org.apache.spark.sql.DataFrame = [badRecord: string, a: string ... 
2 more fields] scala> df.show ++-+--+---+ | badRecord|a| b| c| ++-+--+---+ |null|hello|1999-08-01| last| |"there","bad date...|there| null| field| |null|again|2017-11-22|in file| ++-+--+---+ scala> ### CSVBenchmark Benchmarks: baseline = commit before partial results change PR = this PR master = master branch [baseline_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697109/baseline_CSVBenchmark-results.txt) [pr_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697110/pr_CSVBenchmark-results.txt) [master_CSVBenchmark-results.txt](https://github.com/apache/spark/files/2697111/master_CSVBenchmark-results.txt) ### JSONBenchmark Benchmarks: baseline = commit before partial results change PR = this PR master = master branch [baseline_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711040/baseline_JSONBenchmark-results.txt) [pr_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711041/pr_JSONBenchmark-results.txt) [master_JSONBenchmark-results.txt](https://github.com/apache/spark/files/2711042/master_JSONBenchmark-results.txt) ## How was this patch tested? - All SQL unit tests. - Added 2 CSV benchmarks - Python core and SQL tests Closes #23336 from bersprockets/csv-wide-row-opt2. Authored-by: Bruce Robbins Signed-off-by: Hyukjin Kwon --- .../sql/catalyst/util/FailureSafeParser.scala | 21 + sql/core/benchmarks/CSVBenchmark-results.txt | 31 +++-- sql/core/benchmarks/JSONBenchmark-results.txt | 54 +- .../execution/datasources/csv/CSVBenchmark.scala | 16 +++ .../execution/datasources/json/JsonBenchmark.scala | 48 ++- 5 files changed, 124 insertions(+), 46 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafe