GitHub user ArunkumarRamanan opened a pull request:

    https://github.com/apache/spark/pull/22242

    Branch 2.3

    ## What changes were proposed in this pull request?
    
    (Please fill in changes proposed in this fix)
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/spark branch-2.3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22242.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22242
    
----
commit a4eb1e47ad2453b41ebb431272c92e1ac48bb310
Author: hyukjinkwon <gurwls223@...>
Date:   2018-02-28T15:44:13Z

    [SPARK-23517][PYTHON] Make `pyspark.util._exception_message` produce the 
trace from Java side by Py4JJavaError
    
    ## What changes were proposed in this pull request?
    
    This PR proposes for `pyspark.util._exception_message` to produce the trace 
from Java side by `Py4JJavaError`.
    
    Currently, in Python 2, it uses the `message` attribute, which 
`Py4JJavaError` leaves empty:
    
    ```python
    >>> from pyspark.util import _exception_message
    >>> try:
    ...     sc._jvm.java.lang.String(None)
    ... except Exception as e:
    ...     pass
    ...
    >>> e.message
    ''
    ```
    
    Seems we should use `str` instead for now:
    
     
https://github.com/bartdag/py4j/blob/aa6c53b59027925a426eb09b58c453de02c21b7c/py4j-python/src/py4j/protocol.py#L412
    
    but this doesn't address the problem with non-ascii string from Java side -
     `https://github.com/bartdag/py4j/issues/306`
    
    So, we could directly call `__str__()`:
    
    ```python
    >>> e.__str__()
    u'An error occurred while calling None.java.lang.String.\n: 
java.lang.NullPointerException\n\tat 
java.lang.String.<init>(String.java:588)\n\tat 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)\n\tat 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)\n\tat
 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\n\tat
 java.lang.reflect.Constructor.newInstance(Constructor.java:422)\n\tat 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)\n\tat 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat 
py4j.Gateway.invoke(Gateway.java:238)\n\tat 
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)\n\tat
 py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)\n\tat 
py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat 
java.lang.Thread.run(Thread.java:745)\n'
    ```
    
    which doesn't type coerce unicodes to `str` in Python 2.
    
    This can actually be a problem:
    
    ```python
    from pyspark.sql.functions import udf
    spark.conf.set("spark.sql.execution.arrow.enabled", True)
    spark.range(1).select(udf(lambda x: [[]])()).toPandas()
    ```
    
    **Before**
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/dataframe.py", line 2009, in toPandas
        raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    RuntimeError:
    Note: toPandas attempted Arrow optimization because 
'spark.sql.execution.arrow.enabled' is set to true. Please set it to false to 
disable this.
    ```
    
    **After**
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/dataframe.py", line 2009, in toPandas
        raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
    RuntimeError: An error occurred while calling o47.collectAsArrowToPython.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 
in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 
(TID 7, localhost, executor driver): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/.../spark/python/pyspark/worker.py", line 245, in main
        process()
      File "/.../spark/python/pyspark/worker.py", line 240, in process
    ...
    Note: toPandas attempted Arrow optimization because 
'spark.sql.execution.arrow.enabled' is set to true. Please set it to false to 
disable this.
    ```
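
The fallback order described above can be sketched in plain Python. This is a minimal illustration, not the actual `pyspark.util` code: `FakeJavaError` is a hypothetical stand-in for `Py4JJavaError`, and `exception_message` is an assumed name.

```python
class FakeJavaError(Exception):
    """Hypothetical stand-in for py4j's Py4JJavaError (not the real class)."""

    def __init__(self, trace):
        super().__init__()
        # Mimics the empty Python 2 `message` attribute shown earlier.
        self.message = ""
        self.trace = trace

    def __str__(self):
        return self.trace


def exception_message(excp):
    """Return a readable message for an exception (illustrative only)."""
    # For the Java-backed error, call __str__() directly so the trace is kept.
    if isinstance(excp, FakeJavaError):
        return excp.__str__()
    # Python 2 style exceptions may carry a `message` attribute.
    if hasattr(excp, "message"):
        return excp.message
    # Otherwise fall back to the plain string form.
    return str(excp)
```

Checking the Java-backed type before the `message` attribute is what keeps the empty string from winning.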
    
    ## How was this patch tested?
    
    Manually tested and unit tests were added.
    
    Author: hyukjinkwon <gurwls...@gmail.com>
    
    Closes #20680 from HyukjinKwon/SPARK-23517.
    
    (cherry picked from commit fab563b9bd1581112462c0fc0b299ad6510b6564)
    Signed-off-by: hyukjinkwon <gurwls...@gmail.com>

commit 2aa66eb387d20725bbd7551d2d77609a77b1e699
Author: Dongjoon Hyun <dongjoon@...>
Date:   2018-03-02T01:26:39Z

    [SPARK-23551][BUILD] Exclude `hadoop-mapreduce-client-core` dependency from 
`orc-mapreduce`
    
    ## What changes were proposed in this pull request?
    
    This PR aims to prevent the `orc-mapreduce` dependency from confusing IDEs 
and Maven.
    
    **BEFORE**
    Please note the `2.6.4` version under `Spark Project SQL`.
    ```
    $ mvn dependency:tree -Phadoop-2.7 
-Dincludes=org.apache.hadoop:hadoop-mapreduce-client-core
    ...
    [INFO] 
------------------------------------------------------------------------
    [INFO] Building Spark Project Catalyst 2.4.0-SNAPSHOT
    [INFO] 
------------------------------------------------------------------------
    [INFO]
    [INFO] --- maven-dependency-plugin:3.0.2:tree (default-cli)  
spark-catalyst_2.11 ---
    [INFO] org.apache.spark:spark-catalyst_2.11:jar:2.4.0-SNAPSHOT
    [INFO] \- org.apache.spark:spark-core_2.11:jar:2.4.0-SNAPSHOT:compile
    [INFO]    \- org.apache.hadoop:hadoop-client:jar:2.7.3:compile
    [INFO]       \- 
org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3:compile
    [INFO]
    [INFO] 
------------------------------------------------------------------------
    [INFO] Building Spark Project SQL 2.4.0-SNAPSHOT
    [INFO] 
------------------------------------------------------------------------
    [INFO]
    [INFO] --- maven-dependency-plugin:3.0.2:tree (default-cli)  spark-sql_2.11 
---
    [INFO] org.apache.spark:spark-sql_2.11:jar:2.4.0-SNAPSHOT
    [INFO] \- org.apache.orc:orc-mapreduce:jar:nohive:1.4.3:compile
    [INFO]    \- 
org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.6.4:compile
    ```
    
    **AFTER**
    ```
    $ mvn dependency:tree -Phadoop-2.7 
-Dincludes=org.apache.hadoop:hadoop-mapreduce-client-core
    ...
    [INFO] 
------------------------------------------------------------------------
    [INFO] Building Spark Project Catalyst 2.4.0-SNAPSHOT
    [INFO] 
------------------------------------------------------------------------
    [INFO]
    [INFO] --- maven-dependency-plugin:3.0.2:tree (default-cli)  
spark-catalyst_2.11 ---
    [INFO] org.apache.spark:spark-catalyst_2.11:jar:2.4.0-SNAPSHOT
    [INFO] \- org.apache.spark:spark-core_2.11:jar:2.4.0-SNAPSHOT:compile
    [INFO]    \- org.apache.hadoop:hadoop-client:jar:2.7.3:compile
    [INFO]       \- 
org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3:compile
    [INFO]
    [INFO] 
------------------------------------------------------------------------
    [INFO] Building Spark Project SQL 2.4.0-SNAPSHOT
    [INFO] 
------------------------------------------------------------------------
    [INFO]
    [INFO] --- maven-dependency-plugin:3.0.2:tree (default-cli)  spark-sql_2.11 
---
    [INFO] org.apache.spark:spark-sql_2.11:jar:2.4.0-SNAPSHOT
    [INFO] \- org.apache.spark:spark-core_2.11:jar:2.4.0-SNAPSHOT:compile
    [INFO]    \- org.apache.hadoop:hadoop-client:jar:2.7.3:compile
    [INFO]       \- 
org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3:compile
    ```
    
    ## How was this patch tested?
    
    1. Pass the Jenkins with `dev/test-dependencies.sh` with the existing 
dependencies.
    2. Manually do the following and see the change.
    ```
    mvn dependency:tree -Phadoop-2.7 
-Dincludes=org.apache.hadoop:hadoop-mapreduce-client-core
    ```
    
    Author: Dongjoon Hyun <dongj...@apache.org>
    
    Closes #20704 from dongjoon-hyun/SPARK-23551.
    
    (cherry picked from commit 34811e0b908449fd59bca476604612b1d200778d)
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

commit 56cfbd932d3d038ce21cfa4939dfd9563c719003
Author: Joseph K. Bradley <joseph@...>
Date:   2018-03-02T05:04:01Z

    [SPARK-22883][ML][TEST] Streaming tests for spark.ml.feature, from A to H
    
    ## What changes were proposed in this pull request?
    
    Adds structured streaming tests using testTransformer for these suites:
    * BinarizerSuite
    * BucketedRandomProjectionLSHSuite
    * BucketizerSuite
    * ChiSqSelectorSuite
    * CountVectorizerSuite
    * DCTSuite
    * ElementwiseProductSuite
    * FeatureHasherSuite
    * HashingTFSuite
    
    ## How was this patch tested?
    
    It tests itself because it is a bunch of tests!
    
    Author: Joseph K. Bradley <jos...@databricks.com>
    
    Closes #20111 from jkbradley/SPARK-22883-streaming-featureAM.
    
    (cherry picked from commit 119f6a0e4729aa952e811d2047790a32ee90bf69)
    Signed-off-by: Joseph K. Bradley <jos...@databricks.com>

commit 8fe20e15196b4ddbd80828ad3a91cf06c5dbea84
Author: Felix Cheung <felixcheung_m@...>
Date:   2018-03-02T17:23:39Z

    [SPARKR][DOC] fix link in vignettes
    
    ## What changes were proposed in this pull request?
    
    Fix doc link that was changed in 2.3
    
    shivaram
    
    Author: Felix Cheung <felixcheun...@hotmail.com>
    
    Closes #20711 from felixcheung/rvigmean.
    
    (cherry picked from commit 0b6ceadeb563205cbd6bd03bc88e608086273b5b)
    Signed-off-by: Felix Cheung <felixche...@apache.org>

commit f12fa13f16daf0a3f194d78a7e028c8aa7522676
Author: gatorsmile <gatorsmile@...>
Date:   2018-03-02T22:30:37Z

    [SPARK-23570][SQL] Add Spark 2.3.0 in HiveExternalCatalogVersionsSuite
    
    ## What changes were proposed in this pull request?
    Add Spark 2.3.0 to HiveExternalCatalogVersionsSuite, now that Spark 2.3.0 is 
released, to ensure backward compatibility.
    
    ## How was this patch tested?
    N/A
    
    Author: gatorsmile <gatorsm...@gmail.com>
    
    Closes #20720 from gatorsmile/add2.3.
    
    (cherry picked from commit 487377e693af65b2ff3d6b874ca7326c1ff0076c)
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>

commit 26a8a675aa938a38b0fce91418b0c7ed6fd65625
Author: Eric Liang <ekhliang@...>
Date:   2018-03-04T22:32:24Z

    [SQL][MINOR] XPathDouble prettyPrint should say 'double' not 'float'
    
    ## What changes were proposed in this pull request?
    
    It looks like this was incorrectly copied from `XPathFloat` in the class 
above.
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
    
    Author: Eric Liang <ekhli...@gmail.com>
    
    Closes #20730 from ericl/fix-typo-xpath.
    
    (cherry picked from commit a89cdf55fa76fa23a524f0443e323498c3cc8664)
    Signed-off-by: hyukjinkwon <gurwls...@gmail.com>

commit c8aa6fbb049795195e414597839fe61ae3f56d92
Author: Michael (Stu) Stewart <mstewart141@...>
Date:   2018-03-05T04:36:42Z

    [SPARK-23569][PYTHON] Allow pandas_udf to work with python3 style 
type-annotated functions
    
    ## What changes were proposed in this pull request?
    
    Check python version to determine whether to use `inspect.getargspec` or 
`inspect.getfullargspec` before applying `pandas_udf` core logic to a function. 
The former is python2.7 (deprecated in python3) and the latter is python3.x. 
The latter correctly accounts for type annotations, which are syntax errors in 
python2.x.
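
A minimal sketch of the version check described above, assuming a hypothetical helper name `get_arg_names` (this is not the actual `pandas_udf` code). Because the example function uses annotations, the snippet itself only parses on Python 3.

```python
import inspect
import sys


def get_arg_names(func):
    """Return positional argument names, tolerating Python 3 annotations."""
    if sys.version_info[0] < 3:
        # Python 2: getfullargspec does not exist, so use getargspec.
        return inspect.getargspec(func).args
    # Python 3: getfullargspec accepts annotated functions.
    return inspect.getfullargspec(func).args


def plus_one(v: int) -> int:
    """Annotated function of the kind that motivated the change."""
    return v + 1
```

Calling `get_arg_names(plus_one)` yields the single argument name `v`.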
    
    ## How was this patch tested?
    
    Locally, on python 2.7 and 3.6.
    
    Author: Michael (Stu) Stewart <mstewart...@gmail.com>
    
    Closes #20728 from mstewart141/pandas_udf_fix.
    
    (cherry picked from commit 7965c91d8a67c213ca5eebda5e46e7c49a8ba121)
    Signed-off-by: hyukjinkwon <gurwls...@gmail.com>

commit 88dd335f6f36ce68862d33720959aaf62742f86d
Author: hyukjinkwon <gurwls223@...>
Date:   2018-03-05T05:22:30Z

    [MINOR][DOCS] Fix a link in "Compatibility with Apache Hive"
    
    ## What changes were proposed in this pull request?
    
    This PR fixes a broken link as below:
    
    **Before:**
    
    <img width="678" alt="2018-03-05 12 23 58" 
src="https://user-images.githubusercontent.com/6477701/36957930-6d00ebda-207b-11e8-9ae4-718561b0428c.png">
    
    **After:**
    
    <img width="680" alt="2018-03-05 12 23 20" 
src="https://user-images.githubusercontent.com/6477701/36957934-6f834ac4-207b-11e8-97b4-18832b2b80cd.png">
    
    Also see 
https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#compatibility-with-apache-hive
    
    ## How was this patch tested?
    
    Manually tested. I checked the same instances in `docs` directory. Seems 
this is the only one.
    
    Author: hyukjinkwon <gurwls...@gmail.com>
    
    Closes #20733 from HyukjinKwon/minor-link.
    
    (cherry picked from commit 269cd53590dd155aeb5269efc909a6e228f21e22)
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>

commit 232b9f81f02ec00fc698f610ecc1ca25740e8802
Author: Mihaly Toth <misutoth@...>
Date:   2018-03-05T14:46:40Z

    [SPARK-23329][SQL] Fix documentation of trigonometric functions
    
    ## What changes were proposed in this pull request?
    
    Provide more details in the documentation of the trigonometric functions. 
The descriptions now reference `java.lang.Math` for further details.
    ## How was this patch tested?
    
    Ran full build, checked generated documentation manually
    
    Author: Mihaly Toth <misut...@gmail.com>
    
    Closes #20618 from misutoth/trigonometric-doc.
    
    (cherry picked from commit a366b950b90650693ad0eb1e5b9a988ad028d845)
    Signed-off-by: hyukjinkwon <gurwls...@gmail.com>

commit 4550673b1a94e9023a0c6fdc6a92e4b860e1cfb2
Author: WeichenXu <weichen.xu@...>
Date:   2018-03-05T18:50:00Z

    [SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification
    
    ## What changes were proposed in this pull request?
    
    Adds Structured Streaming tests for all Models/Transformers in 
spark.ml.classification.
    
    ## How was this patch tested?
    
    N/A
    
    Author: WeichenXu <weichen...@databricks.com>
    
    Closes #20121 from WeichenXu123/ml_stream_test_classification.
    
    (cherry picked from commit 98a5c0a35f0a24730f5074522939acf57ef95422)
    Signed-off-by: Joseph K. Bradley <jos...@databricks.com>

commit 911b83da42fa850eb3ae419687c204cb2e25767b
Author: Dongjoon Hyun <dongjoon@...>
Date:   2018-03-05T19:53:23Z

    [SPARK-23457][SQL][BRANCH-2.3] Register task completion listeners first in 
ParquetFileFormat
    
    ## What changes were proposed in this pull request?
    
    ParquetFileFormat leaks opened files in some cases. This PR prevents that 
by registering task completion listeners first, before initialization.
    
    - 
[spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
    - 
[spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
    
    ```
    Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
        at 
org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
        at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
        at 
org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
        at 
org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
        at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
        at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
        at
    ```
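
The ordering issue can be illustrated with a small Python sketch. All names here (`FakeReader`, `run_task`, `leaky`, `safe`) are hypothetical and only model the idea; they are not Spark APIs.

```python
class FakeReader:
    """Hypothetical reader that holds an open handle and fails to initialize."""

    def __init__(self):
        self.is_open = True

    def initialize(self):
        raise MemoryError("simulated failure during initialization")

    def close(self):
        self.is_open = False


def run_task(body):
    """Run body(listeners) and always fire the registered completion listeners."""
    listeners = []
    try:
        body(listeners)
    except MemoryError:
        pass  # the task failed; completion listeners still run below
    finally:
        for listener in reversed(listeners):
            listener()


def leaky(reader):
    def body(listeners):
        reader.initialize()             # fails before the listener exists
        listeners.append(reader.close)  # never reached
    return body


def safe(reader):
    def body(listeners):
        listeners.append(reader.close)  # register the listener first
        reader.initialize()             # a failure here is still cleaned up
    return body
```

With `leaky`, the reader stays open after the failure; with `safe`, the listener registered up front closes it.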
    
    ## How was this patch tested?
    
    Manual. The following test case generates the same leakage.
    
    ```scala
      test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
          withTempDir { dir =>
            val basePath = dir.getCanonicalPath
            Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
            Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
            val df = spark.read.parquet(
              new Path(basePath, "first").toString,
              new Path(basePath, "second").toString)
            val e = intercept[SparkException] {
              df.collect()
            }
            assert(e.getCause.isInstanceOf[OutOfMemoryError])
          }
        }
      }
    ```
    
    Author: Dongjoon Hyun <dongj...@apache.org>
    
    Closes #20714 from dongjoon-hyun/SPARK-23457-2.3.

commit b9ea2e87bb24c3731bd2dbd044d10d18dbbf9c6f
Author: Dongjoon Hyun <dongjoon@...>
Date:   2018-03-05T22:20:10Z

    [SPARK-23434][SQL][BRANCH-2.3] Spark should not warn `metadata directory` 
for a HDFS file path
    
    ## What changes were proposed in this pull request?
    
    In a kerberized cluster, when Spark reads a file path (e.g. `people.json`), 
it logs a misleading warning while looking up 
`people.json/_spark_metadata`. The root cause is a difference between 
`LocalFileSystem` and `DistributedFileSystem`: 
`LocalFileSystem.exists()` returns `false`, but `DistributedFileSystem.exists()` 
raises `org.apache.hadoop.security.AccessControlException`.
    
    ```scala
    scala> spark.read.json("file:///usr/hdp/current/spark-client/examples/src/main/resources/people.json").show
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    
    scala> spark.read.json("hdfs:///tmp/people.json")
    18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for 
metadata directory.
    18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for 
metadata directory.
    ```
    
    After this PR,
    ```scala
    scala> spark.read.json("hdfs:///tmp/people.json").show
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+
    ```
    
    ## How was this patch tested?
    
    Manual.
    
    Author: Dongjoon Hyun <dongj...@apache.org>
    
    Closes #20713 from dongjoon-hyun/SPARK-23434-2.3.

commit 66c1978f9ca53bbddc92ae490a61c904952be7f1
Author: Sean Owen <sowen@...>
Date:   2018-03-06T14:52:28Z

    [SPARK-23601][BUILD] Remove .md5 files from release
    
    ## What changes were proposed in this pull request?
    
    Remove .md5 files from release artifacts
    
    ## How was this patch tested?
    
    N/A
    
    Author: Sean Owen <so...@cloudera.com>
    
    Closes #20737 from srowen/SPARK-23601.
    
    (cherry picked from commit 8bceb899dc3220998a4ea4021f3b477f78faaca8)
    Signed-off-by: Sean Owen <so...@cloudera.com>

commit 8cd6a96535bd8b8c4e40b87869ba02f916a997ed
Author: Xingbo Jiang <xingbo.jiang@...>
Date:   2018-03-07T21:51:44Z

    [SPARK-23525][SQL] Support ALTER TABLE CHANGE COLUMN COMMENT for external 
hive table
    
    ## What changes were proposed in this pull request?
    
    The following query doesn't work as expected:
    ```
    CREATE EXTERNAL TABLE ext_table(a STRING, b INT, c STRING) PARTITIONED BY 
(d STRING)
    LOCATION 'sql/core/spark-warehouse/ext_table';
    ALTER TABLE ext_table CHANGE a a STRING COMMENT "new comment";
    DESC ext_table;
    ```
    The comment of column `a` is not updated because 
`HiveExternalCatalog.doAlterTable` ignores table schema changes. To fix the 
issue, we should call `doAlterTableDataSchema` instead of `doAlterTable`.
    
    ## How was this patch tested?
    
    Updated `DDLSuite.testChangeColumn`.
    
    Author: Xingbo Jiang <xingbo.ji...@databricks.com>
    
    Closes #20696 from jiangxb1987/alterColumnComment.
    
    (cherry picked from commit ac76eff6a88f6358a321b84cb5e60fb9d6403419)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>

commit ee6e79737cab6078dc94ddc8fd02433e8199104f
Author: Marcelo Vanzin <vanzin@...>
Date:   2018-03-08T01:06:25Z

    [SPARK-23020][CORE][BRANCH-2.3] Fix another race in the in-process launcher 
test.
    
    First the bad news: there's an unfixable race in the launcher code.
    (By unfixable I mean it would take a lot more effort than this change
    to fix it.) The good news is that it should only affect super short
    lived applications, such as the one run by the flaky test, so it's
    possible to work around it in our test.
    
    The fix also uncovered an issue with the recently added "closeAndWait()"
    method; closing the connection would still possibly cause data loss,
    so this change waits a while for the connection to finish itself, and
    closes the socket if that times out. The existing connection timeout
    is reused so that if desired it's possible to control how long to wait.
    
    As part of that I also restored the old behavior that disconnect() would
    force a disconnection from the child app; the "wait for data to arrive"
    approach is only taken when disposing of the handle.
    
    I tested this by inserting a bunch of sleeps in the test and the socket
    handling code in the launcher library; with those I was able to reproduce
    the error from the jenkins jobs. With the changes, even with all the
    sleeps still in place, all tests pass.
    
    Author: Marcelo Vanzin <van...@cloudera.com>
    
    Closes #20743 from vanzin/SPARK-23020.

commit 86ca91551522832141aedc17ba1e47dbeb44d970
Author: jx158167 <jx158167@...>
Date:   2018-03-08T04:08:32Z

    [SPARK-23524] Big local shuffle blocks should not be checked for corruption.
    
    ## What changes were proposed in this pull request?
    
    In the current code, all local blocks are checked for corruption regardless 
of their size. The reasons are as follows:
    
    1. The size in FetchResult for a local block is set to 0 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L327).
    2. SPARK-4105 meant to check only the small blocks (size < maxBytesInFlight/3), 
but because of reason 1 the check below never excludes a local block: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L420
    
    We can fix this and avoid the OOM.
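
A simplified Python model of the size check shows why a reported size of 0 defeats it. The function name and the byte values are assumptions for illustration; the real logic lives in `ShuffleBlockFetcherIterator` and may include other conditions.

```python
def should_check_for_corruption(reported_size, max_bytes_in_flight):
    """Only blocks smaller than a third of maxBytesInFlight are checked."""
    return reported_size < max_bytes_in_flight / 3


MAX_BYTES_IN_FLIGHT = 48 * 1024 * 1024   # assumed value for the example
BIG_LOCAL_BLOCK = 1024 * 1024 * 1024     # a 1 GiB local block

# Size reported as 0 for a local block: the big block is still checked.
checked_with_zero_size = should_check_for_corruption(0, MAX_BYTES_IN_FLIGHT)

# Real size reported: the big block is skipped, as SPARK-4105 intended.
checked_with_real_size = should_check_for_corruption(
    BIG_LOCAL_BLOCK, MAX_BYTES_IN_FLIGHT)
```

Under these assumptions, the first call returns `True` and the second returns `False`.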
    
    ## How was this patch tested?
    
    UT added
    
    Author: jx158167 <jx158...@antfin.com>
    
    Closes #20685 from jinxing64/SPARK-23524.
    
    (cherry picked from commit 77c91cc746f93e609c412f3a220495d9e931f696)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>

commit 1dd37ff3b8c84c60858b159e745339ce19e53432
Author: Wang Gengliang <gengliang.wang@...>
Date:   2018-03-08T05:53:26Z

    [SPARK-23490][BACKPORT][SQL] Check storage.locationUri with existing table 
in CreateTable
    
    Backport #20660 to branch 2.3
    =====================================
    ## What changes were proposed in this pull request?
    
    For CreateTable with Append mode, we should check in 
`PreprocessTableCreation` whether `storage.locationUri` is the same as that of 
the existing table.
    
    In the current code, there is only a generic exception if the 
`storage.locationUri` differs from that of the existing table:
    `org.apache.spark.sql.AnalysisException: Table or view not found:`
    
    which can be improved.
    
    ## How was this patch tested?
    
    Unit test
    
    Author: Wang Gengliang <gengliang.w...@databricks.com>
    
    Closes #20766 from gengliangwang/backport_20660_to_2.3.

commit 404f7e2013ecfdf993a17fd942d8890d9a8100e7
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-03-08T05:58:57Z

    [SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3
    
    This is a backport of #20598.
    
    ## What changes were proposed in this pull request?
    
    Solved two bugs to enable stream-stream self joins.
    
    ### Incorrect analysis due to missing MultiInstanceRelation trait
    Streaming leaf nodes did not extend MultiInstanceRelation, which is 
necessary for the catalyst analyzer to convert the self-join logical plan DAG 
into a tree (by creating new instances of the leaf relations). This was causing 
the error `Failure when resolving conflicting references in Join:` (see JIRA 
for details).
    
    ### Incorrect attribute rewrite when splicing batch plans in 
MicroBatchExecution
    When splicing the source's batch plan into the streaming plan (by replacing 
the StreamingExecutionPlan), we were rewriting the attribute reference in the 
streaming plan with the new attribute references from the batch plan. This was 
incorrectly handling the scenario when multiple StreamingExecutionRelation 
point to the same source, and therefore eventually point to the same batch plan 
returned by the source. Here is an example query, and its corresponding plan 
transformations.
    ```
    val df = input.toDF
    val join =
          df.select('value % 5 as "key", 'value).join(
            df.select('value % 5 as "key", 'value), "key")
    ```
    Streaming logical plan before splicing the batch plan
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- StreamingExecutionRelation Memory[#1], value#1
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- StreamingExecutionRelation Memory[#1], value#12  // two different 
leaves pointing to same source
    ```
    Batch logical plan after splicing the batch plan and before rewriting
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- LocalRelation [value#66]           // replaces 
StreamingExecutionRelation Memory[#1], value#1
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- LocalRelation [value#66]           // replaces 
StreamingExecutionRelation Memory[#1], value#12
    ```
    Batch logical plan after rewriting the attributes. Specifically, in the 
spliced plan, the new output attributes (value#66) replace the earlier output 
attributes (value#12, and value#1, one for each StreamingExecutionRelation).
    ```
    Project [key#6, value#66, value#66]       // both value#1 and value#12 
replaced by value#66
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#66 % 5) AS key#6, value#66]
       :  +- LocalRelation [value#66]
       +- Project [(value#66 % 5) AS key#9, value#66]
          +- LocalRelation [value#66]
    ```
    This causes the optimizer to eliminate value#66 from one side of the join.
    ```
    Project [key#6, value#66, value#66]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#66 % 5) AS key#6, value#66]
       :  +- LocalRelation [value#66]
       +- Project [(value#66 % 5) AS key#9]   // this does not generate value, 
incorrect join results
          +- LocalRelation [value#66]
    ```
    
    **Solution**: Instead of rewriting attributes, use a Project to introduce 
aliases between the output attribute references and the new reference generated 
by the spliced plans. The analyzer and optimizer will take care of the rest.
    ```
    Project [key#6, value#1, value#12]
    +- Join Inner, (key#6 = key#9)
       :- Project [(value#1 % 5) AS key#6, value#1]
       :  +- Project [value#66 AS value#1]   // solution: project with aliases
       :     +- LocalRelation [value#66]
       +- Project [(value#12 % 5) AS key#9, value#12]
          +- Project [value#66 AS value#12]    // solution: project with aliases
             +- LocalRelation [value#66]
    ```
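
The difference between the two approaches can be mimicked with plain strings in Python. This is only a toy model with hypothetical helper names; it does not use Catalyst classes.

```python
def rewrite_attributes(output, replacements):
    """Old approach: substitute attribute references in place."""
    return [replacements.get(attr, attr) for attr in output]


def alias_projections(replacements):
    """New approach: keep the old names and alias the new attribute to each."""
    return ["{} AS {}".format(new, old)
            for old, new in sorted(replacements.items())]


# Both streaming leaves point at the same source, hence the same new attribute.
replacements = {"value#1": "value#66", "value#12": "value#66"}

rewritten = rewrite_attributes(["key#6", "value#1", "value#12"], replacements)
aliased = alias_projections(replacements)
```

In this model, `rewritten` contains `value#66` twice, so the two join sides can no longer be told apart, while `aliased` keeps `value#1` and `value#12` as distinct names.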
    
    ## How was this patch tested?
    New unit test
    
    Author: Tathagata Das <tathagata.das1...@gmail.com>
    
    Closes #20765 from tdas/SPARK-23406-2.3.

commit 8ff8e16e2da4aa5457e4b1e5d575bd1a3a1f0358
Author: Marco Gaido <marcogaido91@...>
Date:   2018-03-09T12:41:23Z

    [SPARK-23436][SQL][BACKPORT-2.3] Infer partition as Date only if it can be 
casted to Date
    
    This PR is to backport https://github.com/apache/spark/pull/20621 to branch 
2.3
    
    ---
    ## What changes were proposed in this pull request?
    
    Before the patch, Spark could infer as Date a partition value which cannot 
be cast to Date (this can happen when there are extra characters after a 
valid date, like `2018-02-15AAA`).
    
    When this happens and the input format has metadata which defines the schema 
of the table, then `null` is returned as the value for the partition column, 
because the `cast` operator used in 
`PartitioningAwareFileIndex.inferPartitioning` is unable to convert the value.
    
    The PR checks during partition inference that values can be cast to Date 
and Timestamp before inferring those data types for them.
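
The idea of inferring Date only when the whole value converts can be sketched in Python. The helper name and the returned type labels are assumptions, and `strptime` stands in for Spark's own cast logic.

```python
from datetime import datetime


def infer_partition_type(raw_value):
    """Infer 'date' only if the entire value parses as a date."""
    try:
        # strptime rejects trailing characters such as the 'AAA' suffix.
        datetime.strptime(raw_value, "%Y-%m-%d")
    except ValueError:
        return "string"
    return "date"
```

Here `2018-02-15` is inferred as a date, while `2018-02-15AAA` falls back to a string instead of later turning into `null`.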
    
    ## How was this patch tested?
    
    added UT
    
    Author: Marco Gaido <marcogaid...@gmail.com>
    
    Closes #20764 from gatorsmile/backport23436.

commit bc5ce047658272eb48d744e8d069c6cccdf37682
Author: Marcelo Vanzin <vanzin@...>
Date:   2018-03-09T18:36:38Z

    [SPARK-23630][YARN] Allow user's hadoop conf customizations to take effect.
    
    This change restores functionality that was inadvertently removed as part
    of the fix for SPARK-22372.
    
    Also modified an existing unit test to make sure the feature works as 
intended.
    
    Author: Marcelo Vanzin <van...@cloudera.com>
    
    Closes #20776 from vanzin/SPARK-23630.
    
    (cherry picked from commit 2c3673680e16f88f1d1cd73a3f7445ded5b3daa8)
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

commit 3ec25d5a803888e5e24a47a511e9d88c423c5310
Author: Marco Gaido <marcogaido91@...>
Date:   2018-03-09T21:48:12Z

    [SPARK-23628][SQL][BACKPORT-2.3] calculateParamLength should not return 1 + 
num of expressions
    
    ## What changes were proposed in this pull request?
    
    Backport of ea480990e726aed59750f1cea8d40adba56d991a to branch 2.3.
    
    ## How was this patch tested?
    
    added UT
    
    cc cloud-fan hvanhovell
    
    Author: Marco Gaido <marcogaid...@gmail.com>
    
    Closes #20783 from mgaido91/SPARK-23628_2.3.

commit b083bd107d25bd3f7a4cdcf3aafa07b9895878b6
Author: Michał Świtakowski <michal.switakowski@...>
Date:   2018-03-09T22:29:31Z

    [SPARK-23173][SQL] Avoid creating corrupt parquet files when loading data 
from JSON
    
    ## What changes were proposed in this pull request?
    
    The from_json() function accepts an additional parameter, where the user 
might specify the schema. The issue is that the specified schema might not be 
compatible with data. In particular, the JSON data might be missing data for 
fields declared as non-nullable in the schema. The from_json() function does 
not verify the data against such errors. When data with missing fields is sent 
to the parquet encoder, there is no verification either. The end result is a 
corrupt parquet file.
    
    To avoid corruption, make sure that all fields in the user-specified
    schema are set to be nullable.
    Since this changes the behavior of a public function, it needs to be
    included in the release notes.
    The behavior can be reverted by setting
    `spark.sql.fromJsonForceNullableSchema=false`.
    
    ## How was this patch tested?
    
    Added two new tests.
    
    Author: Michał Świtakowski <michal.switakow...@databricks.com>
    
    Closes #20694 from mswit-databricks/SPARK-23173.
    
    (cherry picked from commit 2ca9bb083c515511d2bfee271fc3e0269aceb9d5)
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
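
The idea of forcing a user-specified schema to be nullable can be sketched in plain Python. The dict-based schema layout and the helper name `as_nullable` are assumptions made for this example and do not mirror Spark's internal classes; recursing into nested structs is likewise an assumption of the sketch.

```python
import copy


def as_nullable(schema):
    """Return a copy of a dict-based schema with every field marked nullable."""
    result = copy.deepcopy(schema)
    for field in result["fields"]:
        field["nullable"] = True
        # Relax nested struct fields as well (assumed behavior for this sketch).
        if isinstance(field["type"], dict) and "fields" in field["type"]:
            field["type"] = as_nullable(field["type"])
    return result


user_schema = {
    "fields": [
        {"name": "id", "type": "long", "nullable": False},
        {
            "name": "address",
            "nullable": False,
            "type": {
                "fields": [{"name": "city", "type": "string", "nullable": False}]
            },
        },
    ]
}

relaxed = as_nullable(user_schema)
print(relaxed["fields"][0]["nullable"])  # True
```

With every field nullable, a JSON record that lacks a declared field can be represented as a null instead of producing an invalid non-null slot.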

commit 5bd306c3896f7967243f7f3be30e4f095b51e0fe
Author: Wang Gengliang <gengliang.wang@...>
Date:   2018-03-09T23:41:19Z

    [SPARK-23624][SQL] Revise doc of method pushFilters in Datasource V2
    
    ## What changes were proposed in this pull request?
    
    Revise doc of method pushFilters in 
SupportsPushDownFilters/SupportsPushDownCatalystFilters
    
    In `FileSourceStrategy`, all filters except `partitionKeyFilters` (whose
    references are a subset of the partition keys) need to be evaluated after
    scanning. Otherwise, Spark will get wrong results from data sources like
    ORC/Parquet.
    
    This PR is to improve the doc.
    
    Author: Wang Gengliang <gengliang.w...@databricks.com>
    
    Closes #20769 from gengliangwang/revise_pushdown_doc.
    
    (cherry picked from commit 10b0657b035641ce735055bba2c8459e71bc2400)
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
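
To see why pushed filters may still need to be evaluated after scanning, consider this plain-Python sketch. It assumes a source that uses a pushed filter only to skip whole chunks of rows based on min/max statistics; the chunk layout and the function name `scan_with_pushdown` are hypothetical and do not describe any specific data source's API.

```python
def scan_with_pushdown(row_groups, lower_bound):
    """Skip chunks whose max is below lower_bound; return ALL rows of the rest."""
    scanned = []
    for group in row_groups:
        if group["max"] >= lower_bound:
            scanned.extend(group["rows"])
    return scanned


row_groups = [
    {"min": 1, "max": 3, "rows": [1, 2, 3]},  # skipped entirely for value >= 4
    {"min": 2, "max": 9, "rows": [2, 5, 9]},  # kept, but contains a non-match
]

scanned = scan_with_pushdown(row_groups, lower_bound=4)
# Re-applying the filter after the scan removes rows the coarse skip let through.
filtered = [value for value in scanned if value >= 4]

print(scanned)   # [2, 5, 9]
print(filtered)  # [5, 9]
```

Under this assumption the pushed filter is a best-effort optimization, so dropping the post-scan evaluation would return the row `2` even though it does not satisfy the predicate.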

commit 265e61ee96d7dae41711df10b21c571c79f003b7
Author: DylanGuedes <djmgguedes@...>
Date:   2018-03-10T10:48:29Z

    [PYTHON] Changes input variable to not conflict with built-in function
    
    Signed-off-by: DylanGuedes <djmgguedesgmail.com>
    
    ## What changes were proposed in this pull request?
    
    Renames a variable to avoid a name conflict: [`input` is a built-in Python 
function](https://stackoverflow.com/questions/20670732/is-input-a-keyword-in-python).
    
    ## How was this patch tested?
    
    I ran the example and it works fine.
    
    Author: DylanGuedes <djmggue...@gmail.com>
    
    Closes #20775 from DylanGuedes/input_variable.
    
    (cherry picked from commit b6f837c9d3cb0f76f0a52df37e34aea8944f6867)
    Signed-off-by: hyukjinkwon <gurwls...@gmail.com>
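
The conflict this commit avoids can be shown with a short Python example. The function and variable names below are made up for illustration and are not taken from the changed file.

```python
import builtins


def shadowed():
    # Binding a local named `input` hides the built-in within this scope.
    input = [1, 2, 3]
    return callable(input)


def renamed():
    # With a different variable name, `input` still refers to the built-in.
    data = [1, 2, 3]
    return len(data) == 3 and callable(input)


print(shadowed())  # False
print(renamed())   # True
```

Shadowing is legal Python, but it makes the built-in unreachable by name in that scope and tends to trigger linter warnings, which is why a rename is the simpler choice.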

commit a8e357ada8a52b9ec239f41de7ac0c0225c76050
Author: Xiayun Sun <xiayunsun@...>
Date:   2018-03-12T13:13:28Z

    [SPARK-23462][SQL] improve missing field error message in `StructType`
    
    ## What changes were proposed in this pull request?
    
    The error message ```s"""Field "$name" does not exist."""``` is raised when
    looking up an unknown field in `StructType`. The message should also list
    the columns/fields that exist in this struct.
    
    ## How was this patch tested?
    
    Added new unit tests.
    
    Note: I created a new `StructTypeSuite.scala` as I couldn't find an 
existing suite that's suitable to place these tests. I may be missing something 
so feel free to propose new locations.
    
    Author: Xiayun Sun <xiayun...@gmail.com>
    
    Closes #20649 from xysun/SPARK-23462.
    
    (cherry picked from commit b304e07e0671faf96530f9d8f49c55a83b07fa15)
    Signed-off-by: hyukjinkwon <gurwls...@gmail.com>
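
A minimal plain-Python sketch of the improved message follows. The class `ToyStruct` is a stand-in invented for this example, and the exact wording after "does not exist." is an assumption rather than the text Spark emits.

```python
class ToyStruct:
    """Toy stand-in for a struct type; not Spark's StructType."""

    def __init__(self, names):
        self._names = list(names)

    def field_index(self, name):
        try:
            return self._names.index(name)
        except ValueError:
            available = ", ".join(self._names)
            # Include the known fields so the caller can spot a typo quickly.
            raise LookupError(
                'Field "{}" does not exist. Available fields: {}'.format(
                    name, available
                )
            ) from None


struct = ToyStruct(["a", "b"])
try:
    struct.field_index("c")
except LookupError as err:
    message = str(err)
    print(message)
```

Listing the available fields turns a dead-end error into one that can usually be fixed without opening the schema definition.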

commit 33ba8db8d8c8bf388606a6f5e34b082469038205
Author: Xingbo Jiang <xingbo.jiang@...>
Date:   2018-03-13T04:47:38Z

    [SPARK-23523][SQL][BACKPORT-2.3] Fix the incorrect result caused by the 
rule OptimizeMetadataOnlyQuery
    
    This PR backports https://github.com/apache/spark/pull/20684 and 
https://github.com/apache/spark/pull/20693 to the Spark 2.3 branch.
    
    ---
    
    ## What changes were proposed in this pull request?
    ```Scala
    val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
    Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
      .write.json(tablePath.getCanonicalPath)
    val df = spark.read.json(path.getCanonicalPath)
      .select("CoL1", "CoL5", "CoL3").distinct()
    df.show()
    ```
    
    It generates a wrong result.
    ```
    [c,e,a]
    ```
    
    We have a bug in the rule `OptimizeMetadataOnlyQuery`. We should respect
    the attribute order in the original leaf node. This PR is to fix it.
    
    ## How was this patch tested?
    Added a test case
    
    Author: Xingbo Jiang <xingbo.ji...@databricks.com>
    Author: gatorsmile <gatorsm...@gmail.com>
    
    Closes #20763 from gatorsmile/backport23523.
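
The ordering problem can be sketched in plain Python. This is not the optimizer rule itself: the helper `project_in_leaf_order` is hypothetical, and it assumes the select list from the example above is the attribute order to respect and that names are matched case-insensitively.

```python
def project_in_leaf_order(leaf_output, partition_values):
    """Order partition values like leaf_output, matching names case-insensitively."""
    by_name = {name.lower(): value for name, value in partition_values.items()}
    return [by_name[name.lower()] for name in leaf_output]


# Partition values in directory order, as in the example path cOl3=c/cOl1=a/cOl5=e.
partition_values = {"cOl3": "c", "cOl1": "a", "cOl5": "e"}
leaf_output = ["CoL1", "CoL5", "CoL3"]

row = project_in_leaf_order(leaf_output, partition_values)
print(row)  # ['a', 'e', 'c']
```

Resolving each value by attribute name instead of by storage position is what keeps the output aligned with the requested columns.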

commit f3efbfa4b973cdb8cf992e30540609d0006e0cfe
Author: Kazuaki Ishizaki <ishizaki@...>
Date:   2018-03-13T22:04:16Z

    [SPARK-23598][SQL] Make methods in BufferedRowIterator public to avoid 
runtime error for a large query
    
    ## What changes were proposed in this pull request?
    
    This PR fixes a runtime error for a large query whose generated code is
    split into multiple classes. The issue is that `append()`, `stopEarly()`,
    and other methods are not accessible from split classes that are not
    subclasses of `BufferedRowIterator`.
    This PR fixes the issue by making them `public`.
    
    Before applying the PR, running the following program with
    `CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD=-1` throws the exception
    shown after it.
    ```
      test("SPARK-23598") {
        // An exception is thrown when
        // CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD is set to -1
        val df_pet_age = Seq((8, "bat"), (15, "mouse"), (5, "horse"))
          .toDF("age", "name")
        df_pet_age.groupBy("name").avg("age").show()
      }
    ```
    
    Exception:
    ```
    19:40:52.591 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    19:41:32.319 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.shouldStop()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(generated.java:203)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:160)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    ...
    ```
    
    Generated code (line 195 calls `stopEarly()`).
    ```
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ // codegenStageId=1
    /* 006 */ final class GeneratedIteratorForCodegenStage1 extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */   private Object[] references;
    /* 008 */   private scala.collection.Iterator[] inputs;
    /* 009 */   private boolean agg_initAgg;
    /* 010 */   private boolean agg_bufIsNull;
    /* 011 */   private double agg_bufValue;
    /* 012 */   private boolean agg_bufIsNull1;
    /* 013 */   private long agg_bufValue1;
    /* 014 */   private agg_FastHashMap agg_fastHashMap;
    /* 015 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow, 
UnsafeRow> agg_fastHashMapIter;
    /* 016 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
    /* 017 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
    /* 018 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
    /* 019 */   private scala.collection.Iterator inputadapter_input;
    /* 020 */   private boolean agg_agg_isNull11;
    /* 021 */   private boolean agg_agg_isNull25;
    /* 022 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] 
agg_mutableStateArray1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2];
    /* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] 
agg_mutableStateArray2 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
    /* 024 */   private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2];
    /* 025 */
    /* 026 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
    /* 027 */     this.references = references;
    /* 028 */   }
    /* 029 */
    /* 030 */   public void init(int index, scala.collection.Iterator[] inputs) 
{
    /* 031 */     partitionIndex = index;
    /* 032 */     this.inputs = inputs;
    /* 033 */
    /* 034 */     agg_fastHashMap = new 
agg_FastHashMap(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) 
references[0] /* plan */).getTaskMemoryManager(), 
((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* 
plan */).getEmptyAggregationBuffer());
    /* 035 */     agg_hashMap = 
((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* 
plan */).createHashMap();
    /* 036 */     inputadapter_input = inputs[0];
    /* 037 */     agg_mutableStateArray[0] = new UnsafeRow(1);
    /* 038 */     agg_mutableStateArray1[0] = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[0],
 32);
    /* 039 */     agg_mutableStateArray2[0] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[0],
 1);
    /* 040 */     agg_mutableStateArray[1] = new UnsafeRow(3);
    /* 041 */     agg_mutableStateArray1[1] = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[1],
 32);
    /* 042 */     agg_mutableStateArray2[1] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[1],
 3);
    /* 043 */
    /* 044 */   }
    /* 045 */
    /* 046 */   public class agg_FastHashMap {
    /* 047 */     private 
org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
    /* 048 */     private int[] buckets;
    /* 049 */     private int capacity = 1 << 16;
    /* 050 */     private double loadFactor = 0.5;
    /* 051 */     private int numBuckets = (int) (capacity / loadFactor);
    /* 052 */     private int maxSteps = 2;
    /* 053 */     private int numRows = 0;
    /* 054 */     private org.apache.spark.sql.types.StructType keySchema = new 
org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1] 
/* keyName */), org.apache.spark.sql.types.DataTypes.StringType);
    /* 055 */     private org.apache.spark.sql.types.StructType valueSchema = 
new org.apache.spark.sql.types.StructType().add(((java.lang.String) 
references[2] /* keyName */), org.apache.spark.sql.types.DataTypes.DoubleType)
    /* 056 */     .add(((java.lang.String) references[3] /* keyName */), 
org.apache.spark.sql.types.DataTypes.LongType);
    /* 057 */     private Object emptyVBase;
    /* 058 */     private long emptyVOff;
    /* 059 */     private int emptyVLen;
    /* 060 */     private boolean isBatchFull = false;
    /* 061 */
    /* 062 */     public agg_FastHashMap(
    /* 063 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
    /* 064 */       InternalRow emptyAggregationBuffer) {
    /* 065 */       batch = 
org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
    /* 066 */       .allocate(keySchema, valueSchema, taskMemoryManager, 
capacity);
    /* 067 */
    /* 068 */       final UnsafeProjection valueProjection = 
UnsafeProjection.create(valueSchema);
    /* 069 */       final byte[] emptyBuffer = 
valueProjection.apply(emptyAggregationBuffer).getBytes();
    /* 070 */
    /* 071 */       emptyVBase = emptyBuffer;
    /* 072 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
    /* 073 */       emptyVLen = emptyBuffer.length;
    /* 074 */
    /* 075 */       buckets = new int[numBuckets];
    /* 076 */       java.util.Arrays.fill(buckets, -1);
    /* 077 */     }
    /* 078 */
    /* 079 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow 
findOrInsert(UTF8String agg_key) {
    /* 080 */       long h = hash(agg_key);
    /* 081 */       int step = 0;
    /* 082 */       int idx = (int) h & (numBuckets - 1);
    /* 083 */       while (step < maxSteps) {
    /* 084 */         // Return bucket index if it's either an empty slot or 
already contains the key
    /* 085 */         if (buckets[idx] == -1) {
    /* 086 */           if (numRows < capacity && !isBatchFull) {
    /* 087 */             // creating the unsafe for new entry
    /* 088 */             UnsafeRow agg_result = new UnsafeRow(1);
    /* 089 */             
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder
    /* 090 */             = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result,
    /* 091 */               32);
    /* 092 */             
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
    /* 093 */             = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
    /* 094 */               agg_holder,
    /* 095 */               1);
    /* 096 */             agg_holder.reset(); //TODO: investigate if reset or 
zeroout are actually needed
    /* 097 */             agg_rowWriter.zeroOutNullBytes();
    /* 098 */             agg_rowWriter.write(0, agg_key);
    /* 099 */             agg_result.setTotalSize(agg_holder.totalSize());
    /* 100 */             Object kbase = agg_result.getBaseObject();
    /* 101 */             long koff = agg_result.getBaseOffset();
    /* 102 */             int klen = agg_result.getSizeInBytes();
    /* 103 */
    /* 104 */             UnsafeRow vRow
    /* 105 */             = batch.appendRow(kbase, koff, klen, emptyVBase, 
emptyVOff, emptyVLen);
    /* 106 */             if (vRow == null) {
    /* 107 */               isBatchFull = true;
    /* 108 */             } else {
    /* 109 */               buckets[idx] = numRows++;
    /* 110 */             }
    /* 111 */             return vRow;
    /* 112 */           } else {
    /* 113 */             // No more space
    /* 114 */             return null;
    /* 115 */           }
    /* 116 */         } else if (equals(idx, agg_key)) {
    /* 117 */           return batch.getValueRow(buckets[idx]);
    /* 118 */         }
    /* 119 */         idx = (idx + 1) & (numBuckets - 1);
    /* 120 */         step++;
    /* 121 */       }
    /* 122 */       // Didn't find it
    /* 123 */       return null;
    /* 124 */     }
    /* 125 */
    /* 126 */     private boolean equals(int idx, UTF8String agg_key) {
    /* 127 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
    /* 128 */       return (row.getUTF8String(0).equals(agg_key));
    /* 129 */     }
    /* 130 */
    /* 131 */     private long hash(UTF8String agg_key) {
    /* 132 */       long agg_hash = 0;
    /* 133 */
    /* 134 */       int agg_result = 0;
    /* 135 */       byte[] agg_bytes = agg_key.getBytes();
    /* 136 */       for (int i = 0; i < agg_bytes.length; i++) {
    /* 137 */         int agg_hash1 = agg_bytes[i];
    /* 138 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + 
(agg_result << 6) + (agg_result >>> 2);
    /* 139 */       }
    /* 140 */
    /* 141 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + 
(agg_hash << 6) + (agg_hash >>> 2);
    /* 142 */
    /* 143 */       return agg_hash;
    /* 144 */     }
    /* 145 */
    /* 146 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, 
UnsafeRow> rowIterator() {
    /* 147 */       return batch.rowIterator();
    /* 148 */     }
    /* 149 */
    /* 150 */     public void close() {
    /* 151 */       batch.close();
    /* 152 */     }
    /* 153 */
    /* 154 */   }
    /* 155 */
    /* 156 */   protected void processNext() throws java.io.IOException {
    /* 157 */     if (!agg_initAgg) {
    /* 158 */       agg_initAgg = true;
    /* 159 */       long wholestagecodegen_beforeAgg = System.nanoTime();
    /* 160 */       agg_nestedClassInstance1.agg_doAggregateWithKeys();
    /* 161 */       ((org.apache.spark.sql.execution.metric.SQLMetric) 
references[8] /* aggTime */).add((System.nanoTime() - 
wholestagecodegen_beforeAgg) / 1000000);
    /* 162 */     }
    /* 163 */
    /* 164 */     // output the result
    /* 165 */
    /* 166 */     while (agg_fastHashMapIter.next()) {
    /* 167 */       UnsafeRow agg_aggKey = (UnsafeRow) 
agg_fastHashMapIter.getKey();
    /* 168 */       UnsafeRow agg_aggBuffer = (UnsafeRow) 
agg_fastHashMapIter.getValue();
    /* 169 */       
wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey, 
agg_aggBuffer);
    /* 170 */
    /* 171 */       if (shouldStop()) return;
    /* 172 */     }
    /* 173 */     agg_fastHashMap.close();
    /* 174 */
    /* 175 */     while (agg_mapIter.next()) {
    /* 176 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
    /* 177 */       UnsafeRow agg_aggBuffer = (UnsafeRow) 
agg_mapIter.getValue();
    /* 178 */       
wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey, 
agg_aggBuffer);
    /* 179 */
    /* 180 */       if (shouldStop()) return;
    /* 181 */     }
    /* 182 */
    /* 183 */     agg_mapIter.close();
    /* 184 */     if (agg_sorter == null) {
    /* 185 */       agg_hashMap.free();
    /* 186 */     }
    /* 187 */   }
    /* 188 */
    /* 189 */   private wholestagecodegen_NestedClass 
wholestagecodegen_nestedClassInstance = new wholestagecodegen_NestedClass();
    /* 190 */   private agg_NestedClass1 agg_nestedClassInstance1 = new 
agg_NestedClass1();
    /* 191 */   private agg_NestedClass agg_nestedClassInstance = new 
agg_NestedClass();
    /* 192 */
    /* 193 */   private class agg_NestedClass1 {
    /* 194 */     private void agg_doAggregateWithKeys() throws 
java.io.IOException {
    /* 195 */       while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 196 */         InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 197 */         int inputadapter_value = inputadapter_row.getInt(0);
    /* 198 */         boolean inputadapter_isNull1 = 
inputadapter_row.isNullAt(1);
    /* 199 */         UTF8String inputadapter_value1 = inputadapter_isNull1 ?
    /* 200 */         null : (inputadapter_row.getUTF8String(1));
    /* 201 */
    /* 202 */         agg_nestedClassInstance.agg_doConsume(inputadapter_row, 
inputadapter_value, inputadapter_value1, inputadapter_isNull1);
    /* 203 */         if (shouldStop()) return;
    /* 204 */       }
    /* 205 */
    /* 206 */       agg_fastHashMapIter = agg_fastHashMap.rowIterator();
    /* 207 */       agg_mapIter = 
((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* 
plan */).finishAggregate(agg_hashMap, agg_sorter, 
((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* peakMemory 
*/), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* 
spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] 
/* avgHashProbe */));
    /* 208 */
    /* 209 */     }
    /* 210 */
    /* 211 */   }
    /* 212 */
    /* 213 */   private class wholestagecodegen_NestedClass {
    /* 214 */     private void agg_doAggregateWithKeysOutput(UnsafeRow 
agg_keyTerm, UnsafeRow agg_bufferTerm)
    /* 215 */     throws java.io.IOException {
    /* 216 */       ((org.apache.spark.sql.execution.metric.SQLMetric) 
references[7] /* numOutputRows */).add(1);
    /* 217 */
    /* 218 */       boolean agg_isNull35 = agg_keyTerm.isNullAt(0);
    /* 219 */       UTF8String agg_value37 = agg_isNull35 ?
    /* 220 */       null : (agg_keyTerm.getUTF8String(0));
    /* 221 */       boolean agg_isNull36 = agg_bufferTerm.isNullAt(0);
    /* 222 */       double agg_value38 = agg_isNull36 ?
    /* 223 */       -1.0 : (agg_bufferTerm.getDouble(0));
    /* 224 */       boolean agg_isNull37 = agg_bufferTerm.isNullAt(1);
    /* 225 */       long agg_value39 = agg_isNull37 ?
    /* 226 */       -1L : (agg_bufferTerm.getLong(1));
    /* 227 */
    /* 228 */       agg_mutableStateArray1[1].reset();
    /* 229 */
    /* 230 */       agg_mutableStateArray2[1].zeroOutNullBytes();
    /* 231 */
    /* 232 */       if (agg_isNull35) {
    /* 233 */         agg_mutableStateArray2[1].setNullAt(0);
    /* 234 */       } else {
    /* 235 */         agg_mutableStateArray2[1].write(0, agg_value37);
    /* 236 */       }
    /* 237 */
    /* 238 */       if (agg_isNull36) {
    /* 239 */         agg_mutableStateArray2[1].setNullAt(1);
    /* 240 */       } else {
    /* 241 */         agg_mutableStateArray2[1].write(1, agg_value38);
    /* 242 */       }
    /* 243 */
    /* 244 */       if (agg_isNull37) {
    /* 245 */         agg_mutableStateArray2[1].setNullAt(2);
    /* 246 */       } else {
    /* 247 */         agg_mutableStateArray2[1].write(2, agg_value39);
    /* 248 */       }
    /* 249 */       
agg_mutableStateArray[1].setTotalSize(agg_mutableStateArray1[1].totalSize());
    /* 250 */       append(agg_mutableStateArray[1]);
    /* 251 */
    /* 252 */     }
    /* 253 */
    /* 254 */   }
    /* 255 */
    /* 256 */   private class agg_NestedClass {
    /* 257 */     private void agg_doConsume(InternalRow inputadapter_row, int 
agg_expr_0, UTF8String agg_expr_1, boolean agg_exprIsNull_1) throws 
java.io.IOException {
    /* 258 */       UnsafeRow agg_unsafeRowAggBuffer = null;
    /* 259 */       UnsafeRow agg_fastAggBuffer = null;
    /* 260 */
    /* 261 */       if (true) {
    /* 262 */         if (!agg_exprIsNull_1) {
    /* 263 */           agg_fastAggBuffer = agg_fastHashMap.findOrInsert(
    /* 264 */             agg_expr_1);
    /* 265 */         }
    /* 266 */       }
    /* 267 */       // Cannot find the key in fast hash map, try regular hash 
map.
    /* 268 */       if (agg_fastAggBuffer == null) {
    /* 269 */         // generate grouping key
    /* 270 */         agg_mutableStateArray1[0].reset();
    /* 271 */
    /* 272 */         agg_mutableStateArray2[0].zeroOutNullBytes();
    /* 273 */
    /* 274 */         if (agg_exprIsNull_1) {
    /* 275 */           agg_mutableStateArray2[0].setNullAt(0);
    /* 276 */         } else {
    /* 277 */           agg_mutableStateArray2[0].write(0, agg_expr_1);
    /* 278 */         }
    /* 279 */         
agg_mutableStateArray[0].setTotalSize(agg_mutableStateArray1[0].totalSize());
    /* 280 */         int agg_value7 = 42;
    /* 281 */
    /* 282 */         if (!agg_exprIsNull_1) {
    /* 283 */           agg_value7 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_1.getBaseObject(),
 agg_expr_1.getBaseOffset(), agg_expr_1.numBytes(), agg_value7);
    /* 284 */         }
    /* 285 */         if (true) {
    /* 286 */           // try to get the buffer from hash map
    /* 287 */           agg_unsafeRowAggBuffer =
    /* 288 */           
agg_hashMap.getAggregationBufferFromUnsafeRow(agg_mutableStateArray[0], 
agg_value7);
    /* 289 */         }
    /* 290 */         // Can't allocate buffer from the hash map. Spill the map 
and fallback to sort-based
    /* 291 */         // aggregation after processing all input rows.
    /* 292 */         if (agg_unsafeRowAggBuffer == null) {
    /* 293 */           if (agg_sorter == null) {
    /* 294 */             agg_sorter = 
agg_hashMap.destructAndCreateExternalSorter();
    /* 295 */           } else {
    /* 296 */             
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
    /* 297 */           }
    /* 298 */
    /* 299 */           // the hash map had be spilled, it should have enough 
memory now,
    /* 300 */           // try to allocate buffer again.
    /* 301 */           agg_unsafeRowAggBuffer = 
agg_hashMap.getAggregationBufferFromUnsafeRow(
    /* 302 */             agg_mutableStateArray[0], agg_value7);
    /* 303 */           if (agg_unsafeRowAggBuffer == null) {
    /* 304 */             // failed to allocate the first page
    /* 305 */             throw new OutOfMemoryError("No enough memory for 
aggregation");
    /* 306 */           }
    /* 307 */         }
    /* 308 */
    /* 309 */       }
    /* 310 */
    /* 311 */       if (agg_fastAggBuffer != null) {
    /* 312 */         // common sub-expressions
    /* 313 */         boolean agg_isNull21 = false;
    /* 314 */         long agg_value23 = -1L;
    /* 315 */         if (!false) {
    /* 316 */           agg_value23 = (long) agg_expr_0;
    /* 317 */         }
    /* 318 */         // evaluate aggregate function
    /* 319 */         boolean agg_isNull23 = true;
    /* 320 */         double agg_value25 = -1.0;
    /* 321 */
    /* 322 */         boolean agg_isNull24 = agg_fastAggBuffer.isNullAt(0);
    /* 323 */         double agg_value26 = agg_isNull24 ?
    /* 324 */         -1.0 : (agg_fastAggBuffer.getDouble(0));
    /* 325 */         if (!agg_isNull24) {
    /* 326 */           agg_agg_isNull25 = true;
    /* 327 */           double agg_value27 = -1.0;
    /* 328 */           do {
    /* 329 */             boolean agg_isNull26 = agg_isNull21;
    /* 330 */             double agg_value28 = -1.0;
    /* 331 */             if (!agg_isNull21) {
    /* 332 */               agg_value28 = (double) agg_value23;
    /* 333 */             }
    /* 334 */             if (!agg_isNull26) {
    /* 335 */               agg_agg_isNull25 = false;
    /* 336 */               agg_value27 = agg_value28;
    /* 337 */               continue;
    /* 338 */             }
    /* 339 */
    /* 340 */             boolean agg_isNull27 = false;
    /* 341 */             double agg_value29 = -1.0;
    /* 342 */             if (!false) {
    /* 343 */               agg_value29 = (double) 0;
    /* 344 */             }
    /* 345 */             if (!agg_isNull27) {
    /* 346 */               agg_agg_isNull25 = false;
    /* 347 */               agg_value27 = agg_value29;
    /* 348 */               continue;
    /* 349 */             }
    /* 350 */
    /* 351 */           } while (false);
    /* 352 */
    /* 353 */           agg_isNull23 = false; // resultCode could change 
nullability.
    /* 354 */           agg_value25 = agg_value26 + agg_value27;
    /* 355 */
    /* 356 */         }
    /* 357 */         boolean agg_isNull29 = false;
    /* 358 */         long agg_value31 = -1L;
    /* 359 */         if (!false && agg_isNull21) {
    /* 360 */           boolean agg_isNull31 = agg_fastAggBuffer.isNullAt(1);
    /* 361 */           long agg_value33 = agg_isNull31 ?
    /* 362 */           -1L : (agg_fastAggBuffer.getLong(1));
    /* 363 */           agg_isNull29 = agg_isNull31;
    /* 364 */           agg_value31 = agg_value33;
    /* 365 */         } else {
    /* 366 */           boolean agg_isNull32 = true;
    /* 367 */           long agg_value34 = -1L;
    /* 368 */
    /* 369 */           boolean agg_isNull33 = agg_fastAggBuffer.isNullAt(1);
    /* 370 */           long agg_value35 = agg_isNull33 ?
    /* 371 */           -1L : (agg_fastAggBuffer.getLong(1));
    /* 372 */           if (!agg_isNull33) {
    /* 373 */             agg_isNull32 = false; // resultCode could change 
nullability.
    /* 374 */             agg_value34 = agg_value35 + 1L;
    /* 375 */
    /* 376 */           }
    /* 377 */           agg_isNull29 = agg_isNull32;
    /* 378 */           agg_value31 = agg_value34;
    /* 379 */         }
    /* 380 */         // update fast row
    /* 381 */         if (!agg_isNull23) {
    /* 382 */           agg_fastAggBuffer.setDouble(0, agg_value25);
    /* 383 */         } else {
    /* 384 */           agg_fastAggBuffer.setNullAt(0);
    /* 385 */         }
    /* 386 */
    /* 387 */         if (!agg_isNull29) {
    /* 388 */           agg_fastAggBuffer.setLong(1, agg_value31);
    /* 389 */         } else {
    /* 390 */           agg_fastAggBuffer.setNullAt(1);
    /* 391 */         }
    /* 392 */       } else {
    /* 393 */         // common sub-expressions
    /* 394 */         boolean agg_isNull7 = false;
    /* 395 */         long agg_value9 = -1L;
    /* 396 */         if (!false) {
    /* 397 */           agg_value9 = (long) agg_expr_0;
    /* 398 */         }
    /* 399 */         // evaluate aggregate function
    /* 400 */         boolean agg_isNull9 = true;
    /* 401 */         double agg_value11 = -1.0;
    /* 402 */
    /* 403 */         boolean agg_isNull10 = agg_unsafeRowAggBuffer.isNullAt(0);
    /* 404 */         double agg_value12 = agg_isNull10 ?
    /* 405 */         -1.0 : (agg_unsafeRowAggBuffer.getDouble(0));
    /* 406 */         if (!agg_isNull10) {
    /* 407 */           agg_agg_isNull11 = true;
    /* 408 */           double agg_value13 = -1.0;
    /* 409 */           do {
    /* 410 */             boolean agg_isNull12 = agg_isNull7;
    /* 411 */             double agg_value14 = -1.0;
    /* 412 */             if (!agg_isNull7) {
    /* 413 */               agg_value14 = (double) agg_value9;
    /* 414 */             }
    /* 415 */             if (!agg_isNull12) {
    /* 416 */               agg_agg_isNull11 = false;
    /* 417 */               agg_value13 = agg_value14;
    /* 418 */               continue;
    /* 419 */             }
    /* 420 */
    /* 421 */             boolean agg_isNull13 = false;
    /* 422 */             double agg_value15 = -1.0;
    /* 423 */             if (!false) {
    /* 424 */               agg_value15 = (double) 0;
    /* 425 */             }
    /* 426 */             if (!agg_isNull13) {
    /* 427 */               agg_agg_isNull11 = false;
    /* 428 */               agg_value13 = agg_value15;
    /* 429 */               continue;
    /* 430 */             }
    /* 431 */
    /* 432 */           } while (false);
    /* 433 */
    /* 434 */           agg_isNull9 = false; // resultCode could change 
nullability.
    /* 435 */           agg_value11 = agg_value12 + agg_value13;
    /* 436 */
    /* 437 */         }
    /* 438 */         boolean agg_isNull15 = false;
    /* 439 */         long agg_value17 = -1L;
    /* 440 */         if (!false && agg_isNull7) {
    /* 441 */           boolean agg_isNull17 = 
agg_unsafeRowAggBuffer.isNullAt(1);
    /* 442 */           long agg_value19 = agg_isNull17 ?
    /* 443 */           -1L : (agg_unsafeRowAggBuffer.getLong(1));
    /* 444 */           agg_isNull15 = agg_isNull17;
    /* 445 */           agg_value17 = agg_value19;
    /* 446 */         } else {
    /* 447 */           boolean agg_isNull18 = true;
    /* 448 */           long agg_value20 = -1L;
    /* 449 */
    /* 450 */           boolean agg_isNull19 = 
agg_unsafeRowAggBuffer.isNullAt(1);
    /* 451 */           long agg_value21 = agg_isNull19 ?
    /* 452 */           -1L : (agg_unsafeRowAggBuffer.getLong(1));
    /* 453 */           if (!agg_isNull19) {
    /* 454 */             agg_isNull18 = false; // resultCode could change nullability.
    /* 455 */             agg_value20 = agg_value21 + 1L;
    /* 456 */
    /* 457 */           }
    /* 458 */           agg_isNull15 = agg_isNull18;
    /* 459 */           agg_value17 = agg_value20;
    /* 460 */         }
    /* 461 */         // update unsafe row buffer
    /* 462 */         if (!agg_isNull9) {
    /* 463 */           agg_unsafeRowAggBuffer.setDouble(0, agg_value11);
    /* 464 */         } else {
    /* 465 */           agg_unsafeRowAggBuffer.setNullAt(0);
    /* 466 */         }
    /* 467 */
    /* 468 */         if (!agg_isNull15) {
    /* 469 */           agg_unsafeRowAggBuffer.setLong(1, agg_value17);
    /* 470 */         } else {
    /* 471 */           agg_unsafeRowAggBuffer.setNullAt(1);
    /* 472 */         }
    /* 473 */
    /* 474 */       }
    /* 475 */
    /* 476 */     }
    /* 477 */
    /* 478 */   }
    /* 479 */
    /* 480 */ }
    ```
    
    ## How was this patch tested?
    
    Added UT into `WholeStageCodegenSuite`
    
    Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
    
    Closes #20779 from kiszk/SPARK-23598.
    
    (cherry picked from commit 1098933b0ac5cdb18101d3aebefa773c2ce05a50)
    Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>

commit 0663b61193b37094b9d00c7f2cbb0268ad946e25
Author: “attilapiros” <piros.attila.zsolt@...>
Date:   2018-03-15T01:36:31Z

    [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z
    
    # What changes were proposed in this pull request?
    
    Adds structured streaming tests using testTransformer for these suites:
    
    - NGramSuite
    - NormalizerSuite
    - OneHotEncoderEstimatorSuite
    - OneHotEncoderSuite
    - PCASuite
    - PolynomialExpansionSuite
    - QuantileDiscretizerSuite
    - RFormulaSuite
    - SQLTransformerSuite
    - StandardScalerSuite
    - StopWordsRemoverSuite
    - StringIndexerSuite
    - TokenizerSuite
    - RegexTokenizerSuite
    - VectorAssemblerSuite
    - VectorIndexerSuite
    - VectorSizeHintSuite
    - VectorSlicerSuite
    - Word2VecSuite
    
    # How was this patch tested?
    
    They are unit tests.
    
    Author: “attilapiros” <piros.attila.zs...@gmail.com>
    
    Closes #20686 from attilapiros/SPARK-22915.
    
    (cherry picked from commit 279b3db8970809104c30941254e57e3d62da5041)
    Signed-off-by: Joseph K. Bradley <jos...@databricks.com>

commit a9d0784e6733666a0608e8236322f1dc380e96b7
Author: smallory <s.mallory@...>
Date:   2018-03-15T02:58:54Z

    [SPARK-23642][DOCS] AccumulatorV2 subclass isZero scaladoc fix
    
    Added/corrected scaladoc for isZero on the DoubleAccumulator,
    CollectionAccumulator, and LongAccumulator subclasses of AccumulatorV2,
    particularly noting where there are requirements in addition to having a value
    of zero in order to return true.
    
    ## What changes were proposed in this pull request?
    
    Three scaladoc comments are updated in AccumulatorV2.scala
    No changes outside of comment blocks were made.
    
    ## How was this patch tested?
    
    Ran "sbt unidoc", fixed the style errors it found, and reviewed the
    resulting local scaladoc in Firefox.
    
    Author: smallory <s.mall...@gmail.com>
    
    Closes #20790 from smallory/patch-1.
    
    (cherry picked from commit 4f5bad615b47d743b8932aea1071652293981604)
    Signed-off-by: hyukjinkwon <gurwls...@gmail.com>
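
The scaladoc fix above (SPARK-23642) notes that `isZero` can require more than a value of zero. The following is a minimal Python sketch of that idea, not the Spark implementation: `LongAccumulatorModel` is a hypothetical stand-in that assumes the accumulator tracks both a running sum and an element count, and treats itself as zero only when both are zero.

```python
class LongAccumulatorModel:
    """Hypothetical stand-in that tracks a running sum and an element count."""

    def __init__(self):
        self.sum = 0
        self.count = 0

    def add(self, v):
        self.sum += v
        self.count += 1

    def is_zero(self):
        # Zero only if nothing has been added: the sum AND the count are both 0.
        return self.sum == 0 and self.count == 0


acc = LongAccumulatorModel()
fresh_is_zero = acc.is_zero()      # nothing added yet

acc.add(5)
acc.add(-5)                        # the sum is back to 0, but count is now 2
cancelled_is_zero = acc.is_zero()
```

Under these assumptions, a fresh accumulator reports zero, while one whose additions cancel out does not, even though its value is 0.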

commit 72c13ed844d6be6510ce2c5e3526c234d1d5e10f
Author: hyukjinkwon <gurwls223@...>
Date:   2018-03-15T17:55:33Z

    [SPARK-23695][PYTHON] Fix the error message for Kinesis streaming tests
    
    ## What changes were proposed in this pull request?
    
    This PR proposes to fix the error message shown in PySpark when the Kinesis
    tests are explicitly enabled but the Kinesis jar is missing.
    
    ```bash
    ENABLE_KINESIS_TESTS=1 SPARK_TESTING=1 bin/pyspark pyspark.streaming.tests
    ```
    
    Before:
    
    ```
    Skipped test_flume_stream (enable by setting environment variable ENABLE_FLUME_TESTS=1Skipped test_kafka_stream (enable by setting environment variable ENABLE_KAFKA_0_8_TESTS=1Traceback (most recent call last):
      File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 174, in _run_module_as_main
        "__main__", fname, loader, pkg_name)
      File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
        exec code in run_globals
      File "/.../spark/python/pyspark/streaming/tests.py", line 1572, in <module>
        % kinesis_asl_assembly_dir) +
    NameError: name 'kinesis_asl_assembly_dir' is not defined
    ```
    
    After:
    
    ```
    Skipped test_flume_stream (enable by setting environment variable ENABLE_FLUME_TESTS=1Skipped test_kafka_stream (enable by setting environment variable ENABLE_KAFKA_0_8_TESTS=1Traceback (most recent call last):
      File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 174, in _run_module_as_main
        "__main__", fname, loader, pkg_name)
      File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
        exec code in run_globals
      File "/.../spark/python/pyspark/streaming/tests.py", line 1576, in <module>
        "You need to build Spark with 'build/sbt -Pkinesis-asl "
    Exception: Failed to find Spark Streaming Kinesis assembly jar in /.../spark/external/kinesis-asl-assembly. You need to build Spark with 'build/sbt -Pkinesis-asl assembly/package streaming-kinesis-asl-assembly/assembly'or 'build/mvn -Pkinesis-asl package' before running this test.
    ```
    
    ## How was this patch tested?
    
    Manually tested.
    
    Author: hyukjinkwon <gurwls...@gmail.com>
    
    Closes #20834 from HyukjinKwon/minor-variable.
    
    (cherry picked from commit 56e8f48a43eb51e8582db2461a585b13a771a00a)
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
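
The "Before" traceback in SPARK-23695 shows the intended error being masked by a `NameError`, because the message referred to a name that was not defined at that point. The sketch below illustrates the corrected pattern in isolation. It is not the code from `pyspark/streaming/tests.py`: the helper names `find_assembly_jar` and `require_assembly_jar`, their parameters, and the shortened message are all assumptions made for illustration.

```python
import glob
import os


def find_assembly_jar(assembly_dir, pattern="*.jar"):
    """Return the first jar found under assembly_dir, or None (hypothetical helper)."""
    jars = sorted(glob.glob(os.path.join(assembly_dir, pattern)))
    return jars[0] if jars else None


def require_assembly_jar(assembly_dir, tests_enabled):
    """Return the jar path, or fail with a descriptive error if tests need it."""
    jar = find_assembly_jar(assembly_dir)
    if jar is None and tests_enabled:
        # Every name used to build the message is defined in this scope, so
        # the intended Exception is raised rather than a NameError.
        raise Exception(
            "Failed to find assembly jar in %s. " % assembly_dir
            + "Build the assembly before running this test.")
    return jar
```

Building the message only from names that are in scope at the `raise` site keeps the failure pointing at the missing jar rather than at the test harness itself.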

----


