GitHub user engineeyao opened a pull request:

    https://github.com/apache/spark/pull/19187

    Branch 2.1

    ## What changes were proposed in this pull request?
    
    (Please fill in changes proposed in this fix)
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/spark branch-2.1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19187.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19187
    
----
commit 62fab5beee147c90d8b7f8092b4ee76ba611ee8e
Author: uncleGen <husty...@gmail.com>
Date:   2017-02-07T05:03:20Z

    [SPARK-19407][SS] defaultFS is used FileSystem.get instead of getting it 
from uri scheme
    
    ## What changes were proposed in this pull request?
    
    ```
    Caused by: java.lang.IllegalArgumentException: Wrong FS: 
s3a://**************/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata, 
expected: file:///
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
        at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
        at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
        at 
org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
        at 
org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
        at 
org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
        at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
    ```
    
    Can easily replicate on spark standalone cluster by providing checkpoint 
location uri scheme anything other than "file://" and not overriding in config.
    
    WorkAround  --conf spark.hadoop.fs.defaultFS=s3a://somebucket or set it in 
sparkConf or spark-default.conf
    
    ## How was this patch tested?
    
    existing ut
    
    Author: uncleGen <husty...@gmail.com>
    
    Closes #16815 from uncleGen/SPARK-19407.
    
    (cherry picked from commit 7a0a630e0f699017c7d0214923cd4aa0227e62ff)
    Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

commit dd1abef138581f30ab7a8dfacb616fe7dd64b421
Author: Aseem Bansal <anshban...@users.noreply.github.com>
Date:   2017-02-07T11:44:14Z

    [SPARK-19444][ML][DOCUMENTATION] Fix imports not being present in 
documentation
    
    ## What changes were proposed in this pull request?
    
    SPARK-19444 imports not being present in documentation
    
    ## How was this patch tested?
    
    Manual
    
    ## Disclaimer
    
    Contribution is original work and I license the work to the project under 
the project’s open source license
    
    Author: Aseem Bansal <anshban...@users.noreply.github.com>
    
    Closes #16789 from anshbansal/patch-1.
    
    (cherry picked from commit aee2bd2c7ee97a58f0adec82ec52e5625b39e804)
    Signed-off-by: Sean Owen <so...@cloudera.com>

commit e642a07d57798f98b25ba08ed7ae3abe0f597941
Author: Tyson Condie <tcon...@gmail.com>
Date:   2017-02-07T22:31:23Z

    [SPARK-18682][SS] Batch Source for Kafka
    
    Today, you can start a stream that reads from kafka. However, given kafka's 
configurable retention period, it seems like sometimes you might just want to 
read all of the data that is available now. As such we should add a version 
that works with spark.read as well.
    The options should be the same as the streaming kafka source, with the 
following differences:
    startingOffsets should default to earliest, and should not allow latest 
(which would always be empty).
    endingOffsets should also be allowed and should default to latest. the same 
assign json format as startingOffsets should also be accepted.
    It would be really good, if things like .limit(n) were enough to prevent 
all the data from being read (this might just work).
    
    KafkaRelationSuite was added for testing batch queries via KafkaUtils.
    
    Author: Tyson Condie <tcon...@gmail.com>
    
    Closes #16686 from tcondie/SPARK-18682.
    
    (cherry picked from commit 8df444403489aec0d68f7d930afdc4f7d50e0b41)
    Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

commit 706d6c154d2471c00253bf9b0c4e867752f841fe
Author: CodingCat <zhunans...@gmail.com>
Date:   2017-02-08T04:25:18Z

    [SPARK-19499][SS] Add more notes in the comments of Sink.addBatch()
    
    ## What changes were proposed in this pull request?
    
    addBatch method in Sink trait is supposed to be a synchronous method to 
coordinate with the fault-tolerance design in StreamingExecution (being 
different with the compute() method in DStream)
    
    We need to add more notes in the comments of this method to remind the 
developers
    
    ## How was this patch tested?
    
    existing tests
    
    Author: CodingCat <zhunans...@gmail.com>
    
    Closes #16840 from CodingCat/SPARK-19499.
    
    (cherry picked from commit d4cd975718716be11a42ce92a47c45be1a46bd60)
    Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

commit 4d040297f55243703463ea71d5302bb46ea0bf3f
Author: manugarri <manuel.garrido.p...@gmail.com>
Date:   2017-02-08T05:45:33Z

    [MINOR][DOC] Remove parenthesis in readStream() on kafka structured 
streaming doc
    
    There is a typo in 
http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-stream
 , python example n1 uses `readStream()` instead of `readStream`
    
    Just removed the parenthesis.
    
    Author: manugarri <manuel.garrido.p...@gmail.com>
    
    Closes #16836 from manugarri/fix_kafka_python_doc.
    
    (cherry picked from commit 5a0569ce693c635c5fa12b2de33ed3643ce888e3)
    Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

commit 71b6eacf72fb50862d33a2bf6a0662d6c4e73bbd
Author: Herman van Hovell <hvanhov...@databricks.com>
Date:   2017-02-08T07:35:15Z

    [SPARK-18609][SPARK-18841][SQL][BACKPORT-2.1] Fix redundant Alias removal 
in the optimizer
    
    This is a backport of 
https://github.com/apache/spark/commit/73ee73945e369a862480ef4ac64e55c797bd7d90
    
    ## What changes were proposed in this pull request?
    The optimizer tries to remove redundant alias only projections from the 
query plan using the `RemoveAliasOnlyProject` rule. The current rule identifies 
removes such a project and rewrites the project's attributes in the **entire** 
tree. This causes problems when parts of the tree are duplicated (for instance 
a self join on a temporary view/CTE)  and the duplicated part contains the 
alias only project, in this case the rewrite will break the tree.
    
    This PR fixes these problems by using a blacklist for attributes that are 
not to be moved, and by making sure that attribute remapping is only done for 
the parent tree, and not for unrelated parts of the query plan.
    
    The current tree transformation infrastructure works very well if the 
transformation at hand requires little or a global contextual information. In 
this case we need to know both the attributes that were not to be moved, and we 
also needed to know which child attributes were modified. This cannot be done 
easily using the current infrastructure, and solutions typically involves 
transversing the query plan multiple times (which is super slow). I have moved 
around some code in `TreeNode`, `QueryPlan` and `LogicalPlan`to make this much 
more straightforward; this basically allows you to manually traverse the tree.
    
    ## How was this patch tested?
    I have added unit tests to `RemoveRedundantAliasAndProjectSuite` and I have 
added integration tests to the `SQLQueryTestSuite.union` and 
`SQLQueryTestSuite.cte` test cases.
    
    Author: Herman van Hovell <hvanhov...@databricks.com>
    
    Closes #16843 from hvanhovell/SPARK-18609-2.1.

commit 502c927b8c8a99ef2adf4e6e1d7a6d9232d45ef5
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-02-08T19:33:59Z

    [SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations for 
branch-2.1
    
    This is a follow up PR for merging #16758 to spark 2.1 branch
    
    ## What changes were proposed in this pull request?
    
    `mapGroupsWithState` is a new API for arbitrary stateful operations in 
Structured Streaming, similar to `DStream.mapWithState`
    
    *Requirements*
    - Users should be able to specify a function that can do the following
    - Access the input row corresponding to a key
    - Access the previous state corresponding to a key
    - Optionally, update or remove the state
    - Output any number of new rows (or none at all)
    
    *Proposed API*
    ```
    // ------------ New methods on KeyValueGroupedDataset ------------
    class KeyValueGroupedDataset[K, V] {
        // Scala friendly
        def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], 
KeyedState[S]) => U)
            def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, 
Iterator[V], KeyedState[S]) => Iterator[U])
        // Java friendly
           def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, 
S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
           def flatMapGroupsWithState[S, U](func: 
FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], 
resultEncoder: Encoder[U])
    }
    
    // ------------------- New Java-friendly function classes 
-------------------
    public interface MapGroupsWithStateFunction<K, V, S, R> extends 
Serializable {
      R call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
    }
    public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends 
Serializable {
      Iterator<R> call(K key, Iterator<V> values, state: KeyedState<S>) throws 
Exception;
    }
    
    // ---------------------- Wrapper class for state data 
----------------------
    trait KeyedState[S] {
        def exists(): Boolean
        def get(): S                    // throws Exception is state does not 
exist
        def getOption(): Option[S]
        def update(newState: S): Unit
        def remove(): Unit              // exists() will be false after this
    }
    ```
    
    Key Semantics of the State class
    - The state can be null.
    - If the state.remove() is called, then state.exists() will return false, 
and getOption will returm None.
    - After that state.update(newState) is called, then state.exists() will 
return true, and getOption will return Some(...).
    - None of the operations are thread-safe. This is to avoid memory barriers.
    
    *Usage*
    ```
    val stateFunc = (word: String, words: Iterator[String, runningCount: 
KeyedState[Long]) => {
        val newCount = words.size + runningCount.getOption.getOrElse(0L)
        runningCount.update(newCount)
       (word, newCount)
    }
    
    dataset                                                             // type 
is Dataset[String]
      .groupByKey[String](w => w)                               // generates 
KeyValueGroupedDataset[String, String]
      .mapGroupsWithState[Long, (String, Long)](stateFunc)      // returns 
Dataset[(String, Long)]
    ```
    
    ## How was this patch tested?
    New unit tests.
    
    Author: Tathagata Das <tathagata.das1...@gmail.com>
    
    Closes #16850 from tdas/mapWithState-branch-2.1.

commit b3fd36a15a0924b9de88dadc6e0acbe504ba4b96
Author: Shixiong Zhu <shixi...@databricks.com>
Date:   2017-02-09T19:16:51Z

    [SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in 
Signaling.cancelOnInterrupt
    
    ## What changes were proposed in this pull request?
    
    `Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes 
ReplSuite unstable.
    
    This PR adds `SparkContext.getActive` to allow 
`Signaling.cancelOnInterrupt` to get the active `SparkContext` to avoid the 
leak.
    
    ## How was this patch tested?
    
    Jenkins
    
    Author: Shixiong Zhu <shixi...@databricks.com>
    
    Closes #16825 from zsxwing/SPARK-19481.
    
    (cherry picked from commit 303f00a4bf6660dd83c8bd9e3a107bb3438a421b)
    Signed-off-by: Davies Liu <davies....@gmail.com>

commit a3d5300a030fb5f1c275e671603e0745b6466735
Author: Stan Zhai <m...@zhaishidan.cn>
Date:   2017-02-09T20:01:25Z

    [SPARK-19509][SQL] Grouping Sets do not respect nullable grouping columns
    
    ## What changes were proposed in this pull request?
    The analyzer currently does not check if a column used in grouping sets is 
actually nullable itself. This can cause the nullability of the column to be 
incorrect, which can cause null pointer exceptions down the line. This PR fixes 
that by also consider the nullability of the column.
    
    This is only a problem for Spark 2.1 and below. The latest master uses a 
different approach.
    
    Closes https://github.com/apache/spark/pull/16874
    
    ## How was this patch tested?
    Added a regression test to `SQLQueryTestSuite.grouping_set`.
    
    Author: Herman van Hovell <hvanhov...@databricks.com>
    
    Closes #16873 from hvanhovell/SPARK-19509.

commit ff5818b8cee7c718ef5bdef125c8d6971d64acde
Author: Bogdan Raducanu <bog...@databricks.com>
Date:   2017-02-10T09:50:07Z

    [SPARK-19512][BACKPORT-2.1][SQL] codegen for compare structs fails #16852
    
    ## What changes were proposed in this pull request?
    
    Set currentVars to null in GenerateOrdering.genComparisons before genCode 
is called. genCode ignores INPUT_ROW if currentVars is not null and in 
genComparisons we want it to use INPUT_ROW.
    
    ## How was this patch tested?
    
    Added test with 2 queries in WholeStageCodegenSuite
    
    Author: Bogdan Raducanu <bogdan....@gmail.com>
    
    Closes #16875 from bogdanrdc/SPARK-19512-2.1.

commit 7b5ea000e246f7052e7324fd7f2e99f32aaece17
Author: Burak Yavuz <brk...@gmail.com>
Date:   2017-02-10T11:55:06Z

    [SPARK-19543] from_json fails when the input row is empty
    
    ## What changes were proposed in this pull request?
    
    Using from_json on a column with an empty string results in: 
java.util.NoSuchElementException: head of empty list.
    
    This is because `parser.parse(input)` may return `Nil` when 
`input.trim.isEmpty`
    
    ## How was this patch tested?
    
    Regression test in `JsonExpressionsSuite`
    
    Author: Burak Yavuz <brk...@gmail.com>
    
    Closes #16881 from brkyvz/json-fix.
    
    (cherry picked from commit d5593f7f5794bd0343e783ac4957864fed9d1b38)
    Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>

commit e580bb035236dd92ade126af6bb98288d88179c4
Author: Andrew Ray <ray.and...@gmail.com>
Date:   2016-12-13T07:49:22Z

    [SPARK-18717][SQL] Make code generation for Scala Map work with 
immutable.Map also
    
    ## What changes were proposed in this pull request?
    
    Fixes compile errors in generated code when user has case class with a 
`scala.collections.immutable.Map` instead of a `scala.collections.Map`. Since 
ArrayBasedMapData.toScalaMap returns the immutable version we can make it work 
with both.
    
    ## How was this patch tested?
    
    Additional unit tests.
    
    Author: Andrew Ray <ray.and...@gmail.com>
    
    Closes #16161 from aray/fix-map-codegen.
    
    (cherry picked from commit 46d30ac4846b3ec94426cc482c42cff72ebd6d92)
    Signed-off-by: Cheng Lian <l...@databricks.com>

commit 173c2387a38b260b46d7646b332e404f6ebe1a17
Author: titicaca <fangzhou.y...@hotmail.com>
Date:   2017-02-12T18:42:15Z

    [SPARK-19342][SPARKR] bug fixed in collect method for collecting timestamp 
column
    
    ## What changes were proposed in this pull request?
    
    Fix a bug in collect method for collecting timestamp column, the bug can be 
reproduced as shown in the following codes and outputs:
    
    ```
    library(SparkR)
    sparkR.session(master = "local")
    df <- data.frame(col1 = c(0, 1, 2),
                     col2 = c(as.POSIXct("2017-01-01 00:00:01"), NA, 
as.POSIXct("2017-01-01 12:00:01")))
    
    sdf1 <- createDataFrame(df)
    print(dtypes(sdf1))
    df1 <- collect(sdf1)
    print(lapply(df1, class))
    
    sdf2 <- filter(sdf1, "col1 > 0")
    print(dtypes(sdf2))
    df2 <- collect(sdf2)
    print(lapply(df2, class))
    ```
    
    As we can see from the printed output, the column type of col2 in df2 is 
converted to numeric unexpectedly, when NA exists at the top of the column.
    
    This is caused by method `do.call(c, list)`, if we convert a list, i.e. 
`do.call(c, list(NA, as.POSIXct("2017-01-01 12:00:01"))`, the class of the 
result is numeric instead of POSIXct.
    
    Therefore, we need to cast the data type of the vector explicitly.
    
    ## How was this patch tested?
    
    The patch can be tested manually with the same code above.
    
    Author: titicaca <fangzhou.y...@hotmail.com>
    
    Closes #16689 from titicaca/sparkr-dev.
    
    (cherry picked from commit bc0a0e6392c4e729d8f0e4caffc0bd05adb0d950)
    Signed-off-by: Felix Cheung <felixche...@apache.org>

commit 06e77e0097c6fa0accc5d9d6ce08a65a3828b878
Author: wm...@hotmail.com <wm...@hotmail.com>
Date:   2017-02-12T18:48:55Z

    [SPARK-19319][BACKPORT-2.1][SPARKR] SparkR Kmeans summary returns error 
when the cluster size doesn't equal to k
    
    ## What changes were proposed in this pull request?
    
    Backport fix of #16666
    
    ## How was this patch tested?
    
    Backport unit tests
    
    Author: wm...@hotmail.com <wm...@hotmail.com>
    
    Closes #16761 from wangmiao1981/kmeansport.

commit fe4fcc5701cbd3f2e698e00f1cc7d49d5c7c702b
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-02-13T07:00:22Z

    [SPARK-19564][SPARK-19559][SS][KAFKA] KafkaOffsetReader's consumers should 
not be in the same group
    
    ## What changes were proposed in this pull request?
    
    In `KafkaOffsetReader`, when error occurs, we abort the existing consumer 
and create a new consumer. In our current implementation, the first consumer 
and the second consumer would be in the same group (which leads to 
SPARK-19559), **_violating our intention of the two consumers not being in the 
same group._**
    
    The cause is that, in our current implementation, the first consumer is 
created before `groupId` and `nextId` are initialized in the constructor. Then 
even if `groupId` and `nextId` are increased during the creation of that first 
consumer, `groupId` and `nextId` would still be initialized to default values 
in the constructor for the second consumer.
    
    We should make sure that `groupId` and `nextId` are initialized before any 
consumer is created.
    
    ## How was this patch tested?
    
    Ran 100 times of `KafkaSourceSuite`; all passed
    
    Author: Liwei Lin <lwl...@gmail.com>
    
    Closes #16902 from lw-lin/SPARK-19564-.
    
    (cherry picked from commit 2bdbc87052389ff69404347fbc69457132dbcafd)
    Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

commit a3b6751375cf301dec156b85fe79e32b0797a24f
Author: Xiao Li <gatorsm...@gmail.com>
Date:   2017-02-13T11:18:31Z

    [SPARK-19574][ML][DOCUMENTATION] Fix Liquid Exception: Start indices amount 
is not equal to end indices amount
    
    ### What changes were proposed in this pull request?
    ```
    Liquid Exception: Start indices amount is not equal to end indices amount, 
see 
/Users/xiao/IdeaProjects/sparkDelivery/docs/../examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java.
 in ml-features.md
    ```
    
    So far, the build is broken after merging 
https://github.com/apache/spark/pull/16789
    
    This PR is to fix it.
    
    ## How was this patch tested?
    Manual
    
    Author: Xiao Li <gatorsm...@gmail.com>
    
    Closes #16908 from gatorsmile/docMLFix.
    
    (cherry picked from commit 855a1b7551c71b26ce7d9310342fefe0a87281ec)
    Signed-off-by: Sean Owen <so...@cloudera.com>

commit ef4fb7ebca963eb95d6a8bf7543e05aa375edc23
Author: zero323 <zero...@users.noreply.github.com>
Date:   2017-02-13T17:26:49Z

    [SPARK-19506][ML][PYTHON] Import warnings in pyspark.ml.util
    
    ## What changes were proposed in this pull request?
    
    Add missing `warnings` import.
    
    ## How was this patch tested?
    
    Manual tests.
    
    Author: zero323 <zero...@users.noreply.github.com>
    
    Closes #16846 from zero323/SPARK-19506.
    
    (cherry picked from commit 5e7cd3322b04f1dd207829b70546bc7ffdd63363)
    Signed-off-by: Holden Karau <hol...@us.ibm.com>

commit c5a7cb0225ed4ed0d1ede5da0593b258c5dfd79f
Author: Shixiong Zhu <shixi...@databricks.com>
Date:   2017-02-13T19:54:54Z

    [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without 
errors
    
    ## What changes were proposed in this pull request?
    
    When a query uses a temp checkpoint dir, it's better to delete it if it's 
stopped without errors.
    
    ## How was this patch tested?
    
    New unit tests.
    
    Author: Shixiong Zhu <shixi...@databricks.com>
    
    Closes #16880 from zsxwing/delete-temp-checkpoint.
    
    (cherry picked from commit 3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529)
    Signed-off-by: Burak Yavuz <brk...@gmail.com>

commit 328b229840d6e87c7faf7ee3cd5bf66a905c9a7d
Author: Shixiong Zhu <shixi...@databricks.com>
Date:   2017-02-13T20:03:36Z

    [SPARK-17714][CORE][TEST-MAVEN][TEST-HADOOP2.6] Avoid using 
ExecutorClassLoader to load Netty generated classes
    
    ## What changes were proposed in this pull request?
    
    Netty's `MessageToMessageEncoder` uses 
[Javassist](https://github.com/netty/netty/blob/91a0bdc17a8298437d6de08a8958d753799bd4a6/common/src/main/java/io/netty/util/internal/JavassistTypeParameterMatcherGenerator.java#L62)
 to generate a matcher class and the implementation calls `Class.forName` to 
check if this class is already generated. If `MessageEncoder` or 
`MessageDecoder` is created in `ExecutorClassLoader.findClass`, it will cause 
`ClassCircularityError`. This is because loading this Netty generated class 
will call `ExecutorClassLoader.findClass` to search this class, and 
`ExecutorClassLoader` will try to use RPC to load it and cause to load the 
non-exist matcher class again. JVM will report `ClassCircularityError` to 
prevent such infinite recursion.
    
    ##### Why it only happens in Maven builds
    
    It's because Maven and SBT have different class loader tree. The Maven 
build will set a URLClassLoader as the current context class loader to run the 
tests and expose this issue. The class loader tree is as following:
    
    ```
    bootstrap class loader ------ ... ----- REPL class loader ---- 
ExecutorClassLoader
    |
    |
    URLClasssLoader
    ```
    
    The SBT build uses the bootstrap class loader directly and 
`ReplSuite.test("propagation of local properties")` is the first test in 
ReplSuite, which happens to load 
`io/netty/util/internal/__matchers__/org/apache/spark/network/protocol/MessageMatcher`
 into the bootstrap class loader (Note: in maven build, it's loaded into 
URLClasssLoader so it cannot be found in ExecutorClassLoader). This issue can 
be reproduced in SBT as well. Here are the produce steps:
    - Enable `hadoop.caller.context.enabled`.
    - Replace `Class.forName` with `Utils.classForName` in `object 
CallerContext`.
    - Ignore `ReplSuite.test("propagation of local properties")`.
    - Run `ReplSuite` using SBT.
    
    This PR just creates a singleton MessageEncoder and MessageDecoder and 
makes sure they are created before switching to ExecutorClassLoader. 
TransportContext will be created when creating RpcEnv and that happens before 
creating ExecutorClassLoader.
    
    ## How was this patch tested?
    
    Jenkins
    
    Author: Shixiong Zhu <shixi...@databricks.com>
    
    Closes #16859 from zsxwing/SPARK-17714.
    
    (cherry picked from commit 905fdf0c243e1776c54c01a25b17878361400225)
    Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

commit 2968d8c0666801fb6a363dfca3c5a85ee8a1cc0c
Author: Shixiong Zhu <shixi...@databricks.com>
Date:   2017-02-13T20:35:56Z

    [HOTFIX][SPARK-19542][SS]Fix the missing import in 
DataStreamReaderWriterSuite

commit 5db23473008a58fb9a7f77ad8b01bcdc2c5f2d9c
Author: Josh Rosen <joshro...@databricks.com>
Date:   2017-02-13T19:04:27Z

    [SPARK-19529] TransportClientFactory.createClient() shouldn't call 
awaitUninterruptibly()
    
    This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's `network-common` library in order to fix a bug which 
may cause tasks to be uncancellable.
    
    In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates problem when a Spark task is interrupted while 
blocking in this call (which can happen in the event of a slow connection which 
will eventually time out). This has bad impacts on task cancellation when 
`interruptOnCancel = true`.
    
    As an example of the impact of this problem, I experienced significant 
numbers of uncancellable "zombie tasks" on a production cluster where several 
tasks were blocked trying to connect to a dead shuffle server and then 
continued running as zombies after I cancelled the associated Spark stage. The 
zombie tasks ran for several minutes with the following stack:
    
    ```
    java.lang.Object.wait(Native Method)
    java.lang.Object.wait(Object.java:460)
    io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
    
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
    
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
    
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object1849476028})
    
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
    
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
    
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
    
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
    
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:
    350)
    
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
    
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
    
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
    
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    [...]
    ```
    
    As far as I can tell, `awaitUninterruptibly()` might have been used in 
order to avoid having to declare that methods throw `InterruptedException` 
(this code is written in Java, hence the need to use checked exceptions). This 
patch simply replaces this with a regular, interruptible `await()` call,.
    
    This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).
    
    An alternative approach would be to wrap `InterruptedException` into 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOExceptions` as transitive failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.
    
    Note that there are three other `awaitUninterruptibly()` in the codebase, 
but those calls have a hard 10 second timeout and are waiting on a `close()` 
operation which is expected to complete near instantaneously, so the impact of 
uninterruptibility there is much smaller.
    
    Manually.
    
    Author: Josh Rosen <joshro...@databricks.com>
    
    Closes #16866 from JoshRosen/SPARK-19529.
    
    (cherry picked from commit 1c4d10b10c78d138b55e381ec6828e04fef70d6f)
    Signed-off-by: Cheng Lian <l...@databricks.com>

commit 7fe3543fd2cc1cf82135b4208e1391ab3a25f2d9
Author: Marcelo Vanzin <van...@cloudera.com>
Date:   2017-02-13T22:19:41Z

    [SPARK-19520][STREAMING] Do not encrypt data written to the WAL.
    
    Spark's I/O encryption uses an ephemeral key for each driver instance.
    So driver B cannot decrypt data written by driver A since it doesn't
    have the correct key.
    
    The write ahead log is used for recovery, thus needs to be readable by
    a different driver. So it cannot be encrypted by Spark's I/O encryption
    code.
    
    The BlockManager APIs used by the WAL code to write the data automatically
    encrypt data, so changes are needed so that callers can to opt out of
    encryption.
    
    Aside from that, the "putBytes" API in the BlockManager does not do
    encryption, so a separate situation arised where the WAL would write
    unencrypted data to the BM and, when those blocks were read, decryption
    would fail. So the WAL code needs to ask the BM to encrypt that data
    when encryption is enabled; this code is not optimal since it results
    in a (temporary) second copy of the data block in memory, but should be
    OK for now until a more performant solution is added. The non-encryption
    case should not be affected.
    
    Tested with new unit tests, and by running streaming apps that do
    recovery using the WAL data with I/O encryption turned on.
    
    Author: Marcelo Vanzin <van...@cloudera.com>
    
    Closes #16862 from vanzin/SPARK-19520.
    
    (cherry picked from commit 0169360ef58891ca10a8d64d1c8637c7b873cbdd)
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

commit c8113b0ee0555efe72827a91246af2737d1d4993
Author: Sunitha Kambhampati <skam...@us.ibm.com>
Date:   2017-02-14T06:49:29Z

    [SPARK-19585][DOC][SQL] Fix the cacheTable and uncacheTable api call in the 
doc
    
    ## What changes were proposed in this pull request?
    
    
https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory
    In the doc, the call spark.cacheTable(“tableName”) and 
spark.uncacheTable(“tableName”) actually needs to be 
spark.catalog.cacheTable and spark.catalog.uncacheTable
    
    ## How was this patch tested?
    Built the docs and verified the change shows up fine.
    
    Author: Sunitha Kambhampati <skam...@us.ibm.com>
    
    Closes #16919 from skambha/docChange.
    
    (cherry picked from commit 9b5e460a9168ab78607034434ca45ab6cb51e5a6)
    Signed-off-by: Xiao Li <gatorsm...@gmail.com>

commit f837ced4c448e918efa7bfc49becfa09a50f5147
Author: Jong Wook Kim <jongw...@nyu.edu>
Date:   2017-02-14T19:33:31Z

    [SPARK-19501][YARN] Reduce the number of HDFS RPCs during YARN deployment
    
    ## What changes were proposed in this pull request?
    
    As discussed in [JIRA](https://issues.apache.org/jira/browse/SPARK-19501), 
this patch addresses the problem where too many HDFS RPCs are made when there 
are many URIs specified in `spark.yarn.jars`, potentially adding hundreds of 
RTTs to YARN before the application launches. This becomes significant when 
submitting the application to a non-local YARN cluster (where the RTT may be in 
order of 100ms, for example). For each URI specified, the current 
implementation makes at least two HDFS RPCs, for:
    
    - [Calling `getFileStatus()` before uploading each file to the distributed 
cache in 
`ClientDistributedCacheManager.addResource()`](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala#L71).
    - [Resolving any symbolic links in each of the file 
URI](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L377-L379),
 which repeatedly makes HDFS RPCs until the all symlinks are resolved. (see 
[`FileContext.resolve(Path)`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java#L2189-L2195),
 [`FSLinkResolver.resolve(FileContext, 
Path)`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSLinkResolver.java#L79-L112),
 and 
[`AbstractFileSystem.resolvePath()`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java#L464-L468).)
    
    The first `getFileStatus` RPC can be removed, using `statCache` populated 
with the file statuses retrieved with [the previous `globStatus` 
call](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L531).
    
    The second one can be largely reduced by caching the symlink resolution 
results in a mutable.HashMap. This patch adds a local variable in 
`yarn.Client.prepareLocalResources()` and passes it as an additional parameter 
to `yarn.Client.copyFileToRemote`.  [The symlink resolution code was added in 
2013](https://github.com/apache/spark/commit/a35472e1dd2ea1b5a0b1fb6b382f5a98f5aeba5a#diff-b050df3f55b82065803d6e83453b9706R187)
 and has not changed since. I am assuming that this is still required, but 
otherwise we can remove using `symlinkCache` and symlink resolution altogether.
    
    ## How was this patch tested?
    
    This patch is based off 8e8afb3, currently the latest YARN patch on master. 
All tests except a few in spark-hive passed with `./dev/run-tests` on my 
machine, using JDK 1.8.0_112 on macOS 10.12.3; also tested myself with this 
modified version of SPARK 2.2.0-SNAPSHOT which performed a normal deployment 
and execution on a YARN cluster without errors.
    
    Author: Jong Wook Kim <jongw...@nyu.edu>
    
    Closes #16916 from jongwook/SPARK-19501.
    
    (cherry picked from commit ab9872db1f9c0f289541ec5756d1a142d85545ce)
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

commit 7763b0b8bd33b0baa99434136528efb5de261919
Author: Felix Cheung <felixcheun...@hotmail.com>
Date:   2017-02-14T21:51:27Z

    [SPARK-19387][SPARKR] Tests do not run with SparkR source package in CRAN 
check
    
    ## What changes were proposed in this pull request?
    
    - this is cause by changes in SPARK-18444, SPARK-18643 that we no longer 
install Spark when `master = ""` (default), but also related to SPARK-18449 
since the real `master` value is not known at the time the R code in 
`sparkR.session` is run. (`master` cannot default to "local" since it could be 
overridden by spark-submit commandline or spark config)
    - as a result, while running SparkR as a package in IDE is working fine, 
CRAN check is not as it is launching it via non-interactive script
    - fix is to add check to the beginning of each test and vignettes; the same 
would also work by changing `sparkR.session()` to `sparkR.session(master = 
"local")` in tests, but I think being more explicit is better.
    
    ## How was this patch tested?
    
    Tested this by reverting version to 2.1, since it needs to download the 
release jar with matching version. But since there are changes in 2.2 
(specifically around SparkR ML) that are incompatible with 2.1, some tests are 
failing in this config. Will need to port this to branch-2.1 and retest with 
2.1 release jar.
    
    manually as:
    ```
    # modify DESCRIPTION to revert version to 2.1.0
    SPARK_HOME=/usr/spark R CMD build pkg
    # run cran check without SPARK_HOME
    R CMD check --as-cran SparkR_2.1.0.tar.gz
    ```
    
    Author: Felix Cheung <felixcheun...@hotmail.com>
    
    Closes #16720 from felixcheung/rcranchecktest.
    
    (cherry picked from commit a3626ca333e6e1881e2f09ccae0fa8fa7243223e)
    Signed-off-by: Shivaram Venkataraman <shiva...@cs.berkeley.edu>

commit 8ee4ec8121aa47c34ea153a6f47ef5f04004da9a
Author: Tyson Condie <tcon...@gmail.com>
Date:   2017-02-15T02:50:14Z

    [SPARK-19584][SS][DOCS] update structured streaming documentation around 
batch mode
    
    ## What changes were proposed in this pull request?
    
    Revision to structured-streaming-kafka-integration.md to reflect new Batch 
query specification and options.
    
    zsxwing tdas
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.
    
    Author: Tyson Condie <tcon...@gmail.com>
    
    Closes #16918 from tcondie/kafka-docs.
    
    (cherry picked from commit 447b2b5309251f3ae37857de73c157e59a0d76df)
    Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>

commit 6c35399068f1035fec6d5f909a83a5b1683702e0
Author: Felix Cheung <felixcheun...@hotmail.com>
Date:   2017-02-15T18:45:37Z

    [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
    
    Add coalesce on DataFrame for down partitioning without shuffle and 
coalesce on Column
    
    manual, unit tests
    
    Author: Felix Cheung <felixcheun...@hotmail.com>
    
    Closes #16739 from felixcheung/rcoalesce.
    
    (cherry picked from commit 671bc08ed502815bfa2254c30d64149402acb0c7)
    Signed-off-by: Felix Cheung <felixche...@apache.org>

commit 88c43f4fb5ea042a119819c11a5cdbe225095c54
Author: Shixiong Zhu <shixi...@databricks.com>
Date:   2017-02-16T00:21:43Z

    [SPARK-19599][SS] Clean up HDFSMetadataLog
    
    ## What changes were proposed in this pull request?
    
    SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some 
cleanup for HDFSMetadataLog.
    
    This PR includes the following changes:
    - ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is 
another issue 
[HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) that 
prevents us from removing the workaround codes.
    - Remove unnecessary `writer: (T, OutputStream) => Unit` and just call 
`serialize` directly.
    - Remove catching FileNotFoundException.
    
    ## How was this patch tested?
    
    Jenkins
    
    Author: Shixiong Zhu <shixi...@databricks.com>
    
    Closes #16932 from zsxwing/metadata-cleanup.
    
    (cherry picked from commit 21b4ba2d6f21a9759af879471715c123073bd67a)
    Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

commit b9ab4c0e983df463232f1adbe6e5982b0d7d497d
Author: Yin Huai <yh...@databricks.com>
Date:   2017-02-15T22:41:15Z

    [SPARK-19604][TESTS] Log the start of every Python test
    
    ## What changes were proposed in this pull request?
    Right now, we only have info level log after we finish the tests of a 
Python test file. We should also log the start of a test. So, if a test is 
hanging, we can tell which test file is running.
    
    ## How was this patch tested?
    This is a change for python tests.
    
    Author: Yin Huai <yh...@databricks.com>
    
    Closes #16935 from yhuai/SPARK-19604.
    
    (cherry picked from commit f6c3bba22501ee7753d85c6e51ffe851d43869c1)
    Signed-off-by: Yin Huai <yh...@databricks.com>

commit db7adb61bebb5e9a74f2e3f8eba481615ff8c31a
Author: Shixiong Zhu <shixi...@databricks.com>
Date:   2017-02-16T04:51:33Z

    [SPARK-19603][SS] Fix StreamingQuery explain command
    
    ## What changes were proposed in this pull request?
    
    `StreamingQuery.explain` doesn't show the correct streaming physical plan 
right now because `ExplainCommand` receives a runtime batch plan and its 
`logicalPlan.isStreaming` is always false.
    
    This PR adds `streaming` parameter to `ExplainCommand` to allow 
`StreamExecution` to specify that it's a streaming plan.
    
    Examples of the explain outputs:
    
    - streaming DataFrame.explain()
    ```
    == Physical Plan ==
    *HashAggregate(keys=[value#518], functions=[count(1)])
    +- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0
       +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
          +- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0)
             +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
                +- Exchange hashpartitioning(value#518, 5)
                   +- *HashAggregate(keys=[value#518], 
functions=[partial_count(1)])
                      +- *SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, 
java.lang.String, true], true) AS value#518]
                         +- *MapElements <function1>, obj#517: java.lang.String
                            +- *DeserializeToObject value#513.toString, 
obj#516: java.lang.String
                               +- StreamingRelation MemoryStream[value#513], 
[value#513]
    ```
    
    - StreamingQuery.explain(extended = false)
    ```
    == Physical Plan ==
    *HashAggregate(keys=[value#518], functions=[count(1)])
    +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
       +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
          +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
             +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
                +- Exchange hashpartitioning(value#518, 5)
                   +- *HashAggregate(keys=[value#518], 
functions=[partial_count(1)])
                      +- *SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, 
java.lang.String, true], true) AS value#518]
                         +- *MapElements <function1>, obj#517: java.lang.String
                            +- *DeserializeToObject value#543.toString, 
obj#516: java.lang.String
                               +- LocalTableScan [value#543]
    ```
    
    - StreamingQuery.explain(extended = true)
    ```
    == Parsed Logical Plan ==
    Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
    +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, 
java.lang.String, true], true) AS value#518]
       +- MapElements <function1>, class java.lang.String, 
[StructField(value,StringType,true)], obj#517: java.lang.String
          +- DeserializeToObject cast(value#543 as string).toString, obj#516: 
java.lang.String
             +- LocalRelation [value#543]
    
    == Analyzed Logical Plan ==
    value: string, count(1): bigint
    Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
    +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, 
java.lang.String, true], true) AS value#518]
       +- MapElements <function1>, class java.lang.String, 
[StructField(value,StringType,true)], obj#517: java.lang.String
          +- DeserializeToObject cast(value#543 as string).toString, obj#516: 
java.lang.String
             +- LocalRelation [value#543]
    
    == Optimized Logical Plan ==
    Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
    +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, 
java.lang.String, true], true) AS value#518]
       +- MapElements <function1>, class java.lang.String, 
[StructField(value,StringType,true)], obj#517: java.lang.String
          +- DeserializeToObject value#543.toString, obj#516: java.lang.String
             +- LocalRelation [value#543]
    
    == Physical Plan ==
    *HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, 
count(1)#524L])
    +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
       +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], 
output=[value#518, count#530L])
          +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
             +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], 
output=[value#518, count#530L])
                +- Exchange hashpartitioning(value#518, 5)
                   +- *HashAggregate(keys=[value#518], 
functions=[partial_count(1)], output=[value#518, count#530L])
                      +- *SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, 
java.lang.String, true], true) AS value#518]
                         +- *MapElements <function1>, obj#517: java.lang.String
                            +- *DeserializeToObject value#543.toString, 
obj#516: java.lang.String
                               +- LocalTableScan [value#543]
    ```
    
    ## How was this patch tested?
    
    The updated unit test.
    
    Author: Shixiong Zhu <shixi...@databricks.com>
    
    Closes #16934 from zsxwing/SPARK-19603.
    
    (cherry picked from commit fc02ef95cdfc226603b52dc579b7133631f7143d)
    Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to