[spark] branch master updated: [SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files

2019-06-28 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new facf9c3  [SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files
facf9c3 is described below

commit facf9c30a283ec682b5adb2e7afdbf5d011e3808
Author: HyukjinKwon 
AuthorDate: Sat Jun 29 14:05:23 2019 +0900

[SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files

## What changes were proposed in this pull request?

SPARK-27534 did not address my own comments at https://github.com/WeichenXu123/spark/pull/8.
It's better to push this in since the code is already cleaned up.

## How was this patch tested?

Unit tests fixed
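
For context, a hedged sketch of the behavior these tests exercise, written against the public `binaryFile` data source rather than the suite's internal `readBinaryFile` helper (the input path below is a placeholder):

```scala
// The binaryFile source exposes a fixed schema: path, modificationTime, length, content.
val df = spark.read.format("binaryFile").load("/tmp/files")

// Selecting only path/length lets the reader prune the content column, so the
// file bytes are never read -- the property the non-readable-file test relies on.
df.select("path", "length").show()
```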

Closes #25003 from HyukjinKwon/SPARK-27534.

Authored-by: HyukjinKwon 
Signed-off-by: HyukjinKwon 
---
 .../binaryfile/BinaryFileFormatSuite.scala | 88 +++---
 1 file changed, 43 insertions(+), 45 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 9e2969b..a66b34f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -290,56 +290,54 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
 ), true)
   }
 
+  private def readBinaryFile(file: File, requiredSchema: StructType): Row = {
+    val format = new BinaryFileFormat
+    val reader = format.buildReaderWithPartitionValues(
+      sparkSession = spark,
+      dataSchema = schema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = requiredSchema,
+      filters = Seq.empty,
+      options = Map.empty,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val partitionedFile = mock(classOf[PartitionedFile])
+    when(partitionedFile.filePath).thenReturn(file.getPath)
+    val encoder = RowEncoder(requiredSchema).resolveAndBind()
+    encoder.fromRow(reader(partitionedFile).next())
+  }
+
   test("column pruning") {
-    def getRequiredSchema(fieldNames: String*): StructType = {
-      StructType(fieldNames.map {
-        case f if schema.fieldNames.contains(f) => schema(f)
-        case other => StructField(other, NullType)
-      })
-    }
-    def read(file: File, requiredSchema: StructType): Row = {
-      val format = new BinaryFileFormat
-      val reader = format.buildReaderWithPartitionValues(
-        sparkSession = spark,
-        dataSchema = schema,
-        partitionSchema = StructType(Nil),
-        requiredSchema = requiredSchema,
-        filters = Seq.empty,
-        options = Map.empty,
-        hadoopConf = spark.sessionState.newHadoopConf()
-      )
-      val partitionedFile = mock(classOf[PartitionedFile])
-      when(partitionedFile.filePath).thenReturn(file.getPath)
-      val encoder = RowEncoder(requiredSchema).resolveAndBind()
-      encoder.fromRow(reader(partitionedFile).next())
-    }
-    val file = new File(Utils.createTempDir(), "data")
-    val content = "123".getBytes
-    Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
-
-    read(file, getRequiredSchema(MODIFICATION_TIME, CONTENT, LENGTH, PATH)) match {
-      case Row(t, c, len, p) =>
-        assert(t === new Timestamp(file.lastModified()))
-        assert(c === content)
-        assert(len === content.length)
-        assert(p.asInstanceOf[String].endsWith(file.getAbsolutePath))
+    withTempPath { file =>
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+
+      val actual = readBinaryFile(file, StructType(schema.takeRight(3)))
+      val expected = Row(new Timestamp(file.lastModified()), content.length, content)
+
+      assert(actual === expected)
     }
-    file.setReadable(false)
-    withClue("cannot read content") {
+  }
+
+  test("column pruning - non-readable file") {
+    withTempPath { file =>
+      val content = "abc".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+      file.setReadable(false)
+
+      // If content is selected, it throws an exception because it's not readable.
       intercept[IOException] {
-        read(file, getRequiredSchema(CONTENT))
+        readBinaryFile(file, StructType(schema(CONTENT) :: Nil))
       }
-    }
-    assert(read(file, getRequiredSchema(LENGTH)) === Row(content.length),
-      "Get length should not read content.")
-    intercept[RuntimeException] {
-      read(file, 

[spark] branch master updated: [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF

2019-06-28 Thread lixiao
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8299600  [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
8299600 is described below

commit 8299600575024ca81127b7bf8ef48ae11fdd0594
Author: Xiangrui Meng 
AuthorDate: Fri Jun 28 15:09:57 2019 -0700

[SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF

## What changes were proposed in this pull request?

Add docstring/doctest for `SCALAR_ITER` Pandas UDF. I explicitly mentioned that per-partition execution is an implementation detail, not guaranteed. I will submit another PR to add the same to the user guide, just to keep this PR minimal.

I didn't add "doctest: +SKIP" in the first commit so it is easy to test locally.

cc: HyukjinKwon gatorsmile icexelloss BryanCutler WeichenXu123

![Screen Shot 2019-06-28 at 9 52 41 AM](https://user-images.githubusercontent.com/829644/60358349-b0aa5400-998a-11e9-9ebf-8481dfd555b5.png)
![Screen Shot 2019-06-28 at 9 53 19 AM](https://user-images.githubusercontent.com/829644/60358355-b1db8100-998a-11e9-8f6f-00a11bdbdc4d.png)

## How was this patch tested?

doctest

Closes #25005 from mengxr/SPARK-28056.2.

Authored-by: Xiangrui Meng 
Signed-off-by: gatorsmile 
---
 python/pyspark/sql/functions.py | 104 +++-
 1 file changed, 102 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 34f6593..5d1e69e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2951,7 +2951,107 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        Therefore, this can be used, for example, to ensure the length of each returned
        `pandas.Series`, and can not be used as the column length.
 
-2. GROUPED_MAP
+2. SCALAR_ITER
+
+   A scalar iterator UDF is semantically the same as the scalar Pandas UDF above except that the
+   wrapped Python function takes an iterator of batches as input instead of a single batch and,
+   instead of returning a single output batch, it yields output batches or explicitly returns a
+   generator or an iterator of output batches.
+   It is useful when the UDF execution requires initializing some state, e.g., loading a machine
+   learning model file to apply inference to every input batch.
+
+   .. note:: It is not guaranteed that one invocation of a scalar iterator UDF will process all
+       batches from one partition, although it is currently implemented this way.
+       Your code shall not rely on this behavior because it might change in the future for
+       further optimization, e.g., one invocation processes multiple partitions.
+
+   Scalar iterator UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
+   :meth:`pyspark.sql.DataFrame.select`.
+
+   >>> import pandas as pd  # doctest: +SKIP
+   >>> from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+   >>> pdf = pd.DataFrame([1, 2, 3], columns=["x"])  # doctest: +SKIP
+   >>> df = spark.createDataFrame(pdf)  # doctest: +SKIP
+
+   When the UDF is called with a single column that is not `StructType`, the input to the
+   underlying function is an iterator of `pd.Series`.
+
+   >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+   ... def plus_one(batch_iter):
+   ... for x in batch_iter:
+   ... yield x + 1
+   ...
+   >>> df.select(plus_one(col("x"))).show()  # doctest: +SKIP
+   +-----------+
+   |plus_one(x)|
+   +-----------+
+   |          2|
+   |          3|
+   |          4|
+   +-----------+
+
+   When the UDF is called with more than one column, the input to the underlying function is an
+   iterator of tuples of `pd.Series`.
+
+   >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+   ... def multiply_two_cols(batch_iter):
+   ... for a, b in batch_iter:
+   ... yield a * b
+   ...
+   >>> df.select(multiply_two_cols(col("x"), col("x"))).show()  # doctest: +SKIP
+   +-----------------------+
+   |multiply_two_cols(x, x)|
+   +-----------------------+
+   |                      1|
+   |                      4|
+   |                      9|
+   +-----------------------+
+
+   When the UDF is called with a single column that is `StructType`, the input to the underlying
+   function is an iterator of `pd.DataFrame`.
+
+   >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+   ... def multiply_two_nested_cols(pdf_iter):
+   ...

[spark] branch master updated: [SPARK-27945][SQL] Minimal changes to support columnar processing

2019-06-28 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c341de8  [SPARK-27945][SQL] Minimal changes to support columnar processing
c341de8 is described below

commit c341de8b3e1f1d3327bd4ae3b0d2ec048f64d306
Author: Robert (Bobby) Evans 
AuthorDate: Fri Jun 28 14:00:12 2019 -0500

[SPARK-27945][SQL] Minimal changes to support columnar processing

## What changes were proposed in this pull request?

This is the first part of [SPARK-27396](https://issues.apache.org/jira/browse/SPARK-27396). This is the minimum set of changes necessary to support a pluggable back end for columnar processing. Follow-on JIRAs would cover removing some of the duplication between functionality in this patch and functionality currently covered by things like ColumnarBatchScan.
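
Since only the CPU plugin is linked below, here is a minimal sketch of how a back end would plug in, assuming the `injectColumnar` hook and `ColumnarRule` class added by this patch; the rule shown is a deliberate no-op:

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// A do-nothing rule: a real back end would replace supported plan nodes with
// columnar implementations in preColumnarTransitions.
case class NoopColumnarRule() extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = plan
  }
}

// Registered through the existing extensions mechanism, e.g.
// --conf spark.sql.extensions=com.example.NoopColumnarExtensions
class NoopColumnarExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectColumnar(_ => NoopColumnarRule())
}
```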

## How was this patch tested?

I added a new unit test to cover new code not really covered in other places.

I also did manual testing by implementing two plugins/extensions that take advantage of the new APIs to allow for columnar processing for some simple queries. One version runs on the [CPU](https://gist.github.com/revans2/c3cad77075c4fa5d9d271308ee2f1b1d). The other version runs on a GPU, but because it has unreleased dependencies I will not include a link to it yet.

I would expect to add the CPU version as an example, with other documentation, in a follow-on JIRA.

This is contributed on behalf of NVIDIA Corporation.

Closes #24795 from revans2/columnar-basic.

Authored-by: Robert (Bobby) Evans 
Signed-off-by: Thomas Graves 
---
 .../apache/spark/sql/vectorized/ColumnVector.java  |   2 +-
 .../apache/spark/sql/vectorized/ColumnarBatch.java |  13 +-
 .../execution/vectorized/WritableColumnVector.java |   5 +-
 .../apache/spark/sql/SparkSessionExtensions.scala  |  19 +
 .../org/apache/spark/sql/execution/Columnar.scala  | 534 +
 .../spark/sql/execution/ColumnarBatchScan.scala|   2 -
 .../spark/sql/execution/QueryExecution.scala   |   2 +
 .../org/apache/spark/sql/execution/SparkPlan.scala |  36 ++
 .../sql/execution/WholeStageCodegenExec.scala  |  97 +++-
 .../sql/internal/BaseSessionStateBuilder.scala |   9 +-
 .../apache/spark/sql/internal/SessionState.scala   |   3 +-
 .../spark/sql/SparkSessionExtensionSuite.scala | 409 +++-
 .../python/BatchEvalPythonExecSuite.scala  |   8 +-
 .../execution/vectorized/ColumnarBatchSuite.scala  | 210 +++-
 14 files changed, 1311 insertions(+), 38 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 14caaea..f18d003 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -287,7 +287,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * @return child [[ColumnVector]] at the given ordinal.
    */
-  protected abstract ColumnVector getChild(int ordinal);
+  public abstract ColumnVector getChild(int ordinal);
 
   /**
    * Data type for this column.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 9f917ea..a2feac8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -31,7 +31,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  * the entire data loading process.
  */
 @Evolving
-public final class ColumnarBatch {
+public final class ColumnarBatch implements AutoCloseable {
   private int numRows;
   private final ColumnVector[] columns;
 
@@ -42,6 +42,7 @@ public final class ColumnarBatch {
    * Called to close all the columns in this batch. It is not valid to access the data after
    * calling this. This must be called at the end to clean up memory allocations.
    */
+  @Override
   public void close() {
     for (ColumnVector c: columns) {
       c.close();
@@ -110,7 +111,17 @@ public final class ColumnarBatch {
   }
 
   public ColumnarBatch(ColumnVector[] columns) {
+    this(columns, 0);
+  }
+
+  /**
+   * Create a new batch from existing column vectors.
+   * @param columns The columns of this batch
+   * @param numRows The number of rows in this batch
+   */
+  public ColumnarBatch(ColumnVector[] columns, int numRows) {
     this.columns = columns;
+    this.numRows = numRows;
     this.row = new ColumnarBatchRow(columns);
   }
 }
diff --git 
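
To illustrate the two `ColumnarBatch` changes in the hunks above (implementing `AutoCloseable` and the new row-count constructor), a hedged sketch; `OnHeapColumnVector` is Spark's existing on-heap vector, and the usage is illustrative rather than taken from this patch:

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

val vector = new OnHeapColumnVector(3, IntegerType)
(0 until 3).foreach(i => vector.putInt(i, i * 10))

// The new constructor takes numRows up front instead of a later setNumRows() call.
val batch = new ColumnarBatch(Array[ColumnVector](vector), 3)
try {
  val rows = batch.rowIterator()
  while (rows.hasNext) println(rows.next().getInt(0))  // prints 0, 10, 20
} finally {
  batch.close()  // AutoCloseable now also enables try-with-resources from Java
}
```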

[spark] branch master updated: [SPARK-28145][K8S] safe runnable in polling executor source

2019-06-28 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e7c97a3  [SPARK-28145][K8S] safe runnable in polling executor source
e7c97a3 is described below

commit e7c97a3d8606de7702b7590154b709dc848c308b
Author: Onur Satici 
AuthorDate: Fri Jun 28 09:38:43 2019 -0500

[SPARK-28145][K8S] safe runnable in polling executor source

## What changes were proposed in this pull request?

Add error handling to `ExecutorPodsPollingSnapshotSource`
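
For context: an uncaught exception thrown from a task scheduled with `scheduleWithFixedDelay` silently cancels all future executions, which is what wrapping the body in `Utils.tryLogNonFatalError` guards against. A standalone sketch of the same pattern, using a plain try/catch in place of Spark's helper:

```scala
import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleWithFixedDelay(new Runnable {
  override def run(): Unit = {
    try {
      // placeholder for the real work: poll the API server for executor pod state
      println("polling executor pod state")
    } catch {
      // Log and swallow: letting the exception escape would cancel the schedule.
      case e: Exception => println(s"polling failed, retrying on next tick: $e")
    }
  }
}, 0L, 30L, TimeUnit.SECONDS)
```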

Closes #24952 from onursatici/os/polling-source.

Authored-by: Onur Satici 
Signed-off-by: Sean Owen 
---
 .../scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
index e77e604..96a5059 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class ExecutorPodsPollingSnapshotSource(
 conf: SparkConf,
@@ -53,7 +53,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
   }
 
   private class PollRunnable(applicationId: String) extends Runnable {
-    override def run(): Unit = {
+    override def run(): Unit = Utils.tryLogNonFatalError {
       logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
       snapshotsStore.replaceSnapshot(kubernetesClient
         .pods()





[spark] branch master updated (31e7c37 -> 832ff87)

2019-06-28 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 31e7c37  [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early
 add 832ff87  [SPARK-28077][SQL] Support ANSI SQL OVERLAY function.

No new revisions were added by this update.

Summary of changes:
 docs/sql-keywords.md   |   2 +
 .../apache/spark/sql/catalyst/parser/SqlBase.g4|   7 ++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../sql/catalyst/expressions/Expression.scala  | 106 +
 .../catalyst/expressions/stringExpressions.scala   |  64 +
 .../spark/sql/catalyst/parser/AstBuilder.scala |  14 +++
 .../expressions/StringExpressionsSuite.scala   |  24 +
 .../sql/catalyst/parser/PlanParserSuite.scala  |  29 ++
 .../scala/org/apache/spark/sql/functions.scala |  22 +
 .../apache/spark/sql/StringFunctionsSuite.scala|  12 +++
 10 files changed, 281 insertions(+)
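
For reference, a hedged usage sketch of the ANSI OVERLAY syntax this adds; the expected results follow the standard's semantics (replace at the 1-based position, or insert when FOR 0 is given):

```scala
// ANSI SQL: OVERLAY(<input> PLACING <replacement> FROM <position> [FOR <length>])
spark.sql("SELECT OVERLAY('Spark SQL' PLACING '_' FROM 6)").show()
// expected: Spark_SQL  (one character replaced at position 6)

spark.sql("SELECT OVERLAY('Spark SQL' PLACING 'ANSI ' FROM 7 FOR 0)").show()
// expected: Spark ANSI SQL  (FOR 0 inserts without removing anything)
```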





[spark] branch master updated: [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early

2019-06-28 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 31e7c37  [SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early
31e7c37 is described below

commit 31e7c37354132545da59bff176af1613bd09447c
Author: WeichenXu 
AuthorDate: Fri Jun 28 17:10:25 2019 +0900

[SPARK-28185][PYTHON][SQL] Closes the generator when Python UDFs stop early

## What changes were proposed in this pull request?

 Closes the generator when Python UDFs stop early.

### Manual verification on pandas iterator UDF and mapPartitions

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import col, udf
from pyspark.taskcontext import TaskContext
import time
import os

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')

pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
try:
for batch in it:
yield batch + 100
time.sleep(1.0)
except BaseException as be:
print("Debug: exception raised: " + str(type(be)))
raise be
finally:
open("/tmp/01.tmp", "a").close()

df1 = spark.range(10).select(col('id').alias('a')).repartition(1)

# will see log Debug: exception raised: 
# and file "/tmp/01.tmp" generated.
df1.select(col('a'), fi1('a')).limit(2).collect()

def mapper(it):
try:
for batch in it:
yield batch
except BaseException as be:
print("Debug: exception raised: " + str(type(be)))
raise be
finally:
open("/tmp/02.tmp", "a").close()

df2 = spark.range(1000).repartition(1)

# will see log Debug: exception raised: 
# and file "/tmp/02.tmp" generated.
df2.rdd.mapPartitions(mapper).take(2)

```

## How was this patch tested?

Unit test added.


Closes #24986 from WeichenXu123/pandas_iter_udf_limit.

Authored-by: WeichenXu 
Signed-off-by: HyukjinKwon 
---
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 37 ++
 python/pyspark/worker.py   |  7 +++-
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index c291d42..d254508 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -850,6 +850,43 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         with self.assertRaisesRegexp(Exception, "reached finally block"):
             self.spark.range(1).select(test_close(col("id"))).collect()
 
+    def test_scalar_iter_udf_close_early(self):
+        tmp_dir = tempfile.mkdtemp()
+        try:
+            tmp_file = tmp_dir + '/reach_finally_block'
+
+            @pandas_udf('int', PandasUDFType.SCALAR_ITER)
+            def test_close(batch_iter):
+                generator_exit_caught = False
+                try:
+                    for batch in batch_iter:
+                        yield batch
+                        time.sleep(1.0)  # avoid the function finishing too fast.
+                except GeneratorExit as ge:
+                    generator_exit_caught = True
+                    raise ge
+                finally:
+                    assert generator_exit_caught, "Generator exit exception was not caught."
+                    open(tmp_file, 'a').close()
+
+            with QuietTest(self.sc):
+                with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 1,
+                                    "spark.sql.pandas.udf.buffer.size": 4}):
+                    self.spark.range(10).repartition(1) \
+                        .select(test_close(col("id"))).limit(2).collect()
+                    # wait here because the python udf worker will take some time to detect
+                    # that the jvm-side socket is closed and then trigger `GeneratorExit`.
+                    # wait timeout is 10s.
+                    for i in range(100):
+                        time.sleep(0.1)
+                        if os.path.exists(tmp_file):
+                            break
+
+                    assert os.path.exists(tmp_file), "finally block not reached."
+
+        finally:
+            shutil.rmtree(tmp_dir)
+
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         # Daylight saving time for Los Angeles for 2015 is