[spark] branch branch-3.0 updated: [SPARK-31087] [SQL] Add Back Multiple Removed APIs
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f375930  [SPARK-31087] [SQL] Add Back Multiple Removed APIs
f375930 is described below

commit f375930d81337f2facbe5da71bb126d4d935e49d
Author: gatorsmile
AuthorDate: Sat Mar 28 22:05:16 2020 -0700

    [SPARK-31087] [SQL] Add Back Multiple Removed APIs

    ### What changes were proposed in this pull request?

    Based on the discussion in the mailing list [[Proposal] Modification to Spark's Semantic Versioning Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html), this PR adds back the following APIs, whose maintenance costs are relatively small:

    - functions.toDegrees/toRadians
    - functions.approxCountDistinct
    - functions.monotonicallyIncreasingId
    - Column.!==
    - Dataset.explode
    - Dataset.registerTempTable
    - SQLContext.getOrCreate, setActive, clearActive, constructors

    Below are the other APIs removed in the original PR [https://issues.apache.org/jira/browse/SPARK-25908] but not added back in this PR:

    - Remove some AccumulableInfo .apply() methods
    - Remove non-label-specific multiclass precision/recall/fScore in favor of accuracy
    - Remove unused Python StorageLevel constants
    - Remove unused multiclass option in libsvm parsing
    - Remove references to deprecated spark configs like spark.yarn.am.port
    - Remove TaskContext.isRunningLocally
    - Remove ShuffleMetrics.shuffle* methods
    - Remove BaseReadWrite.context in favor of session

    ### Why are the changes needed?

    Avoid breaking APIs that are commonly used.

    ### Does this PR introduce any user-facing change?

    Adding back the APIs that were removed in the 3.0 branch does not introduce user-facing changes, because Spark 3.0 has not been released.

    ### How was this patch tested?

    Added a new test suite for these APIs.

    Author: gatorsmile
    Author: yi.wu

    Closes #27821 from gatorsmile/addAPIBackV2.

    (cherry picked from commit 3884455780a214c620f309e00d5a083039746755)
    Signed-off-by: gatorsmile
---
 project/MimaExcludes.scala                         |   8 --
 python/pyspark/sql/dataframe.py                    |  19
 python/pyspark/sql/functions.py                    |  11 ++
 .../main/scala/org/apache/spark/sql/Column.scala   |  18
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  98 ++
 .../scala/org/apache/spark/sql/SQLContext.scala    |  50 -
 .../scala/org/apache/spark/sql/functions.scala     |  79 ++
 .../org/apache/spark/sql/DataFrameSuite.scala      |  46 +
 .../org/apache/spark/sql/DeprecatedAPISuite.scala  | 114 +
 .../org/apache/spark/sql/SQLContextSuite.scala     |  30 --
 10 files changed, 458 insertions(+), 15 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9a5029e..d1ed48a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -235,14 +235,6 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleWriteTime"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleRecordsWritten"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.AccumulableInfo.apply"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.approxCountDistinct"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toRadians"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toDegrees"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.monotonicallyIncreasingId"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.clearActive"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.getOrCreate"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.setActive"),
-      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.SQLContext.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.fMeasure"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.recall"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.precision"),

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 44cb264..2a366dc
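For context, the following is a minimal, hypothetical sketch (not part of the commit) that exercises several of the restored calls. It assumes a Spark 3.0 build on the classpath and a local session; the DataFrame contents and names are illustrative. Each call compiles again but emits a deprecation warning pointing at its documented replacement:

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.functions._

object RestoredApisSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("restored-apis").getOrCreate()
    import spark.implicits._

    val df = Seq((0, 90.0), (1, 180.0), (2, 180.0)).toDF("id", "deg")

    // toRadians/toDegrees compile again as deprecated aliases of radians/degrees.
    df.select($"deg", toRadians($"deg"), toDegrees(toRadians($"deg"))).show()

    // approxCountDistinct is back as a deprecated alias of approx_count_distinct.
    df.select(approxCountDistinct($"deg")).show()

    // monotonicallyIncreasingId is back as a deprecated alias of monotonically_increasing_id.
    df.select(monotonicallyIncreasingId().as("rowId"), $"id").show()

    // Column.!== is restored as a deprecated alias of =!=.
    df.where($"id" !== 1).show()

    // Dataset.registerTempTable delegates to createOrReplaceTempView.
    df.registerTempTable("angles")
    spark.sql("SELECT id, deg FROM angles").show()

    // SQLContext.getOrCreate/setActive/clearActive are restored for legacy code paths.
    val sqlContext = SQLContext.getOrCreate(spark.sparkContext)
    SQLContext.setActive(sqlContext)
    SQLContext.clearActive()

    spark.stop()
  }
}
```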
[spark] branch master updated: [SPARK-31087] [SQL] Add Back Multiple Removed APIs
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3884455  [SPARK-31087] [SQL] Add Back Multiple Removed APIs
3884455 is described below

commit 3884455780a214c620f309e00d5a083039746755
Author: gatorsmile
AuthorDate: Sat Mar 28 22:05:16 2020 -0700

    [SPARK-31087] [SQL] Add Back Multiple Removed APIs

    ### What changes were proposed in this pull request?

    Based on the discussion in the mailing list [[Proposal] Modification to Spark's Semantic Versioning Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html), this PR adds back the following APIs, whose maintenance costs are relatively small:

    - functions.toDegrees/toRadians
    - functions.approxCountDistinct
    - functions.monotonicallyIncreasingId
    - Column.!==
    - Dataset.explode
    - Dataset.registerTempTable
    - SQLContext.getOrCreate, setActive, clearActive, constructors

    Below are the other APIs removed in the original PR [https://issues.apache.org/jira/browse/SPARK-25908] but not added back in this PR:

    - Remove some AccumulableInfo .apply() methods
    - Remove non-label-specific multiclass precision/recall/fScore in favor of accuracy
    - Remove unused Python StorageLevel constants
    - Remove unused multiclass option in libsvm parsing
    - Remove references to deprecated spark configs like spark.yarn.am.port
    - Remove TaskContext.isRunningLocally
    - Remove ShuffleMetrics.shuffle* methods
    - Remove BaseReadWrite.context in favor of session

    ### Why are the changes needed?

    Avoid breaking APIs that are commonly used.

    ### Does this PR introduce any user-facing change?

    Adding back the APIs that were removed in the 3.0 branch does not introduce user-facing changes, because Spark 3.0 has not been released.

    ### How was this patch tested?

    Added a new test suite for these APIs.

    Author: gatorsmile
    Author: yi.wu

    Closes #27821 from gatorsmile/addAPIBackV2.
---
 project/MimaExcludes.scala                         |   8 --
 python/pyspark/sql/dataframe.py                    |  19
 python/pyspark/sql/functions.py                    |  11 ++
 .../main/scala/org/apache/spark/sql/Column.scala   |  18
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  98 ++
 .../scala/org/apache/spark/sql/SQLContext.scala    |  50 -
 .../scala/org/apache/spark/sql/functions.scala     |  79 ++
 .../org/apache/spark/sql/DataFrameSuite.scala      |  46 +
 .../org/apache/spark/sql/DeprecatedAPISuite.scala  | 114 +
 .../org/apache/spark/sql/SQLContextSuite.scala     |  30 --
 10 files changed, 458 insertions(+), 15 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3f521e6..f28ae56 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -242,14 +242,6 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleWriteTime"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.ShuffleWriteMetrics.shuffleRecordsWritten"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.AccumulableInfo.apply"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.approxCountDistinct"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toRadians"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.toDegrees"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.functions.monotonicallyIncreasingId"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.clearActive"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.getOrCreate"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.setActive"),
-      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.SQLContext.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.fMeasure"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.recall"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.MulticlassMetrics.precision"),

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 971cdb1..78b5746 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -122,6 +122,25 @@ class
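`Dataset.explode` is the least mechanical of the restored APIs, so a hedged sketch may help — assuming a local session and illustrative data, it contrasts the deprecated overload with the `functions.explode` form that its deprecation message has recommended since 2.0:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

object ExplodeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("spark sql", "structured streaming").toDF("line")

    // Restored (deprecated) API: explode a column through a user function,
    // producing one output row per element the function returns.
    df.explode("line", "word") { line: String => line.split(" ").toSeq }.show()

    // Recommended replacement: functions.explode over functions.split.
    df.select($"line", explode(split($"line", " ")).as("word")).show()

    spark.stop()
  }
}
```

Both calls produce one `word` row per whitespace-separated token; the select-based form keeps the transformation inside Catalyst expressions rather than an opaque Scala closure.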
[spark] branch branch-3.0 updated: [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6f8b3e0  [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas
6f8b3e0 is described below

commit 6f8b3e0bea3c6639ba7648b41580198ad925683a
Author: HyukjinKwon
AuthorDate: Sun Mar 29 13:59:18 2020 +0900

    [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas

    ### What changes were proposed in this pull request?

    This PR proposes to make the pandas function APIs (`groupby.(cogroup.)applyInPandas` and `mapInPandas`) ignore Python type hints.

    ### Why are the changes needed?

    Python type hints are optional and shouldn't affect code paths where pandas UDFs are not used. Supporting other type hints in these APIs is future work; at the very least, we shouldn't throw an exception at this point.

    ### Does this PR introduce any user-facing change?

    No, it's a master-only change.

    ```python
    import pandas as pd

    def pandas_plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
        return pdf + 1

    spark.range(10).groupby('id').applyInPandas(pandas_plus_one, schema="id long").show()
    ```

    ```python
    import pandas as pd

    def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
        return left + 1

    spark.range(10).groupby('id').cogroup(spark.range(10).groupby("id")).applyInPandas(pandas_plus_one, schema="id long").show()
    ```

    ```python
    from typing import Iterator
    import pandas as pd

    def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        return map(lambda v: v + 1, iter)

    spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()
    ```

    **Before:** Exception

    **After:**

    ```
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    |  5|
    |  6|
    |  7|
    |  8|
    |  9|
    | 10|
    +---+
    ```

    ### How was this patch tested?

    Closes #28052 from HyukjinKwon/SPARK-31287.

    Authored-by: HyukjinKwon
    Signed-off-by: HyukjinKwon
    (cherry picked from commit 3165a95a04448546ae8955020566d718c6960223)
    Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/pandas/functions.py             |  8 +
 .../pyspark/sql/tests/test_pandas_udf_typehints.py | 42 ++
 2 files changed, 50 insertions(+)

diff --git a/python/pyspark/sql/pandas/functions.py b/python/pyspark/sql/pandas/functions.py
index 31aa321..f43ebf8 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -384,6 +384,14 @@ def _create_pandas_udf(f, returnType, evalType):
             "In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for "
             "pandas UDF instead of specifying pandas UDF type which will be deprecated "
             "in the future releases. See SPARK-28264 for more details.", UserWarning)
+    elif evalType in [PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+                      PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+                      PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF]:
+        # In case of 'SQL_GROUPED_MAP_PANDAS_UDF', deprecation warning is being triggered
+        # at `apply` instead.
+        # In case of 'SQL_MAP_PANDAS_ITER_UDF' and 'SQL_COGROUPED_MAP_PANDAS_UDF', the
+        # evaluation type will always be set.
+        pass
     elif len(argspec.annotations) > 0:
         evalType = infer_eval_type(signature(f))
         assert evalType is not None

diff --git a/python/pyspark/sql/tests/test_pandas_udf_typehints.py b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
index 7c83c78..2582080 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_typehints.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
@@ -261,6 +261,48 @@ class PandasUDFTypeHintsTests(ReusedSQLTestCase):
         expected = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v, 1.0)')).sort('id')
         assert_frame_equal(expected.toPandas(), actual.toPandas())

+    def test_ignore_type_hint_in_group_apply_in_pandas(self):
+        df = self.spark.range(10)
+        exec(
+            "def pandas_plus_one(v: pd.DataFrame) -> pd.DataFrame:\n"
+            "    return v + 1",
+            self.local)
+
+        pandas_plus_one = self.local["pandas_plus_one"]
+
+        actual = df.groupby('id').applyInPandas(pandas_plus_one, schema=df.schema).sort('id')
+        expected = df.selectExpr("id + 1 as id")
+        assert_frame_equal(expected.toPandas(), actual.toPandas())
+
+    def test_ignore_type_hint_in_cogroup_apply_in_pandas(self):
+        df = self.spark.range(10)
[spark] branch master updated: [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3165a95  [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas
3165a95 is described below

commit 3165a95a04448546ae8955020566d718c6960223
Author: HyukjinKwon
AuthorDate: Sun Mar 29 13:59:18 2020 +0900

    [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas

    ### What changes were proposed in this pull request?

    This PR proposes to make the pandas function APIs (`groupby.(cogroup.)applyInPandas` and `mapInPandas`) ignore Python type hints.

    ### Why are the changes needed?

    Python type hints are optional and shouldn't affect code paths where pandas UDFs are not used. Supporting other type hints in these APIs is future work; at the very least, we shouldn't throw an exception at this point.

    ### Does this PR introduce any user-facing change?

    No, it's a master-only change.

    ```python
    import pandas as pd

    def pandas_plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
        return pdf + 1

    spark.range(10).groupby('id').applyInPandas(pandas_plus_one, schema="id long").show()
    ```

    ```python
    import pandas as pd

    def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
        return left + 1

    spark.range(10).groupby('id').cogroup(spark.range(10).groupby("id")).applyInPandas(pandas_plus_one, schema="id long").show()
    ```

    ```python
    from typing import Iterator
    import pandas as pd

    def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        return map(lambda v: v + 1, iter)

    spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()
    ```

    **Before:** Exception

    **After:**

    ```
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    |  5|
    |  6|
    |  7|
    |  8|
    |  9|
    | 10|
    +---+
    ```

    ### How was this patch tested?

    Closes #28052 from HyukjinKwon/SPARK-31287.

    Authored-by: HyukjinKwon
    Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/pandas/functions.py             |  8 +
 .../pyspark/sql/tests/test_pandas_udf_typehints.py | 42 ++
 2 files changed, 50 insertions(+)

diff --git a/python/pyspark/sql/pandas/functions.py b/python/pyspark/sql/pandas/functions.py
index 31aa321..f43ebf8 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -384,6 +384,14 @@ def _create_pandas_udf(f, returnType, evalType):
             "In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for "
             "pandas UDF instead of specifying pandas UDF type which will be deprecated "
             "in the future releases. See SPARK-28264 for more details.", UserWarning)
+    elif evalType in [PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+                      PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+                      PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF]:
+        # In case of 'SQL_GROUPED_MAP_PANDAS_UDF', deprecation warning is being triggered
+        # at `apply` instead.
+        # In case of 'SQL_MAP_PANDAS_ITER_UDF' and 'SQL_COGROUPED_MAP_PANDAS_UDF', the
+        # evaluation type will always be set.
+        pass
     elif len(argspec.annotations) > 0:
         evalType = infer_eval_type(signature(f))
         assert evalType is not None

diff --git a/python/pyspark/sql/tests/test_pandas_udf_typehints.py b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
index 7c83c78..2582080 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_typehints.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
@@ -261,6 +261,48 @@ class PandasUDFTypeHintsTests(ReusedSQLTestCase):
         expected = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v, 1.0)')).sort('id')
         assert_frame_equal(expected.toPandas(), actual.toPandas())

+    def test_ignore_type_hint_in_group_apply_in_pandas(self):
+        df = self.spark.range(10)
+        exec(
+            "def pandas_plus_one(v: pd.DataFrame) -> pd.DataFrame:\n"
+            "    return v + 1",
+            self.local)
+
+        pandas_plus_one = self.local["pandas_plus_one"]
+
+        actual = df.groupby('id').applyInPandas(pandas_plus_one, schema=df.schema).sort('id')
+        expected = df.selectExpr("id + 1 as id")
+        assert_frame_equal(expected.toPandas(), actual.toPandas())
+
+    def test_ignore_type_hint_in_cogroup_apply_in_pandas(self):
+        df = self.spark.range(10)
+        exec(
+            "def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:\n"
[spark] branch branch-2.4 updated: [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 801d6a9  [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified
801d6a9 is described below

commit 801d6a92d958f7b9762466e3c6643e54c48eb3a2
Author: Zhenhua Wang
AuthorDate: Sun Mar 29 13:30:14 2020 +0900

    [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified

    ### What changes were proposed in this pull request?

    SPARK-25387 avoids an NPE for bad CSV input, but when reading bad CSV input with `columnNameCorruptRecord` specified, `getCurrentInput` is called and still throws an NPE.

    ### Why are the changes needed?

    Bug fix.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Added a test.

    Closes #28029 from wzhfy/corrupt_column_npe.

    Authored-by: Zhenhua Wang
    Signed-off-by: HyukjinKwon
    (cherry picked from commit 791d2ba346f3358fc280adbbbe27f2cd50fd3732)
    Signed-off-by: HyukjinKwon
---
 .../sql/execution/datasources/csv/UnivocityParser.scala    |  3 ++-
 .../spark/sql/execution/datasources/csv/CSVSuite.scala     | 14 ++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index e847e40..5579e95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -82,7 +82,8 @@ class UnivocityParser(

   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
+    val currentContent = tokenizer.getContext.currentParsedContent()
+    if (currentContent == null) null else UTF8String.fromString(currentContent.stripLineEnd)
   }

   // This parser first picks some tokens from the input tokens, according to the required schema,

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2ea8f4f..866d8de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1822,6 +1822,20 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(spark.read.csv(input).collect().toSet == Set(Row()))
   }

+  test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
+    val schema = StructType(
+      StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
+    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+
+    checkAnswer(
+      spark.read
+        .option("columnNameOfCorruptRecord", "_corrupt_record")
+        .schema(schema)
+        .csv(input),
+      Row(null, null))
+    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+  }
+
   test("field names of inferred schema shouldn't compare to the first row") {
     val input = Seq("1,2").toDS()
     val df = spark.read.option("enforceSchema", false).csv(input)
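To make the failure mode concrete, here is a standalone sketch (assumptions: a local session; the input literal mirrors the control-character string used in the test) of the read path the patch hardens — malformed input read with a corrupt-record column configured:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CorruptRecordSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("corrupt-record").getOrCreate()
    import spark.implicits._

    // Malformed input: control characters instead of a parsable integer.
    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))

    val schema = StructType(
      StructField("a", IntegerType) ::
      StructField("_corrupt_record", StringType) :: Nil)

    // Before the fix, filling _corrupt_record could hit an NPE inside
    // getCurrentInput when the tokenizer had no parsed content to report;
    // with the fix, the read yields Row(null, null) instead.
    spark.read
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema)
      .csv(input)
      .show()

    spark.stop()
  }
}
```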
[spark] branch branch-3.0 updated: [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 38c262b  [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified
38c262b is described below

commit 38c262b97e4fa10b249d51d3f69a0c97760492a3
Author: Zhenhua Wang
AuthorDate: Sun Mar 29 13:30:14 2020 +0900

    [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified

    ### What changes were proposed in this pull request?

    SPARK-25387 avoids an NPE for bad CSV input, but when reading bad CSV input with `columnNameCorruptRecord` specified, `getCurrentInput` is called and still throws an NPE.

    ### Why are the changes needed?

    Bug fix.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Added a test.

    Closes #28029 from wzhfy/corrupt_column_npe.

    Authored-by: Zhenhua Wang
    Signed-off-by: HyukjinKwon
    (cherry picked from commit 791d2ba346f3358fc280adbbbe27f2cd50fd3732)
    Signed-off-by: HyukjinKwon
---
 .../apache/spark/sql/catalyst/csv/UnivocityParser.scala    |  3 ++-
 .../spark/sql/execution/datasources/csv/CSVSuite.scala     | 14 ++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 2c5a9d7..8e87a827 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -101,7 +101,8 @@ class UnivocityParser(

   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
+    val currentContent = tokenizer.getContext.currentParsedContent()
+    if (currentContent == null) null else UTF8String.fromString(currentContent.stripLineEnd)
   }

   // This parser first picks some tokens from the input tokens, according to the required schema,

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f9a510d..366cf11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1897,6 +1897,20 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
     assert(spark.read.csv(input).collect().toSet == Set(Row()))
   }

+  test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
+    val schema = StructType(
+      StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
+    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+
+    checkAnswer(
+      spark.read
+        .option("columnNameOfCorruptRecord", "_corrupt_record")
+        .schema(schema)
+        .csv(input),
+      Row(null, null))
+    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+  }
+
   test("field names of inferred schema shouldn't compare to the first row") {
     val input = Seq("1,2").toDS()
     val df = spark.read.option("enforceSchema", false).csv(input)
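The patch itself uses an explicit null check; a sketch of the equivalent Option-based idiom (a hypothetical helper, not what the commit ships) expresses the same guard without a branch:

```scala
import org.apache.spark.unsafe.types.UTF8String

object NullGuardSketch {
  // Hypothetical helper mirroring the patched getCurrentInput: wrapping the
  // possibly-null parsed content in an Option lets null propagate to the
  // caller without an explicit if/else.
  def currentInputOrNull(currentParsedContent: String): UTF8String =
    Option(currentParsedContent)
      .map(content => UTF8String.fromString(content.stripLineEnd))
      .orNull

  def main(args: Array[String]): Unit = {
    println(currentInputOrNull("a,b,c\n"))  // a,b,c
    println(currentInputOrNull(null))       // null
  }
}
```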
[spark] branch master updated (34c7476 -> 791d2ba)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 34c7476  [SPARK-30722][DOCS][FOLLOW-UP] Add Pandas Function API into the menu
     add 791d2ba  [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/catalyst/csv/UnivocityParser.scala    |  3 ++-
 .../spark/sql/execution/datasources/csv/CSVSuite.scala     | 14 ++
 2 files changed, 16 insertions(+), 1 deletion(-)
[spark] branch master updated: [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 791d2ba  [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified
791d2ba is described below

commit 791d2ba346f3358fc280adbbbe27f2cd50fd3732
Author: Zhenhua Wang
AuthorDate: Sun Mar 29 13:30:14 2020 +0900

    [SPARK-31261][SQL] Avoid npe when reading bad csv input with `columnNameCorruptRecord` specified

    ### What changes were proposed in this pull request?

    SPARK-25387 avoids an NPE for bad CSV input, but when reading bad CSV input with `columnNameCorruptRecord` specified, `getCurrentInput` is called and still throws an NPE.

    ### Why are the changes needed?

    Bug fix.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Added a test.

    Closes #28029 from wzhfy/corrupt_column_npe.

    Authored-by: Zhenhua Wang
    Signed-off-by: HyukjinKwon
---
 .../apache/spark/sql/catalyst/csv/UnivocityParser.scala    |  3 ++-
 .../spark/sql/execution/datasources/csv/CSVSuite.scala     | 14 ++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 2c5a9d7..8e87a827 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -101,7 +101,8 @@ class UnivocityParser(

   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
+    val currentContent = tokenizer.getContext.currentParsedContent()
+    if (currentContent == null) null else UTF8String.fromString(currentContent.stripLineEnd)
   }

   // This parser first picks some tokens from the input tokens, according to the required schema,

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f9a510d..366cf11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1897,6 +1897,20 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
     assert(spark.read.csv(input).collect().toSet == Set(Row()))
   }

+  test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
+    val schema = StructType(
+      StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
+    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+
+    checkAnswer(
+      spark.read
+        .option("columnNameOfCorruptRecord", "_corrupt_record")
+        .schema(schema)
+        .csv(input),
+      Row(null, null))
+    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+  }
+
   test("field names of inferred schema shouldn't compare to the first row") {
     val input = Seq("1,2").toDS()
     val df = spark.read.option("enforceSchema", false).csv(input)
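For background — my summary, not text from the commit — `columnNameOfCorruptRecord` only takes effect in the default PERMISSIVE parse mode; under FAILFAST the same malformed row aborts the read. A sketch with an illustrative malformed input:

```scala
import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CsvModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("csv-modes").getOrCreate()
    import spark.implicits._

    // One row whose first field cannot be parsed as an integer.
    val input = spark.createDataset(Seq("not-an-int,extra"))

    val schema = StructType(
      StructField("a", IntegerType) ::
      StructField("_corrupt_record", StringType) :: Nil)

    // PERMISSIVE (default): the malformed row is kept; unparsable fields become
    // null and, when available, the raw line is stored in the corrupt-record column.
    spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema)
      .csv(input)
      .show(truncate = false)

    // FAILFAST: the same malformed row makes the read throw instead of yielding nulls.
    val strict = StructType(StructField("a", IntegerType) :: Nil)
    val failed = Try(spark.read.option("mode", "FAILFAST").schema(strict).csv(input).collect())
    println(s"FAILFAST succeeded: ${failed.isSuccess}")

    spark.stop()
  }
}
```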
[spark] branch branch-3.0 updated: [SPARK-30722][DOCS][FOLLOW-UP] Add Pandas Function API into the menu
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1c4fe31  [SPARK-30722][DOCS][FOLLOW-UP] Add Pandas Function API into the menu

1c4fe31 is described below

commit 1c4fe31a5a697c80ca59ed6286ad1423d8541e6a
Author: HyukjinKwon
AuthorDate: Sat Mar 28 18:36:34 2020 -0700

    [SPARK-30722][DOCS][FOLLOW-UP] Add Pandas Function API into the menu

    ### What changes were proposed in this pull request?

    This PR adds "Pandas Function API" into the menu.

    ### Why are the changes needed?

    To be consistent and to make it easier to navigate.

    ### Does this PR introduce any user-facing change?

    No, master only.

    ![Screen Shot 2020-03-27 at 11 40 29 PM](https://user-images.githubusercontent.com/6477701/77767405-60306600-7084-11ea-944a-93726259cd00.png)

    ### How was this patch tested?

    Manually verified by `SKIP_API=1 jekyll build`.

    Closes #28054 from HyukjinKwon/followup-spark-30722.

    Authored-by: HyukjinKwon
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 34c7476cb5da98d5f3be354669dcd762df2b75e1)
    Signed-off-by: Dongjoon Hyun
---
 docs/_data/menu-sql.yaml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml
index c17bfd3..8a1d937 100644
--- a/docs/_data/menu-sql.yaml
+++ b/docs/_data/menu-sql.yaml
@@ -67,6 +67,8 @@
       url: sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas
     - text: "Pandas UDFs (a.k.a. Vectorized UDFs)"
       url: sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs
+    - text: "Pandas Function APIs"
+      url: sql-pyspark-pandas-with-arrow.html#pandas-function-apis
     - text: Usage Notes
       url: sql-pyspark-pandas-with-arrow.html#usage-notes
     - text: Migration Guide
[spark] branch master updated (0b237bd -> 34c7476)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 0b237bd  [SPARK-31292][CORE][SQL] Replace toSet.toSeq with distinct for readability
     add 34c7476  [SPARK-30722][DOCS][FOLLOW-UP] Add Pandas Function API into the menu

No new revisions were added by this update.

Summary of changes:
 docs/_data/menu-sql.yaml | 2 ++
 1 file changed, 2 insertions(+)
[spark] branch branch-3.0 updated: [SPARK-31292][CORE][SQL] Replace toSet.toSeq with distinct for readability
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 71dcf66  [SPARK-31292][CORE][SQL] Replace toSet.toSeq with distinct for readability

71dcf66 is described below

commit 71dcf6691a48dd622b83e128aa9be30f757b45ec
Author: Kengo Seki
AuthorDate: Sun Mar 29 08:48:08 2020 +0900

    [SPARK-31292][CORE][SQL] Replace toSet.toSeq with distinct for readability

    ### What changes were proposed in this pull request?

    This PR replaces the method calls of `toSet.toSeq` with `distinct`.

    ### Why are the changes needed?

    `toSet.toSeq` is intended to make its elements unique but a bit verbose. Using
    `distinct` instead is easier to understand and improves readability.

    ### Does this PR introduce any user-facing change?

    No

    ### How was this patch tested?

    Tested with the existing unit tests and found no problem.

    Closes #28062 from sekikn/SPARK-31292.

    Authored-by: Kengo Seki
    Signed-off-by: Takeshi Yamamuro
    (cherry picked from commit 0b237bd615da4b2c2b781e72af4ad3a4f2951444)
    Signed-off-by: Takeshi Yamamuro
---
 core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala      | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala        | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala    | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 .../test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala             | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 7dd7fc1..994b363 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -149,7 +149,7 @@ private[spark] object ResourceUtils extends Logging {
   def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
     sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_PREFIX.").map { case (key, _) =>
       key.substring(0, key.indexOf('.'))
-    }.toSet.toSeq.map(name => new ResourceID(componentName, name))
+    }.distinct.map(name => new ResourceID(componentName, name))
   }

   def parseAllResourceRequests(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 857c89d..15f2161 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -69,7 +69,7 @@ private[spark] class ResultTask[T, U](
   with Serializable {

   @transient private[this] val preferredLocs: Seq[TaskLocation] = {
-    if (locs == null) Nil else locs.toSet.toSeq
+    if (locs == null) Nil else locs.distinct
   }

   override def runTask(context: TaskContext): U = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 4c0c30a..a0ba920 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -71,7 +71,7 @@ private[spark] class ShuffleMapTask(
   }

   @transient private val preferredLocs: Seq[TaskLocation] = {
-    if (locs == null) Nil else locs.toSet.toSeq
+    if (locs == null) Nil else locs.distinct
   }

   override def runTask(context: TaskContext): MapStatus = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6a1d460..ed30473 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -408,7 +408,7 @@ private[spark] class TaskSchedulerImpl(
         newExecAvail = true
       }
     }
-    val hosts = offers.map(_.host).toSet.toSeq
+    val hosts = offers.map(_.host).distinct
     for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
       hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index e7ecf84..a083cdb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -758,7 +758,7 @@ class TaskSchedulerImplSuite extends
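Worth noting: besides being shorter, `distinct` is also deterministic about ordering. `Seq.distinct` keeps the first occurrence of each element in encounter order, whereas the order coming out of `toSet.toSeq` follows the `Set`'s iteration order, which is not guaranteed to match the input. A plain-Scala illustration with hypothetical host names:

    val locs = Seq("host3", "host1", "host3", "host2", "host1")

    // distinct de-duplicates and keeps first-occurrence order deterministically.
    val dedupOrdered = locs.distinct
    assert(dedupOrdered == Seq("host3", "host1", "host2"))

    // toSet.toSeq also de-duplicates, but its ordering depends on the Set
    // implementation, so only the element multiset can be relied on.
    val dedupUnordered = locs.toSet.toSeq
    assert(dedupUnordered.sorted == Seq("host1", "host2", "host3"))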
[spark] branch master updated: [SPARK-31292][CORE][SQL] Replace toSet.toSeq with distinct for readability
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0b237bd  [SPARK-31292][CORE][SQL] Replace toSet.toSeq with distinct for readability

0b237bd is described below

commit 0b237bd615da4b2c2b781e72af4ad3a4f2951444
Author: Kengo Seki
AuthorDate: Sun Mar 29 08:48:08 2020 +0900

    [SPARK-31292][CORE][SQL] Replace toSet.toSeq with distinct for readability

    ### What changes were proposed in this pull request?

    This PR replaces the method calls of `toSet.toSeq` with `distinct`.

    ### Why are the changes needed?

    `toSet.toSeq` is intended to make its elements unique but a bit verbose. Using
    `distinct` instead is easier to understand and improves readability.

    ### Does this PR introduce any user-facing change?

    No

    ### How was this patch tested?

    Tested with the existing unit tests and found no problem.

    Closes #28062 from sekikn/SPARK-31292.

    Authored-by: Kengo Seki
    Signed-off-by: Takeshi Yamamuro
---
 core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala      | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala        | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala    | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 .../test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala             | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 36ef906..162f090 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -150,7 +150,7 @@ private[spark] object ResourceUtils extends Logging {
   def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
     sparkConf.getAllWithPrefix(s"$componentName.$RESOURCE_PREFIX.").map { case (key, _) =>
       key.substring(0, key.indexOf('.'))
-    }.toSet.toSeq.map(name => new ResourceID(componentName, name))
+    }.distinct.map(name => new ResourceID(componentName, name))
   }

   def parseAllResourceRequests(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 857c89d..15f2161 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -69,7 +69,7 @@ private[spark] class ResultTask[T, U](
   with Serializable {

   @transient private[this] val preferredLocs: Seq[TaskLocation] = {
-    if (locs == null) Nil else locs.toSet.toSeq
+    if (locs == null) Nil else locs.distinct
   }

   override def runTask(context: TaskContext): U = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 4c0c30a..a0ba920 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -71,7 +71,7 @@ private[spark] class ShuffleMapTask(
   }

   @transient private val preferredLocs: Seq[TaskLocation] = {
-    if (locs == null) Nil else locs.toSet.toSeq
+    if (locs == null) Nil else locs.distinct
   }

   override def runTask(context: TaskContext): MapStatus = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7e2fbb4..f0f84fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -487,7 +487,7 @@ private[spark] class TaskSchedulerImpl(
         newExecAvail = true
       }
     }
-    val hosts = offers.map(_.host).toSet.toSeq
+    val hosts = offers.map(_.host).distinct
     for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
       hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9ee84a8..b9a11e7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -761,7 +761,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // that are explicitly blacklisted, plus those that have *any*
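To make the `listResourceIds` hunk concrete: it takes the resource name (the prefix before the first '.') from each config key and de-duplicates the names. A self-contained sketch with made-up keys, not the real `SparkConf` API:

    // Hypothetical resource config keys of the form "<name>.<attribute>",
    // mirroring the keys listResourceIds extracts names from.
    val keys = Seq("gpu.amount", "gpu.vendor", "fpga.amount")

    // Take the prefix before the first '.' and de-duplicate, as the patched code does.
    val resourceNames = keys.map(k => k.substring(0, k.indexOf('.'))).distinct
    assert(resourceNames == Seq("gpu", "fpga"))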
[spark] branch master updated (d025ddba -> 0b237bd)
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from d025ddba [SPARK-31238][SPARK-31284][TEST][FOLLOWUP] Fix readResourceOrcFile to create a local file from resource
     add 0b237bd  [SPARK-31292][CORE][SQL] Replace toSet.toSeq with distinct for readability

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala      | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala        | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala    | 2 +-
 core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 .../test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala             | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)