[spark] branch master updated: [SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new facf9c3  [SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files
facf9c3 is described below

commit facf9c30a283ec682b5adb2e7afdbf5d011e3808
Author: HyukjinKwon
AuthorDate: Sat Jun 29 14:05:23 2019 +0900

    [SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files

    ## What changes were proposed in this pull request?

    SPARK-27534 missed to address my own comments at https://github.com/WeichenXu123/spark/pull/8
    It's better to push this in since the codes are already cleaned up.

    ## How was this patch tested?

    Unittests fixed

    Closes #25003 from HyukjinKwon/SPARK-27534.

    Authored-by: HyukjinKwon
    Signed-off-by: HyukjinKwon
---
 .../binaryfile/BinaryFileFormatSuite.scala         | 88 +++---
 1 file changed, 43 insertions(+), 45 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 9e2969b..a66b34f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -290,56 +290,54 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
     ), true)
   }
 
+  private def readBinaryFile(file: File, requiredSchema: StructType): Row = {
+    val format = new BinaryFileFormat
+    val reader = format.buildReaderWithPartitionValues(
+      sparkSession = spark,
+      dataSchema = schema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = requiredSchema,
+      filters = Seq.empty,
+      options = Map.empty,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val partitionedFile = mock(classOf[PartitionedFile])
+    when(partitionedFile.filePath).thenReturn(file.getPath)
+    val encoder = RowEncoder(requiredSchema).resolveAndBind()
+    encoder.fromRow(reader(partitionedFile).next())
+  }
+
   test("column pruning") {
-    def getRequiredSchema(fieldNames: String*): StructType = {
-      StructType(fieldNames.map {
-        case f if schema.fieldNames.contains(f) => schema(f)
-        case other => StructField(other, NullType)
-      })
-    }
-    def read(file: File, requiredSchema: StructType): Row = {
-      val format = new BinaryFileFormat
-      val reader = format.buildReaderWithPartitionValues(
-        sparkSession = spark,
-        dataSchema = schema,
-        partitionSchema = StructType(Nil),
-        requiredSchema = requiredSchema,
-        filters = Seq.empty,
-        options = Map.empty,
-        hadoopConf = spark.sessionState.newHadoopConf()
-      )
-      val partitionedFile = mock(classOf[PartitionedFile])
-      when(partitionedFile.filePath).thenReturn(file.getPath)
-      val encoder = RowEncoder(requiredSchema).resolveAndBind()
-      encoder.fromRow(reader(partitionedFile).next())
-    }
-    val file = new File(Utils.createTempDir(), "data")
-    val content = "123".getBytes
-    Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
-
-    read(file, getRequiredSchema(MODIFICATION_TIME, CONTENT, LENGTH, PATH)) match {
-      case Row(t, c, len, p) =>
-        assert(t === new Timestamp(file.lastModified()))
-        assert(c === content)
-        assert(len === content.length)
-        assert(p.asInstanceOf[String].endsWith(file.getAbsolutePath))
+    withTempPath { file =>
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+
+      val actual = readBinaryFile(file, StructType(schema.takeRight(3)))
+      val expected = Row(new Timestamp(file.lastModified()), content.length, content)
+
+      assert(actual === expected)
     }
-    file.setReadable(false)
-    withClue("cannot read content") {
+  }
+
+  test("column pruning - non-readable file") {
+    withTempPath { file =>
+      val content = "abc".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+      file.setReadable(false)
+
+      // If content is selected, it throws an exception because it's not readable.
       intercept[IOException] {
-        read(file, getRequiredSchema(CONTENT))
+        readBinaryFile(file, StructType(schema(CONTENT) :: Nil))
       }
-    }
-    assert(read(file, getRequiredSchema(LENGTH)) === Row(content.length),
-      "Get length should not read content.")
-    intercept[RuntimeException] {
-      read(file,
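The contract these two tests pin down is easiest to see from the user-facing `binaryFile` source: pruning `content` out of the projection means the file bytes are never opened, so the status columns stay cheap even for unreadable files. A minimal sketch, assuming the `binaryFile` data source and its standard columns (`path`, `modificationTime`, `length`, `content`) as shipped on master at this point; the input directory is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

object BinaryFilePruningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // "/tmp/blobs" is a placeholder directory of arbitrary files.
    val df = spark.read.format("binaryFile").load("/tmp/blobs")

    // Pruned projection: only file-status columns, so no file content is read.
    df.select("path", "length", "modificationTime").show(truncate = false)

    // Selecting `content` is what actually opens each file; an unreadable file
    // fails only here, which is what the second test case above asserts.
    df.select("content").show()

    spark.stop()
  }
}
```

Splitting the suite this way lets the readable and non-readable cases fail independently instead of sharing one temp file and one test body.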
[spark] branch master updated: [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8299600  [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
8299600 is described below

commit 8299600575024ca81127b7bf8ef48ae11fdd0594
Author: Xiangrui Meng
AuthorDate: Fri Jun 28 15:09:57 2019 -0700

    [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF

    ## What changes were proposed in this pull request?

    Add docstring/doctest for `SCALAR_ITER` Pandas UDF. I explicitly mentioned that per-partition
    execution is an implementation detail, not guaranteed. I will submit another PR to add the same
    to the user guide, just to keep this PR minimal.

    I didn't add "doctest: +SKIP" in the first commit so it is easy to test locally.

    cc: HyukjinKwon gatorsmile icexelloss BryanCutler WeichenXu123

    ![Screen Shot 2019-06-28 at 9 52 41 AM](https://user-images.githubusercontent.com/829644/60358349-b0aa5400-998a-11e9-9ebf-8481dfd555b5.png)
    ![Screen Shot 2019-06-28 at 9 53 19 AM](https://user-images.githubusercontent.com/829644/60358355-b1db8100-998a-11e9-8f6f-00a11bdbdc4d.png)

    ## How was this patch tested?

    doctest

    Closes #25005 from mengxr/SPARK-28056.2.

    Authored-by: Xiangrui Meng
    Signed-off-by: gatorsmile
---
 python/pyspark/sql/functions.py | 104 +++-
 1 file changed, 102 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 34f6593..5d1e69e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2951,7 +2951,107 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        Therefore, this can be used, for example, to ensure the length of each returned
        `pandas.Series`, and can not be used as the column length.
 
-    2. GROUPED_MAP
+    2. SCALAR_ITER
+
+       A scalar iterator UDF is semantically the same as the scalar Pandas UDF above except that
+       the wrapped Python function takes an iterator of batches as input instead of a single
+       batch and, instead of returning a single output batch, it yields output batches or
+       explicitly returns a generator or an iterator of output batches.
+       It is useful when the UDF execution requires initializing some state, e.g., loading a
+       machine learning model file to apply inference to every input batch.
+
+       .. note:: It is not guaranteed that one invocation of a scalar iterator UDF will process
+           all batches from one partition, although it is currently implemented this way.
+           Your code shall not rely on this behavior because it might change in the future for
+           further optimization, e.g., one invocation processes multiple partitions.
+
+       Scalar iterator UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
+       :meth:`pyspark.sql.DataFrame.select`.
+
+       >>> import pandas as pd  # doctest: +SKIP
+       >>> from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+       >>> pdf = pd.DataFrame([1, 2, 3], columns=["x"])  # doctest: +SKIP
+       >>> df = spark.createDataFrame(pdf)  # doctest: +SKIP
+
+       When the UDF is called with a single column that is not `StructType`, the input to the
+       underlying function is an iterator of `pd.Series`.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def plus_one(batch_iter):
+       ...     for x in batch_iter:
+       ...         yield x + 1
+       ...
+       >>> df.select(plus_one(col("x"))).show()  # doctest: +SKIP
+       +-----------+
+       |plus_one(x)|
+       +-----------+
+       |          2|
+       |          3|
+       |          4|
+       +-----------+
+
+       When the UDF is called with more than one column, the input to the underlying function is
+       an iterator of tuples of `pd.Series`.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def multiply_two_cols(batch_iter):
+       ...     for a, b in batch_iter:
+       ...         yield a * b
+       ...
+       >>> df.select(multiply_two_cols(col("x"), col("x"))).show()  # doctest: +SKIP
+       +-----------------------+
+       |multiply_two_cols(x, x)|
+       +-----------------------+
+       |                      1|
+       |                      4|
+       |                      9|
+       +-----------------------+
+
+       When the UDF is called with a single column that is `StructType`, the input to the
+       underlying function is an iterator of `pd.DataFrame`.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def multiply_two_nested_cols(pdf_iter):
+       ...
[spark] branch master updated: [SPARK-27945][SQL] Minimal changes to support columnar processing
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c341de8  [SPARK-27945][SQL] Minimal changes to support columnar processing
c341de8 is described below

commit c341de8b3e1f1d3327bd4ae3b0d2ec048f64d306
Author: Robert (Bobby) Evans
AuthorDate: Fri Jun 28 14:00:12 2019 -0500

    [SPARK-27945][SQL] Minimal changes to support columnar processing

    ## What changes were proposed in this pull request?

    This is the first part of [SPARK-27396](https://issues.apache.org/jira/browse/SPARK-27396).
    This is the minimum set of changes necessary to support a pluggable back end for columnar
    processing. Follow-on JIRAs would cover removing some of the duplication between functionality
    in this patch and functionality currently covered by things like ColumnarBatchScan.

    ## How was this patch tested?

    I added in a new unit test to cover new code not really covered in other places.

    I also did manual testing by implementing two plugins/extensions that take advantage of the
    new APIs to allow for columnar processing for some simple queries. One version runs on the
    [CPU](https://gist.github.com/revans2/c3cad77075c4fa5d9d271308ee2f1b1d). The other version
    runs on a GPU, but because it has unreleased dependencies I will not include a link to it yet.
    The CPU version I would expect to add in as an example with other documentation in a
    follow-on JIRA.

    This is contributed on behalf of NVIDIA Corporation.

    Closes #24795 from revans2/columnar-basic.

    Authored-by: Robert (Bobby) Evans
    Signed-off-by: Thomas Graves
---
 .../apache/spark/sql/vectorized/ColumnVector.java  |   2 +-
 .../apache/spark/sql/vectorized/ColumnarBatch.java |  13 +-
 .../execution/vectorized/WritableColumnVector.java |   5 +-
 .../apache/spark/sql/SparkSessionExtensions.scala  |  19 +
 .../org/apache/spark/sql/execution/Columnar.scala  | 534 +
 .../spark/sql/execution/ColumnarBatchScan.scala    |   2 -
 .../spark/sql/execution/QueryExecution.scala       |   2 +
 .../org/apache/spark/sql/execution/SparkPlan.scala |  36 ++
 .../sql/execution/WholeStageCodegenExec.scala      |  97 +++-
 .../sql/internal/BaseSessionStateBuilder.scala     |   9 +-
 .../apache/spark/sql/internal/SessionState.scala   |   3 +-
 .../spark/sql/SparkSessionExtensionSuite.scala     | 409 +++-
 .../python/BatchEvalPythonExecSuite.scala          |   8 +-
 .../execution/vectorized/ColumnarBatchSuite.scala  | 210 +++-
 14 files changed, 1311 insertions(+), 38 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 14caaea..f18d003 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -287,7 +287,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * @return child [[ColumnVector]] at the given ordinal.
    */
-  protected abstract ColumnVector getChild(int ordinal);
+  public abstract ColumnVector getChild(int ordinal);
 
   /**
    * Data type for this column.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 9f917ea..a2feac8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -31,7 +31,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  * the entire data loading process.
  */
 @Evolving
-public final class ColumnarBatch {
+public final class ColumnarBatch implements AutoCloseable {
   private int numRows;
   private final ColumnVector[] columns;
 
@@ -42,6 +42,7 @@ public final class ColumnarBatch {
    * Called to close all the columns in this batch. It is not valid to access the data after
    * calling this. This must be called at the end to clean up memory allocations.
    */
+  @Override
   public void close() {
     for (ColumnVector c: columns) {
       c.close();
@@ -110,7 +111,17 @@ public final class ColumnarBatch {
   }
 
   public ColumnarBatch(ColumnVector[] columns) {
+    this(columns, 0);
+  }
+
+  /**
+   * Create a new batch from existing column vectors.
+   * @param columns The columns of this batch
+   * @param numRows The number of rows in this batch
+   */
+  public ColumnarBatch(ColumnVector[] columns, int numRows) {
     this.columns = columns;
+    this.numRows = numRows;
    this.row = new ColumnarBatchRow(columns);
   }
 }
diff --git
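The two `ColumnarBatch` changes above are easiest to see side by side: the new two-argument constructor sets the row count at construction time instead of via a later `setNumRows()` call, and `close()` now formally satisfies `AutoCloseable`. A minimal sketch, using the internal `OnHeapColumnVector` purely for illustration (its use outside Spark is an assumption, it is not public API):

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

val capacity = 3
val ints = new OnHeapColumnVector(capacity, IntegerType)
(0 until capacity).foreach(i => ints.putInt(i, i * 10))

// New two-argument constructor: row count is fixed up front.
val batch = new ColumnarBatch(Array[ColumnVector](ints), capacity)
try {
  val rows = batch.rowIterator()
  while (rows.hasNext) {
    println(rows.next().getInt(0)) // prints 0, 10, 20
  }
} finally {
  // close() now implements AutoCloseable; it closes every column vector.
  batch.close()
}
```

Making the batch `AutoCloseable` is what lets plugin back ends manage batch lifetimes with ordinary resource-management idioms (try-with-resources on the Java side), which matters once batches flow between operators supplied by an extension.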
[spark] branch master updated: [SPARK-28145][K8S] safe runnable in polling executor source
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e7c97a3  [SPARK-28145][K8S] safe runnable in polling executor source
e7c97a3 is described below

commit e7c97a3d8606de7702b7590154b709dc848c308b
Author: Onur Satici
AuthorDate: Fri Jun 28 09:38:43 2019 -0500

    [SPARK-28145][K8S] safe runnable in polling executor source

    ## What changes were proposed in this pull request?

    Add error handling to `ExecutorPodsPollingSnapshotSource`

    Closes #24952 from onursatici/os/polling-source.

    Authored-by: Onur Satici
    Signed-off-by: Sean Owen
---
 .../scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
index e77e604..96a5059 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class ExecutorPodsPollingSnapshotSource(
     conf: SparkConf,
@@ -53,7 +53,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
   }
 
   private class PollRunnable(applicationId: String) extends Runnable {
-    override def run(): Unit = {
+    override def run(): Unit = Utils.tryLogNonFatalError {
       logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
       snapshotsStore.replaceSnapshot(kubernetesClient
        .pods()
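The reason a bare `run()` is unsafe here: `ScheduledExecutorService` suppresses all future executions of a periodic task once a single execution throws, so one transient Kubernetes API error would silently stop pod polling for the rest of the application. A minimal sketch of the pattern, with a stand-in wrapper rather than Spark's actual `Utils.tryLogNonFatalError`:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.control.NonFatal

// Stand-in for Utils.tryLogNonFatalError: log non-fatal errors instead of
// letting them escape run() and cancel the periodic schedule.
def tryLogNonFatalError(block: => Unit): Unit = {
  try block
  catch {
    case NonFatal(t) =>
      Console.err.println(
        s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
  }
}

val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleWithFixedDelay(
  () => tryLogNonFatalError {
    // Placeholder for the snapshot resync; a transient failure here no
    // longer kills the polling loop, it is logged and the next run proceeds.
    throw new RuntimeException("transient API error")
  },
  0L, 30L, TimeUnit.SECONDS)
```

Fatal errors (`OutOfMemoryError` and friends) still propagate, which is the right behavior: only recoverable failures should be absorbed by a retry loop.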
[spark] branch master updated (31e7c37 -> 832ff87)
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 31e7c37  [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early
     add 832ff87  [SPARK-28077][SQL] Support ANSI SQL OVERLAY function.

No new revisions were added by this update.

Summary of changes:
 docs/sql-keywords.md                               |   2 +
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |   7 ++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../sql/catalyst/expressions/Expression.scala      | 106 +
 .../catalyst/expressions/stringExpressions.scala   |  64 +
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  14 +++
 .../expressions/StringExpressionsSuite.scala       |  24 +
 .../sql/catalyst/parser/PlanParserSuite.scala      |  29 ++
 .../scala/org/apache/spark/sql/functions.scala     |  22 +
 .../apache/spark/sql/StringFunctionsSuite.scala    |  12 +++
 10 files changed, 281 insertions(+)
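For reference, the ANSI syntax this change adds to the parser is `OVERLAY(<string> PLACING <replacement> FROM <position> [FOR <length>])`, where `<length>` defaults to the length of the replacement and `<position>` is 1-based. A quick sketch of the expected semantics, based on the ANSI standard behavior rather than output captured from this commit:

```scala
// Replace one character starting at position 6 (the space).
spark.sql("SELECT OVERLAY('Spark SQL' PLACING '_' FROM 6)").show()
// result value: Spark_SQL

// FOR 0 replaces nothing, so the replacement is inserted at position 7.
spark.sql("SELECT OVERLAY('Spark SQL' PLACING 'ANSI ' FROM 7 FOR 0)").show()
// result value: Spark ANSI SQL
```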
[spark] branch master updated: [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 31e7c37  [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early
31e7c37 is described below

commit 31e7c37354132545da59bff176af1613bd09447c
Author: WeichenXu
AuthorDate: Fri Jun 28 17:10:25 2019 +0900

    [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early

    ## What changes were proposed in this pull request?

    Closes the generator when Python UDFs stop early.

    ### Manual verification on pandas iterator UDF and mapPartitions

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.functions import col, udf
    from pyspark.taskcontext import TaskContext
    import time
    import os

    spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
    spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')

    @pandas_udf("int", PandasUDFType.SCALAR_ITER)
    def fi1(it):
        try:
            for batch in it:
                yield batch + 100
                time.sleep(1.0)
        except BaseException as be:
            print("Debug: exception raised: " + str(type(be)))
            raise be
        finally:
            open("/tmp/01.tmp", "a").close()

    df1 = spark.range(10).select(col('id').alias('a')).repartition(1)

    # will see log "Debug: exception raised: <class 'GeneratorExit'>"
    # and file "/tmp/01.tmp" generated.
    df1.select(col('a'), fi1('a')).limit(2).collect()

    def mapper(it):
        try:
            for batch in it:
                yield batch
        except BaseException as be:
            print("Debug: exception raised: " + str(type(be)))
            raise be
        finally:
            open("/tmp/02.tmp", "a").close()

    df2 = spark.range(1000).repartition(1)

    # will see log "Debug: exception raised: <class 'GeneratorExit'>"
    # and file "/tmp/02.tmp" generated.
    df2.rdd.mapPartitions(mapper).take(2)
    ```

    ## How was this patch tested?

    Unit test added.

    Closes #24986 from WeichenXu123/pandas_iter_udf_limit.

    Authored-by: WeichenXu
    Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 37 ++++++++++++++++++++++
 python/pyspark/worker.py                           |  7 +++-
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index c291d42..d254508 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -850,6 +850,43 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         with self.assertRaisesRegexp(Exception, "reached finally block"):
             self.spark.range(1).select(test_close(col("id"))).collect()
 
+    def test_scalar_iter_udf_close_early(self):
+        tmp_dir = tempfile.mkdtemp()
+        try:
+            tmp_file = tmp_dir + '/reach_finally_block'
+
+            @pandas_udf('int', PandasUDFType.SCALAR_ITER)
+            def test_close(batch_iter):
+                generator_exit_caught = False
+                try:
+                    for batch in batch_iter:
+                        yield batch
+                        time.sleep(1.0)  # avoid the function finishing too fast.
+                except GeneratorExit as ge:
+                    generator_exit_caught = True
+                    raise ge
+                finally:
+                    assert generator_exit_caught, "Generator exit exception was not caught."
+                    open(tmp_file, 'a').close()
+
+            with QuietTest(self.sc):
+                with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1,
+                                    "spark.sql.pandas.udf.buffer.size": 4}):
+                    self.spark.range(10).repartition(1) \
+                        .select(test_close(col("id"))).limit(2).collect()
+                    # wait here because the python udf worker will take some time to detect
+                    # that the jvm-side socket is closed, which triggers `GeneratorExit`.
+                    # wait timeout is 10s.
+                    for i in range(100):
+                        time.sleep(0.1)
+                        if os.path.exists(tmp_file):
+                            break
+
+                    assert os.path.exists(tmp_file), "finally block not reached."
+
+        finally:
+            shutil.rmtree(tmp_dir)
+
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         # Daylight saving time for Los Angeles for 2015 is