[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+

2018-05-29 Thread dbtsai
Github user dbtsai commented on the issue:

https://github.com/apache/spark/pull/21459
  
@felixcheung Yes. All tests passed with JDK8.


---




[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21452
  
**[Test build #91286 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91286/testReport)** for PR 21452 at commit [`9881d9c`](https://github.com/apache/spark/commit/9881d9c6a2b1d56e69bb06ee27fd8706f6e0fe43).


---




[GitHub] spark issue #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala ...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21458
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3692/
Test PASSed.


---




[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21246
  
**[Test build #91287 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91287/testReport)** for PR 21246 at commit [`6fd8f2f`](https://github.com/apache/spark/commit/6fd8f2fbd37e5193f0ffb1a25a8f4a8c71ab55bd).


---




[GitHub] spark issue #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala ...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21458
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/21246
  
Jenkins, retest this please.


---




[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...

2018-05-29 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/21452
  
ok to test


---




[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21459
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3691/
Test PASSed.


---




[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21459
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #21378: [SPARK-24326][Mesos] add support for local:// sch...

2018-05-29 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21378#discussion_r191647592
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 ---
@@ -418,17 +417,33 @@ private[spark] class MesosClusterScheduler(
 envBuilder.build()
   }
 
+  private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
+    val isLocalJar = desc.jarUrl.startsWith("local://")
+    val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists {
+      case "container" => true
+      case "host" => false
+      case other =>
+        logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.")
+        false
+    }
--- End diff --

It's not very commonly used, but ok to me.


---




[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21459
  
**[Test build #91285 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91285/testReport)** for PR 21459 at commit [`bec3e81`](https://github.com/apache/spark/commit/bec3e81a3522b54692150584c86d1925799c08da).


---




[GitHub] spark pull request #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to suppor...

2018-05-29 Thread dbtsai
GitHub user dbtsai opened a pull request:

https://github.com/apache/spark/pull/21459

[SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+

## What changes were proposed in this pull request?

Upgrade ASM to 6.1 to support JDK9+

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dbtsai/spark asm

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21459.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21459


commit bec3e81a3522b54692150584c86d1925799c08da
Author: DB Tsai 
Date:   2018-05-29T22:35:55Z

Asm6.1




---




[GitHub] spark issue #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala ...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21458
  
**[Test build #91284 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91284/testReport)** for PR 21458 at commit [`48c57e0`](https://github.com/apache/spark/commit/48c57e09dda63259230aa81facb31c1795f602fe).


---




[GitHub] spark pull request #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with...

2018-05-29 Thread dbtsai
GitHub user dbtsai opened a pull request:

https://github.com/apache/spark/pull/21458

[SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+

## What changes were proposed in this pull request?

Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+

## How was this patch tested?

Existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dbtsai/spark sbt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21458.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21458


commit 48c57e09dda63259230aa81facb31c1795f602fe
Author: DB Tsai 
Date:   2018-05-29T23:54:22Z

upgrade sbt




---




[GitHub] spark issue #21379: [SPARK-24327][SQL] Add an option to quote a partition co...

2018-05-29 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/21379
  
You mean we need verification code there to check whether the fetched schema has the user-given partition column? If the schema does not have the column, throw an `AnalysisException` or something?


---




[GitHub] spark issue #21379: [SPARK-24327][SQL] Add an option to quote a partition co...

2018-05-29 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21379
  
Sorry, I just realized this is the wrong direction. Instead of trusting the user inputs, we should verify the user-specified partition columns against the already fetched table schema info (`val tableSchema = JDBCRDD.resolveTable(jdbcOptions)`) when building `JDBCRelation`.
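
A minimal sketch of the kind of check being suggested, assuming illustrative names (`resolvePartitionColumn` is hypothetical, and Spark's own code would raise `AnalysisException` from inside the `sql` package):

```
// Hypothetical sketch, not the actual JDBCRelation code: validate the user-given
// partition column against the schema already fetched via JDBCRDD.resolveTable,
// instead of trusting the raw user input.
import org.apache.spark.sql.types.StructType

def resolvePartitionColumn(userColumn: String, tableSchema: StructType): String =
  tableSchema.fieldNames
    .find(_.equalsIgnoreCase(userColumn))   // match case-insensitively, like catalog lookups
    .getOrElse(throw new IllegalArgumentException(   // Spark itself would use AnalysisException here
      s"Partition column '$userColumn' not found in schema ${tableSchema.simpleString}"))
```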


---




[GitHub] spark pull request #21379: [SPARK-24327][SQL] Add an option to quote a parti...

2018-05-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21379#discussion_r191641304
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 ---
@@ -78,7 +79,12 @@ private[sql] object JDBCRelation extends Logging {
 // Overflow and silliness can happen if you subtract then divide.
 // Here we get a little roundoff, but that's (hopefully) OK.
val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
-val column = partitioning.column
+val column = if (jdbcOptions.quotePartitionColumnName) {
+  val dialect = JdbcDialects.get(jdbcOptions.url)
+  dialect.quoteIdentifier(partitioning.column)
--- End diff --

If possible, please do not break the existing behavior.  Both should work 
without specifying the extra option:
```
val df = spark.read.format("jdbc")
  .option("url", urlWithUserAndPass)
  .option("dbtable", "TEST.PEOPLE")
  .option("partitionColumn", "nonQuotedPrtColName")
  .option("lowerBound", 1)
  .option("upperBound", 4)
  .option("numPartitions", 3)
  .load()
```
```
val df = spark.read.format("jdbc")
  .option("url", urlWithUserAndPass)
  .option("dbtable", "TEST.PEOPLE")
  .option("partitionColumn", nonQuotedPrtColName)
  .option("lowerBound", 1)
  .option("upperBound", 4)
  .option("numPartitions", 3)
  .load()
```

Is that possible?
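
A hedged sketch of one way to keep both spellings working without a new option: quote through the dialect only when the user-supplied name is a plain column of the resolved schema, and pass anything else through untouched (names below are illustrative, not the PR's implementation):

```
// Illustrative only: conditional quoting so both examples above keep working
// without specifying an extra option.
import org.apache.spark.sql.jdbc.JdbcDialects

def columnForPartitioning(userColumn: String, url: String, schemaFieldNames: Seq[String]): String = {
  val dialect = JdbcDialects.get(url)
  if (schemaFieldNames.contains(userColumn)) {
    dialect.quoteIdentifier(userColumn)   // a regular column name: quoting preserves old behavior
  } else {
    userColumn                            // e.g. already quoted by the user, or a case mismatch
  }
}
```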


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91281/
Test FAILed.


---




[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21366
  
**[Test build #91281 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91281/testReport)** for PR 21366 at commit [`5b9c00f`](https://github.com/apache/spark/commit/5b9c00fa39d1c83435ca65de5394345e5d6f1f00).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21246
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21246
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91283/
Test FAILed.


---




[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21246
  
**[Test build #91283 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91283/testReport)** for PR 21246 at commit [`6fd8f2f`](https://github.com/apache/spark/commit/6fd8f2fbd37e5193f0ffb1a25a8f4a8c71ab55bd).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21454: [SPARK-24337][Core] Improve error messages for Spark con...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21454
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21454: [SPARK-24337][Core] Improve error messages for Spark con...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21454
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91279/
Test PASSed.


---




[GitHub] spark issue #21454: [SPARK-24337][Core] Improve error messages for Spark con...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21454
  
**[Test build #91279 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91279/testReport)** for PR 21454 at commit [`badbf0e`](https://github.com/apache/spark/commit/badbf0e6766a99565e061063041f231d119d6d3a).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21378: [SPARK-24326][Mesos] add support for local:// sch...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21378#discussion_r191630562
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 ---
@@ -418,17 +417,33 @@ private[spark] class MesosClusterScheduler(
 envBuilder.build()
   }
 
+  private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
+    val isLocalJar = desc.jarUrl.startsWith("local://")
+    val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists {
+      case "container" => true
+      case "host" => false
+      case other =>
+        logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.")
+        false
+    }
--- End diff --

Nothing is wrong, but I just find it hard to read. I assume @felixcheung had a similar concern at its core.
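
For readers of the digest, a slightly more explicit spelling of the same logic that may be easier to scan; this is only a hedged readability sketch, not the change made in the PR, and the final `isLocalJar && isContainerLocal` combination is an assumption since the quoted diff is cut off:

```
// Hedged readability sketch of the quoted snippet (same intended behavior).
private val ResolutionModeKey = "spark.mesos.appJar.local.resolution.mode"

private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
  val isLocalJar = desc.jarUrl.startsWith("local://")
  val resolutionMode = desc.conf.getOption(ResolutionModeKey).getOrElse("host")
  val isContainerLocal = resolutionMode match {
    case "container" => true
    case "host"      => false
    case other =>
      logWarning(s"Unknown $ResolutionModeKey $other, using host.")
      false
  }
  isLocalJar && isContainerLocal   // assumed combination; the diff above stops before the return value
}
```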


---




[GitHub] spark issue #21426: [SPARK-24384][PYTHON][SPARK SUBMIT] Add .py files correc...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21426
  
@vanzin, for https://github.com/apache/spark/pull/21426#discussion_r191010819, mind if we proceed in a separate ticket? From my look, it needs some changes to verify this in order to address this comment. I think we can't simply raise an exception, since from `deploy.PythonRunner`'s perspective we can't tell whether that file has been downloaded or not.

The most appropriate places seem to be `SparkSubmit` and `DependencyUtils.downloadFile`. It seems we should inject some code in `DependencyUtils.downloadFile`, since that's where we know the original path and where we download the file locally when needed, and I would like to avoid adding such changes here. It probably needs another review iteration, and the current change doesn't actually target or change the previous behaviour, really.


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191629554
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
 ---
@@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest {
 super.afterEach()
   }
 
-  test("receiver stopped with row last") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
-send(
-  endpoint,
-  ReceiverEpochMarker(0),
-  ReceiverRow(0, unsafeRow(111))
-)
+  private implicit def unsafeRow(value: Int) = {
--- End diff --

And where does it leverage the `implicit` attribute of this method? I'm not sure it is really needed, but I'm reviewing on the GitHub page so I might be missing something here.
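
As a side note on where an `implicit def` like this gets picked up: a tiny self-contained sketch (with a stand-in `FakeRow` type, not the actual `UnsafeRow`) showing that the conversion fires at call sites such as `writer.write(Iterator(1, 2, 3))`:

```
import scala.language.implicitConversions

// Self-contained illustration, not the Spark test code: the expected element type at the
// call site triggers the implicit Int-to-row conversion for each element.
object ImplicitRowExample {
  final case class FakeRow(value: Int)                         // stand-in for UnsafeRow

  implicit def intToRow(value: Int): FakeRow = FakeRow(value)  // the conversion under discussion

  def write(rows: Iterator[FakeRow]): Unit = rows.foreach(println)

  def main(args: Array[String]): Unit = {
    write(Iterator(1, 2, 3))   // compiles only because intToRow is implicit
  }
}
```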


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191605388
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
 ---
@@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest {
 super.afterEach()
   }
 
-  test("receiver stopped with row last") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
-send(
-  endpoint,
-  ReceiverEpochMarker(0),
-  ReceiverRow(0, unsafeRow(111))
-)
+  private implicit def unsafeRow(value: Int) = {
--- End diff --

Just curious: is there a reason to rearrange these functions, this one and the two below? It looks like they're the same except for changing this function to `implicit`.


---




[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191629272
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
 ---
@@ -288,4 +267,153 @@ class ContinuousShuffleReadSuite extends StreamTest {
 val thirdEpoch = rdd.compute(rdd.partitions(0), 
ctx).map(_.getUTF8String(0).toString).toSet
 assert(thirdEpoch == Set("writer1-row1", "writer2-row0"))
   }
+
+  test("one epoch") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator(1, 2, 3))
+
+assert(readEpoch(reader) == Seq(1, 2, 3))
+  }
+
+  test("multiple epochs") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator(1, 2, 3))
+writer.write(Iterator(4, 5, 6))
+
+assert(readEpoch(reader) == Seq(1, 2, 3))
+assert(readEpoch(reader) == Seq(4, 5, 6))
+  }
+
+  test("empty epochs") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator())
+writer.write(Iterator(1, 2))
+writer.write(Iterator())
+writer.write(Iterator())
+writer.write(Iterator(3, 4))
+writer.write(Iterator())
+
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq(1, 2))
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq(3, 4))
+assert(readEpoch(reader) == Seq())
+  }
+
+  test("blocks waiting for writer") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+val readerEpoch = reader.compute(reader.partitions(0), ctx)
+
+val readRowThread = new Thread {
+  override def run(): Unit = {
+assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1))
+  }
+}
+readRowThread.start()
+
+eventually(timeout(streamingTimeout)) {
+  assert(readRowThread.getState == Thread.State.TIMED_WAITING)
+}
+
+// Once we write the epoch the thread should stop waiting and succeed.
+writer.write(Iterator(1))
+readRowThread.join()
+  }
+
+  test("multiple writer partitions") {
--- End diff --

Would we want to have another test which covers out-of-order epochs between writers (if that's a valid case for us), or rely on the test in ContinuousShuffleReadRDD?


---




[GitHub] spark pull request #20697: [SPARK-23010][k8s] Initial checkin of k8s integra...

2018-05-29 Thread ssuchter
Github user ssuchter commented on a diff in the pull request:

https://github.com/apache/spark/pull/20697#discussion_r191629581
  
--- Diff: 
resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
 ---
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+TEST_ROOT_DIR=$(git rev-parse --show-toplevel)
+UNPACKED_SPARK_TGZ="$TEST_ROOT_DIR/target/spark-dist-unpacked"
+IMAGE_TAG_OUTPUT_FILE="$TEST_ROOT_DIR/target/image-tag.txt"
+DEPLOY_MODE="minikube"
+IMAGE_REPO="docker.io/kubespark"
+IMAGE_TAG="N/A"
+SPARK_TGZ="N/A"
+
+# Parse arguments
+while (( "$#" )); do
+  case $1 in
+--unpacked-spark-tgz)
+  UNPACKED_SPARK_TGZ="$2"
+  shift
+  ;;
+--image-repo)
+  IMAGE_REPO="$2"
+  shift
+  ;;
+--image-tag)
+  IMAGE_TAG="$2"
+  shift
+  ;;
+--image-tag-output-file)
+  IMAGE_TAG_OUTPUT_FILE="$2"
+  shift
+  ;;
+--deploy-mode)
+  DEPLOY_MODE="$2"
+  shift
+  ;;
+--spark-tgz)
+  SPARK_TGZ="$2"
+  shift
+  ;;
+*)
+  break
+  ;;
+  esac
+  shift
+done
+
+if [[ $SPARK_TGZ == "N/A" ]];
+then
+  echo "Must specify a Spark tarball to build Docker images against with 
--spark-tgz." && exit 1;
--- End diff --

I'm sure it's possible. But I'd rather not try to do that refactor. I'd be 
really happy if you wanted to.


---




[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-29 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r191628993
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
   * This method will not be called in parallel for a single TransportClient (i.e., channel).
   *
+   * The rpc *might* include a data stream in streamData (eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
+   * If stream data is not null, you *must* call streamData.registerStreamCallback
+   * before this method returns.
+   *
   * @param client A channel client which enables the handler to make requests back to the sender
   *   of this RPC. This will always be the exact same object for a particular channel.
   * @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
+   *   otherwise it is null.
   * @param callback Callback which should be invoked exactly once upon success or failure of the
   * RPC.
   */
   public abstract void receive(
       TransportClient client,
       ByteBuffer message,
+      StreamData streamData,
--- End diff --

It's not necessary to add a parameter.  Change the message parameter to 
InputStream.
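
To make the two shapes being compared concrete, here is a schematic sketch; it deliberately does not mirror the real `RpcHandler`/`TransportClient` signatures and only contrasts "extra nullable parameter" with "stream-typed message":

```
// Schematic only -- not the actual org.apache.spark.network.server.RpcHandler API.
import java.io.InputStream
import java.nio.ByteBuffer

trait ReceiveWithExtraParam {
  // Shape in the diff above: the payload stays a ByteBuffer and an optional stream rides along.
  def receive(message: ByteBuffer, streamData: AnyRef /* nullable */): Unit
}

trait ReceiveWithStreamMessage {
  // Shape suggested in the comment: fold both cases into one streaming payload.
  def receive(message: InputStream): Unit
}
```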


---




[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21428
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21428
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91278/
Test PASSed.


---




[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21428
  
**[Test build #91278 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91278/testReport)** for PR 21428 at commit [`65837ac`](https://github.com/apache/spark/commit/65837ac611991f2db7710d0657e56b222a2f5c74).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21437
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21437
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91277/
Test FAILed.


---




[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21437
  
**[Test build #91277 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91277/testReport)** for PR 21437 at commit [`2ea9cbc`](https://github.com/apache/spark/commit/2ea9cbc80787f1417fa4502c3c2b9b89f46d0632).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-29 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r191627958
  
--- Diff: python/pyspark/util.py ---
@@ -55,7 +55,9 @@ def _get_argspec(f):
 """
 # `getargspec` is deprecated since python3.0 (incompatible with 
function annotations).
 # See SPARK-23569.
-if sys.version_info[0] < 3:
+if hasattr(f, '_argspec'):
--- End diff --

Sounds good


---




[GitHub] spark pull request #21454: [SPARK-24337][Core] Improve error messages for Sp...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21454#discussion_r191626037
  
--- Diff: core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---
@@ -25,14 +25,16 @@ import scala.language.postfixOps
 import scala.util.{Random, Try}
 
 import com.esotericsoftware.kryo.Kryo
+import org.scalatest.Matchers
 
 import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, 
KryoSerializer}
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
-class SparkConfSuite extends SparkFunSuite with LocalSparkContext with 
ResetSystemProperties {
+class SparkConfSuite extends SparkFunSuite with LocalSparkContext with 
ResetSystemProperties with
+  Matchers {
--- End diff --

I would do `with Matchers` here.


---




[GitHub] spark pull request #21454: [SPARK-24337][Core] Improve error messages for Sp...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21454#discussion_r191625768
  
--- Diff: core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---
@@ -339,6 +341,38 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
 }
   }
 
+  val defaultIllegalValue = "SomeIllegalValue"
+  val illegalValueTests : Map[String, (SparkConf, String) => Any] = Map(
+    "getTimeAsSeconds" -> (_.getTimeAsSeconds(_)),
+    "getTimeAsSeconds with default" -> (_.getTimeAsSeconds(_, defaultIllegalValue)),
+    "getTimeAsMs" -> (_.getTimeAsMs(_)),
+    "getTimeAsMs with default" -> (_.getTimeAsMs(_, defaultIllegalValue)),
+    "getSizeAsBytes" -> (_.getSizeAsBytes(_)),
+    "getSizeAsBytes with default string" -> (_.getSizeAsBytes(_, defaultIllegalValue)),
+    "getSizeAsBytes with default long" -> (_.getSizeAsBytes(_, 0L)),
+    "getSizeAsKb" -> (_.getSizeAsKb(_)),
+    "getSizeAsKb with default" -> (_.getSizeAsKb(_, defaultIllegalValue)),
+    "getSizeAsMb" -> (_.getSizeAsMb(_)),
+    "getSizeAsMb with default" -> (_.getSizeAsMb(_, defaultIllegalValue)),
+    "getSizeAsGb" -> (_.getSizeAsGb(_)),
+    "getSizeAsGb with default" -> (_.getSizeAsGb(_, defaultIllegalValue)),
+    "getInt" -> (_.getInt(_, 0)),
+    "getLong" -> (_.getLong(_, 0L)),
+    "getDouble" -> (_.getDouble(_, 0.0)),
+    "getBoolean" -> (_.getBoolean(_, false))
+  )
+
+  illegalValueTests.foreach { case (name, getValue) =>
+    test(s"SPARK-24337: $name throws an useful error message with key name") {
+      val key = "SomeKey"
+      val conf = new SparkConf()
+      conf.set(key, "SomeInvalidValue")
+      val thrown = the [IllegalArgumentException] thrownBy {
+        getValue(conf, key)
+      }
+      thrown.getMessage should include (key)
--- End diff --

Shall we stick to `assert` syntax?
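
For reference, a hedged sketch of one common assert-style spelling of that check (reusing `getValue`, `conf`, and `key` from the diff above; `intercept` comes from ScalaTest's `Assertions`, which `SparkFunSuite` already mixes in):

```
// Sketch only: an assert-style equivalent of the Matchers expectation quoted above.
val thrown = intercept[IllegalArgumentException] {
  getValue(conf, key)
}
assert(thrown.getMessage.contains(key))
```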


---




[GitHub] spark pull request #21454: [SPARK-24337][Core] Improve error messages for Sp...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21454#discussion_r191625705
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -448,6 +473,20 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Seria
*/
   private[spark] def getenv(name: String): String = System.getenv(name)
 
+  /**
+   * Wrapper method for get*() methods which require some specific value format. This catches
--- End diff --

`get*()` .. ?
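
The quoted doc comment describes a wrapper around the typed `get*()` methods; a hedged sketch of that idea follows (an illustration of the pattern, not necessarily the exact method added by the PR):

```
// Hedged sketch: run one of the typed get*() conversions and re-throw value-format
// errors with the offending key included in the message.
private def catchIllegalValue[T](key: String)(getValue: => T): T =
  try {
    getValue
  } catch {
    case e: NumberFormatException =>
      throw new IllegalArgumentException(s"Illegal value for config key $key: ${e.getMessage}", e)
    case e: IllegalArgumentException =>
      throw new IllegalArgumentException(s"Illegal value for config key $key: ${e.getMessage}", e)
  }
```

A caller such as `getInt` could then wrap its parsing as `catchIllegalValue(key) { get(key).toInt }`, which is what makes the key name show up in the error.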


---




[GitHub] spark pull request #21454: [SPARK-24337][Core] Improve error messages for Sp...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21454#discussion_r191625505
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -265,108 +265,121 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then seconds are assumed.
   * @throws java.util.NoSuchElementException If the time parameter is not set
+   * @throws IllegalArgumentException If the value can't be interpreted as seconds
--- End diff --

I usually avoid abbreviations in the documentation, though. `can't` -> `cannot`.


---




[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...

2018-05-29 Thread Ngone51
Github user Ngone51 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21440#discussion_r191623277
  
--- Diff: 
core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala 
---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import scala.util.Random
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with 
MockitoSugar
+with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+val conf = new SparkConf()
+val env = mock[SparkEnv]
+SparkEnv.set(env)
+when(env.conf).thenReturn(conf)
+  }
+
+  override protected def afterEach(): Unit = {
+SparkEnv.set(null)
+  }
+
+  private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): 
ChunkedByteBuffer = {
+val bytes = (0 until nChunks).map { chunkIdx =>
+  val bb = ByteBuffer.allocate(perChunk)
+  (0 until perChunk).foreach { idx =>
+bb.put((chunkIdx * perChunk + idx).toByte)
+  }
+  bb.position(0)
+  bb
+}.toArray
+new ChunkedByteBuffer(bytes)
+  }
+
+  test("transferTo can stop and resume correctly") {
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L)
+val cbb = generateChunkByteBuffer(4, 10)
+val fileRegion = cbb.toNetty
+
+val targetChannel = new LimitedWritableByteChannel(40)
+
+var pos = 0L
+// write the fileregion to the channel, but with the transfer limited 
at various spots along
+// the way.
+
+// limit to within the first chunk
+targetChannel.acceptNBytes = 5
+pos = fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 5)
+
+// a little bit further within the first chunk
+targetChannel.acceptNBytes = 2
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 7)
+
+// past the first chunk, into the 2nd
+targetChannel.acceptNBytes = 6
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 13)
+
+// right to the end of the 2nd chunk
+targetChannel.acceptNBytes = 7
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 20)
+
+// rest of 2nd chunk, all of 3rd, some of 4th
+targetChannel.acceptNBytes = 15
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 35)
+
+// now till the end
+targetChannel.acceptNBytes = 5
+pos += fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+
+// calling again at the end should be OK
+targetChannel.acceptNBytes = 20
+fileRegion.transferTo(targetChannel, pos)
+assert(targetChannel.pos === 40)
+  }
+
+  test(s"transfer to with random limits") {
+val rng = new Random()
+val seed = System.currentTimeMillis()
+logInfo(s"seed = $seed")
+rng.setSeed(seed)
+val chunkSize = 1e4.toInt
+SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 
rng.nextInt(chunkSize).toLong)
+
+val cbb = generateChunkByteBuffer(50, chunkSize)
+val fileRegion = cbb.toNetty
+val transferLimit = 1e5.toInt
+val targetChannel = new LimitedWritableByteChannel(transferLimit)
+while (targetChannel.pos < cbb.size) {
+  val nextTransferSize = rng.nextInt(transferLimit)
+  targetChannel.acceptNBytes = nextTransferSize
+  

[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21437#discussion_r191624414
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
 TaskAttemptID.
 """
 return self._taskAttemptId
+
+def getLocalProperty(self, key):
+"""
+Get a local property set upstream in the driver, or None if it is 
missing.
--- End diff --

No, it seems not. Shall we change it to `get(key)`?


---




[GitHub] spark issue #21457: [SPARK-24414][ui] Calculate the correct number of tasks ...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21457
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21457: [SPARK-24414][ui] Calculate the correct number of tasks ...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21457
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91276/
Test PASSed.


---




[GitHub] spark issue #21457: [SPARK-24414][ui] Calculate the correct number of tasks ...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21457
  
**[Test build #91276 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91276/testReport)** for PR 21457 at commit [`40b6cb7`](https://github.com/apache/spark/commit/40b6cb7117598560d91bf6efb148c482eadd8daf).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r191623502
  
--- Diff: python/pyspark/util.py ---
@@ -55,7 +55,9 @@ def _get_argspec(f):
 """
 # `getargspec` is deprecated since python3.0 (incompatible with 
function annotations).
 # See SPARK-23569.
-if sys.version_info[0] < 3:
+if hasattr(f, '_argspec'):
--- End diff --

Yea, the current way sounds like a hack .. let's document this although we have a plan to clean this up soon.


---




[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21437#discussion_r191622793
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
 TaskAttemptID.
 """
 return self._taskAttemptId
+
+def getLocalProperty(self, key):
+"""
+Get a local property set upstream in the driver, or None if it is 
missing.
--- End diff --

Wait .. it's a dictionary. Does a dictionary's `get` have a `default` keyword argument?


---




[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...

2018-05-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21437#discussion_r191622279
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
 TaskAttemptID.
 """
 return self._taskAttemptId
+
+def getLocalProperty(self, key):
+"""
+Get a local property set upstream in the driver, or None if it is 
missing.
--- End diff --

Yeah. I added the `default=None` to make it super obvious. I see no harm in making the code more intuitive.


---




[GitHub] spark issue #20901: [SPARK-23792][DOCS] Documentation improvements for datet...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20901
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21437
  
Seems fine and I'm okay with it


---




[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-29 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r191621339
  
--- Diff: python/pyspark/util.py ---
@@ -55,7 +55,9 @@ def _get_argspec(f):
 """
 # `getargspec` is deprecated since python3.0 (incompatible with 
function annotations).
 # See SPARK-23569.
-if sys.version_info[0] < 3:
+if hasattr(f, '_argspec'):
--- End diff --

Can you add a comment explaining this? Just from this function it's not clear to me why we need `_argspec`.


---




[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-29 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r191621391
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -157,7 +157,17 @@ def _create_judf(self):
 spark = SparkSession.builder.getOrCreate()
 sc = spark.sparkContext
 
-        wrapped_func = _wrap_function(sc, self.func, self.returnType)
+        func = fail_on_stopiteration(self.func)
+
+        # for pandas UDFs the worker needs to know if the function takes
+        # one or two arguments, but the signature is lost when wrapping with
+        # fail_on_stopiteration, so we store it here
+        if self.evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                             PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+                             PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
+            func._argspec = _get_argspec(self.func)
--- End diff --

I see. Thanks for the clarification.


---




[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21437#discussion_r191621228
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
 TaskAttemptID.
 """
 return self._taskAttemptId
+
+def getLocalProperty(self, key):
+"""
+Get a local property set upstream in the driver, or None if it is 
missing.
--- End diff --

get(key) would return `None` tho if it's missing.


---




[GitHub] spark pull request #21409: [SPARK-24365][SQL] Add Data Source write benchmar...

2018-05-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21409


---




[GitHub] spark pull request #21409: [SPARK-24365][SQL] Add Data Source write benchmar...

2018-05-29 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21409#discussion_r191621006
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark to measure data source write performance.
+ * By default it measures 4 data source format: Parquet, ORC, JSON, CSV:
+ *  spark-submit --class  
+ * To measure specified formats, run it with arguments:
+ *  spark-submit --class   format1 
[format2] [...]
+ */
+object DataSourceWriteBenchmark {
+  val conf = new SparkConf()
+.setAppName("DataSourceWriteBenchmark")
+.setIfMissing("spark.master", "local[1]")
+.set("spark.sql.parquet.compression.codec", "snappy")
+.set("spark.sql.orc.compression.codec", "snappy")
+
+  val spark = SparkSession.builder.config(conf).getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+
+  val tempTable = "temp"
+  val numRows = 1024 * 1024 * 15
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally {
+  tableNames.foreach { name =>
+spark.sql(s"DROP TABLE IF EXISTS $name")
+  }
+}
+  }
+
+  def writeInt(table: String, format: String, benchmark: Benchmark): Unit 
= {
+spark.sql(s"create table $table(c1 INT, c2 STRING) using $format")
+benchmark.addCase("Output Single Int Column") { _ =>
+  spark.sql(s"INSERT overwrite table $table select cast(id as INT) as 
" +
+s"c1, cast(id as STRING) as c2 from $tempTable")
+}
+  }
+
+  def writeIntString(table: String, format: String, benchmark: Benchmark): 
Unit = {
+spark.sql(s"create table $table(c1 INT, c2 STRING) using $format")
+benchmark.addCase("Output Int and String Column") { _ =>
+  spark.sql(s"INSERT overwrite table $table select cast(id as INT) as 
" +
+s"c1, cast(id as STRING) as c2 from $tempTable")
+}
+  }
+
+  def writePartition(table: String, format: String, benchmark: Benchmark): 
Unit = {
+spark.sql(s"create table $table(p INT, id INT) using $format 
PARTITIONED BY (p)")
+benchmark.addCase("Output Partitions") { _ =>
+  spark.sql(s"INSERT overwrite table $table select cast(id as INT) as 
id," +
+s" cast(id % 2 as INT) as p from $tempTable")
+}
+  }
+
+  def writeBucket(table: String, format: String, benchmark: Benchmark): 
Unit = {
+spark.sql(s"create table $table(c1 INT, c2 INT) using $format 
CLUSTERED BY (c2) INTO 2 BUCKETS")
+benchmark.addCase("Output Buckets") { _ =>
+  spark.sql(s"INSERT overwrite table $table select cast(id as INT) as 
" +
+s"c1, cast(id as INT) as c2 from $tempTable")
+}
+  }
+
+  def main(args: Array[String]): Unit = {
+val tableInt = "tableInt"
+val tableIntString = "tableIntString"
+val tablePartition = "tablePartition"
+val tableBucket = "tableBucket"
+// If the
+val formats: Seq[String] = if (args.isEmpty) {
+  Seq("Parquet", "ORC", "JSON", "CSV")
+} else {
+  args
+}
+/*
+Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
+Parquet writer benchmark:Best/Avg Time(ms)
Rate(M/s)   Per Row(ns)   Relative
+

+Output Single Int Column 

[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21437#discussion_r191620990
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
 TaskAttemptID.
 """
 return self._taskAttemptId
+
+def getLocalProperty(self, key):
+"""
+Get a local property set upstream in the driver, or None if it is 
missing.
--- End diff --

Just get(key) returns `None` if missing tho ..


---




[GitHub] spark issue #21409: [SPARK-24365][SQL] Add Data Source write benchmark

2018-05-29 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21409
  
thanks, merging to master!


---




[GitHub] spark pull request #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmar...

2018-05-29 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21288#discussion_r191620766
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
 ---
@@ -131,211 +132,214 @@ object FilterPushdownBenchmark {
 }
 
 /*
+OpenJDK 64-Bit Server VM 1.8.0_171-b10 on Linux 4.14.26-46.32.amzn1.x86_64
 Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 Select 0 string row (value IS NULL):     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Parquet Vectorized                             8452 / 8504          1.9         537.3       1.0X
-Parquet Vectorized (Pushdown)                   274 /  281         57.3          17.4      30.8X
-Native ORC Vectorized                          8167 / 8185          1.9         519.3       1.0X
-Native ORC Vectorized (Pushdown)                365 /  379         43.1          23.2      23.1X
+Parquet Vectorized                             2961 / 3123          5.3         188.3       1.0X
+Parquet Vectorized (Pushdown)                  3057 / 3121          5.1         194.4       1.0X
--- End diff --

That might be, but I feel the change was too big... I think I probably made some mistakes in the last benchmark runs (I haven't found out why yet, though).


---




[GitHub] spark issue #21422: [Spark-24376][doc]Summary:compiling spark with scala-2.1...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21422
  
Thank you @gentlewangyu.


---




[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r191619371
  
--- Diff: python/pyspark/util.py ---
@@ -55,7 +55,9 @@ def _get_argspec(f):
 """
 # `getargspec` is deprecated since python3.0 (incompatible with 
function annotations).
 # See SPARK-23569.
-if sys.version_info[0] < 3:
+if hasattr(f, '_argspec'):
+argspec = f._argspec
--- End diff --

@e-dorigatti, sorry, one last comment. Shall we add a short note that, for now, we only reach here for Pandas UDFs?


---




[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21246
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3690/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21246
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21383#discussion_r191619171
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -157,7 +157,17 @@ def _create_judf(self):
 spark = SparkSession.builder.getOrCreate()
 sc = spark.sparkContext
 
-wrapped_func = _wrap_function(sc, self.func, self.returnType)
+func = fail_on_stopiteration(self.func)
+
+# for pandas UDFs the worker needs to know if the function takes
+# one or two arguments, but the signature is lost when wrapping 
with
+# fail_on_stopiteration, so we store it here
+if self.evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+ PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+ PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
+func._argspec = _get_argspec(self.func)
--- End diff --

Yup, we definitely support that. The current approach probably wouldn't change 
anything we supported before. I believe the built-in functions in Python 2 don't 
already work with Pandas UDFs anyway.
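
To make the `_argspec` trick above concrete, here is a small self-contained sketch (not the actual PySpark code): wrapping a function in a `*args, **kwargs` wrapper hides its argument names, so the argspec is captured on the original function and stashed on the wrapper before it is shipped to the worker. The wrapper and the hypothetical two-argument Pandas UDF below are illustrative only.

```python
# Standalone sketch of why the wrapper loses the signature and how stashing
# the argspec beforehand works around it (Python 3; attribute name `_argspec`
# mirrors the diff above).
from inspect import getfullargspec


def fail_on_stopiteration(f):
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError("StopIteration raised inside a user function", exc)
    return wrapper


def grouped_map_udf(key, pdf):   # hypothetical two-argument Pandas UDF
    return pdf


wrapped = fail_on_stopiteration(grouped_map_udf)
wrapped._argspec = getfullargspec(grouped_map_udf)  # capture before the signature is hidden

print(getfullargspec(wrapped).args)  # [] -> the wrapper hides the original argument names
print(wrapped._argspec.args)         # ['key', 'pdf'] -> recovered from the stashed attribute
```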


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21246
  
**[Test build #91283 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91283/testReport)**
 for PR 21246 at commit 
[`6fd8f2f`](https://github.com/apache/spark/commit/6fd8f2fbd37e5193f0ffb1a25a8f4a8c71ab55bd).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191618335
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to 
[[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId  The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the 
shuffle.
+ * @param endpoints The [[RPCContinuousShuffleReader]] endpoints 
to write to. Indexed by
+ *  partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+writerId: Int,
+outputPartitioner: Partitioner,
+endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {
+
+  if (outputPartitioner.numPartitions != 1) {
--- End diff --

I believe so, but there's no way to test whether it will work until we 
implement the scheduling support for distributing the addresses of each of the 
multiple readers.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/21246
  
LGTM pending Jenkins.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/21246
  
I'd like to retrigger the build just to check again.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191618218
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to 
[[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId  The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the 
shuffle.
+ * @param endpoints The [[RPCContinuousShuffleReader]] endpoints 
to write to. Indexed by
+ *  partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+writerId: Int,
+outputPartitioner: Partitioner,
+endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {
+
+  if (outputPartitioner.numPartitions != 1) {
+throw new IllegalArgumentException("multiple readers not yet 
supported")
+  }
+
+  if (outputPartitioner.numPartitions != endpoints.length) {
+throw new IllegalArgumentException(s"partitioner size 
${outputPartitioner.numPartitions} did " +
+  s"not match endpoint count ${endpoints.length}")
+  }
+
+  def write(epoch: Iterator[UnsafeRow]): Unit = {
+while (epoch.hasNext) {
+  val row = epoch.next()
+  
endpoints(outputPartitioner.getPartition(row)).ask[Unit](ReceiverRow(writerId, 
row))
--- End diff --

cc @zsxwing 

It's my understanding that the RPC framework guarantees messages will be 
sent in the order that they're ask()ed, and that it's therefore not possible 
for a single row to fail to be sent while the ones before and after it succeed. 
If this is the case, then we don't need to handle it here - the query will just 
start failing to make progress.

If it's not the case, we'll need a more clever solution. Maybe have the 
epoch marker message contain a count for the number of rows that are supposed 
to be in the epoch?
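
As a toy illustration of that fallback idea (written in Python just to keep it short, with invented message shapes rather than the real RPC messages), the reader would buffer rows per epoch and only close an epoch once the marker's row count is satisfied:

```python
# Sketch only: a counting epoch marker lets the reader detect a dropped row
# instead of silently closing the epoch early. Message shapes are invented.
from collections import namedtuple

Row = namedtuple("Row", ["epoch", "value"])
EpochMarker = namedtuple("EpochMarker", ["epoch", "expected_rows"])


class CountingEpochReader:
    def __init__(self):
        self.rows = {}       # epoch -> rows received so far
        self.expected = {}   # epoch -> row count announced by the marker

    def receive(self, msg):
        if isinstance(msg, Row):
            self.rows.setdefault(msg.epoch, []).append(msg.value)
        else:
            self.expected[msg.epoch] = msg.expected_rows
        return self._maybe_complete(msg.epoch)

    def _maybe_complete(self, epoch):
        # The epoch only completes when the marker arrived AND all rows did.
        if epoch in self.expected and len(self.rows.get(epoch, [])) == self.expected[epoch]:
            return self.rows.pop(epoch, []), self.expected.pop(epoch)
        return None


reader = CountingEpochReader()
reader.receive(Row(0, "a"))
reader.receive(Row(0, "b"))
print(reader.receive(EpochMarker(0, expected_rows=2)))  # (['a', 'b'], 2) -> epoch 0 complete
```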


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191618212
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to 
[[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId  The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the 
shuffle.
+ * @param endpoints The [[RPCContinuousShuffleReader]] endpoints 
to write to. Indexed by
+ *  partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+writerId: Int,
--- End diff --

ok makes sense.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions

2018-05-29 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/21246
  
Jenkins, retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21422: [Spark-24376][doc]Summary:compiling spark with sc...

2018-05-29 Thread gentlewangyu
Github user gentlewangyu closed the pull request at:

https://github.com/apache/spark/pull/21422


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21422: [Spark-24376][doc]Summary:compiling spark with scala-2.1...

2018-05-29 Thread gentlewangyu
Github user gentlewangyu commented on the issue:

https://github.com/apache/spark/pull/21422
  
@HyukjinKwon @jerryshao ok, thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191617881
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to 
[[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId  The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the 
shuffle.
+ * @param endpoints The [[RPCContinuousShuffleReader]] endpoints 
to write to. Indexed by
+ *  partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+writerId: Int,
--- End diff --

I worry that partitionId is ambiguous with the partition to which the 
shuffle data is being written.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191613866
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to 
[[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId  The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the 
shuffle.
+ * @param endpoints The [[RPCContinuousShuffleReader]] endpoints 
to write to. Indexed by
+ *  partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+writerId: Int,
--- End diff --

nit: rename to `partitionId`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191617448
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.Partitioner
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * A [[ContinuousShuffleWriter]] sending data to 
[[RPCContinuousShuffleReader]] instances.
+ *
+ * @param writerId  The partition ID of this writer.
+ * @param outputPartitioner The partitioner on the reader side of the 
shuffle.
+ * @param endpoints The [[RPCContinuousShuffleReader]] endpoints 
to write to. Indexed by
+ *  partition ID within outputPartitioner.
+ */
+class RPCContinuousShuffleWriter(
+writerId: Int,
+outputPartitioner: Partitioner,
+endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {
+
+  if (outputPartitioner.numPartitions != 1) {
--- End diff --

Any reason to disable it? This should work, right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF should assi...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21427
  
For configuration, I wasn't sure if we should send the whole configuration 
map to the worker.py side, whether we should change how the command is written, and I was 
also thinking the current timezone handling is kind of a one-time thing, so I was just 
wondering if we really should do that. I am okay with it if that's the only way and I 
should add a configuration for 2.4. Just for clarification, we should probably 
remove that configuration in 3.0.0 too.

For the current approach, I thought we'd better check whether there are other 
cases that could break, and see if the change makes sense, rather than blocking 
this only because there is a bit of a behaviour change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191615398
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
 ---
@@ -288,4 +267,153 @@ class ContinuousShuffleReadSuite extends StreamTest {
 val thirdEpoch = rdd.compute(rdd.partitions(0), 
ctx).map(_.getUTF8String(0).toString).toSet
 assert(thirdEpoch == Set("writer1-row1", "writer2-row0"))
   }
+
+  test("one epoch") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator(1, 2, 3))
+
+assert(readEpoch(reader) == Seq(1, 2, 3))
+  }
+
+  test("multiple epochs") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator(1, 2, 3))
+writer.write(Iterator(4, 5, 6))
+
+assert(readEpoch(reader) == Seq(1, 2, 3))
+assert(readEpoch(reader) == Seq(4, 5, 6))
+  }
+
+  test("empty epochs") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator())
+writer.write(Iterator(1, 2))
+writer.write(Iterator())
+writer.write(Iterator())
+writer.write(Iterator(3, 4))
+writer.write(Iterator())
+
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq(1, 2))
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq(3, 4))
+assert(readEpoch(reader) == Seq())
+  }
+
+  test("blocks waiting for writer") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+val readerEpoch = reader.compute(reader.partitions(0), ctx)
+
+val readRowThread = new Thread {
+  override def run(): Unit = {
+assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1))
+  }
+}
+readRowThread.start()
+
+eventually(timeout(streamingTimeout)) {
+  assert(readRowThread.getState == Thread.State.TIMED_WAITING)
+}
+
+// Once we write the epoch the thread should stop waiting and succeed.
+writer.write(Iterator(1))
+readRowThread.join()
+  }
+
+  test("multiple writer partitions") {
+val numWriterPartitions = 3
+
+val reader = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, numShuffleWriters = 
numWriterPartitions)
+val writers = (0 until 3).map { idx =>
+  new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), 
Array(readRDDEndpoint(reader)))
+}
+
+writers(0).write(Iterator(1, 4, 7))
+writers(1).write(Iterator(2, 5))
+writers(2).write(Iterator(3, 6))
+
+writers(0).write(Iterator(4, 7, 10))
+writers(1).write(Iterator(5, 8))
+writers(2).write(Iterator(6, 9))
+
+// Since there are multiple asynchronous writers, the original row 
sequencing is not guaranteed.
+// The epochs should be deterministically preserved, however.
+assert(readEpoch(reader).toSet == Seq(1, 2, 3, 4, 5, 6, 7).toSet)
+assert(readEpoch(reader).toSet == Seq(4, 5, 6, 7, 8, 9, 10).toSet)
+  }
+
+  test("reader epoch only ends when all writer partitions write it") {
+val numWriterPartitions = 3
+
+val reader = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, numShuffleWriters = 
numWriterPartitions)
+val writers = (0 until 3).map { idx =>
+  new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), 
Array(readRDDEndpoint(reader)))
+}
+
+writers(1).write(Iterator())
+writers(2).write(Iterator())
+
+val readerEpoch = reader.compute(reader.partitions(0), ctx)
+
+val readEpochMarkerThread = new Thread {
+  override def run(): Unit = {
+assert(!readerEpoch.hasNext)
+  }
+}
+
+readEpochMarkerThread.start()
+eventually(timeout(streamingTimeout)) {
+  assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING)
+}
+
+writers(0).write(Iterator())
+readEpochMarkerThread.join()
+  }
+
+  test("receiver stopped with row last") {

[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21413
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91282/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21413
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21413
  
**[Test build #91282 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91282/testReport)**
 for PR 21413 at commit 
[`d8f3906`](https://github.com/apache/spark/commit/d8f3906be4d4178d3c41bff41eaeb39f430ade6b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-05-29 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r191611678
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -127,6 +127,165 @@ case class MapKeys(child: Expression)
   override def prettyName: String = "map_keys"
 }
 
+@ExpressionDescription(
+  usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in 
the N-th position the
+  N-th value of each array given.""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+[[1, 2], [2, 3], [3, 4]]
+  > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
+[[1, 2, 3], [2, 3, 4]]
+  """,
+  since = "2.4.0")
+case class Zip(children: Seq[Expression]) extends Expression with 
ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.length)(ArrayType)
+
+  override def dataType: DataType = ArrayType(mountSchema)
+
+  override def nullable: Boolean = children.forall(_.nullable)
+
+  private lazy val arrayTypes = 
children.map(_.dataType.asInstanceOf[ArrayType])
+
+  private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
+
+  def mountSchema: StructType = {
+val fields = children.zip(arrayElementTypes).zipWithIndex.map {
+  case ((expr: NamedExpression, elementType), _) =>
+StructField(expr.name, elementType, nullable = true)
+  case ((_, elementType), idx) =>
+StructField(s"$idx", elementType, nullable = true)
+}
+StructType(fields)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val numberOfArrays: Int = children.length
+val genericArrayData = classOf[GenericArrayData].getName
+val genericInternalRow = classOf[GenericInternalRow].getName
+val arrVals = ctx.freshName("arrVals")
+val arrCardinality = ctx.freshName("arrCardinality")
+val biggestCardinality = ctx.freshName("biggestCardinality")
+val storedArrTypes = ctx.freshName("storedArrTypes")
+val returnNull = ctx.freshName("returnNull")
+val evals = children.map(_.genCode(ctx))
+
+val inputs = evals.zipWithIndex.map { case (eval, index) =>
+  s"""
+|${eval.code}
+|if (!${eval.isNull}) {
+|  $arrVals[$index] = ${eval.value};
+|  $arrCardinality[$index] = ${eval.value}.numElements();
+|} else {
+|  $arrVals[$index] = null;
+|  $arrCardinality[$index] = 0;
+|  $returnNull[0] = true;
+|}
+|$storedArrTypes[$index] = "${arrayElementTypes(index)}";
+|$biggestCardinality = Math.max($biggestCardinality, 
$arrCardinality[$index]);
+  """.stripMargin
+}
+
+val inputsSplitted = ctx.splitExpressions(
+  expressions = inputs,
+  funcName = "getInputAndCardinality",
+  returnType = "int",
+  makeSplitFunction = body =>
+s"""
+  |$body
+  |return $biggestCardinality;
+""".stripMargin,
+  foldFunctions = _.map(funcCall => s"$biggestCardinality = 
$funcCall;").mkString("\n"),
+  arguments =
+("ArrayData[]", arrVals) ::
+("int[]", arrCardinality) ::
+("String[]", storedArrTypes) ::
+("int", biggestCardinality) ::
+("boolean[]", returnNull) :: Nil)
+
+val myobject = ctx.freshName("myobject")
+val j = ctx.freshName("j")
+val i = ctx.freshName("i")
+val args = ctx.freshName("args")
+
+val cases = arrayElementTypes.distinct.map { elementType =>
+  val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", 
elementType, i)
+  s"""
+|case "${elementType}":
+|  $myobject[$j] = $getArrValsItem;
+|  break;
+  """.stripMargin
+}
+
+ev.copy(s"""
+  |ArrayData[] $arrVals = new ArrayData[$numberOfArrays];
+  |int[] $arrCardinality = new int[$numberOfArrays];
+  |int $biggestCardinality = 0;
+  |String[] $storedArrTypes = new String[$numberOfArrays];
+  |boolean[] $returnNull = new boolean[1];
+  |$returnNull[0] = false;
+  |$inputsSplitted
+  |${CodeGenerator.javaType(dataType)} ${ev.value};
+  |boolean ${ev.isNull} = $returnNull[0];
+  |if (${ev.isNull}) {
+  |  ${ev.value} = null;
+  |} else {
+  |  if ($numberOfArrays == 0) {
+  |${ev.value} = new $genericArrayData(new Object[0]);
--- End diff --

We can simplify the generated code when `numberOfArrays == 0` instead of 
generating the whole code.


---


[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-05-29 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r191610871
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -127,6 +127,165 @@ case class MapKeys(child: Expression)
   override def prettyName: String = "map_keys"
 }
 
+@ExpressionDescription(
+  usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in 
the N-th position the
+  N-th value of each array given.""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+[[1, 2], [2, 3], [3, 4]]
+  > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
+[[1, 2, 3], [2, 3, 4]]
+  """,
+  since = "2.4.0")
+case class Zip(children: Seq[Expression]) extends Expression with 
ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.length)(ArrayType)
+
+  override def dataType: DataType = ArrayType(mountSchema)
+
+  override def nullable: Boolean = children.forall(_.nullable)
+
+  private lazy val arrayTypes = 
children.map(_.dataType.asInstanceOf[ArrayType])
+
+  private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
+
+  def mountSchema: StructType = {
+val fields = children.zip(arrayElementTypes).zipWithIndex.map {
+  case ((expr: NamedExpression, elementType), _) =>
+StructField(expr.name, elementType, nullable = true)
+  case ((_, elementType), idx) =>
+StructField(s"$idx", elementType, nullable = true)
+}
+StructType(fields)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val numberOfArrays: Int = children.length
+val genericArrayData = classOf[GenericArrayData].getName
+val genericInternalRow = classOf[GenericInternalRow].getName
+val arrVals = ctx.freshName("arrVals")
+val arrCardinality = ctx.freshName("arrCardinality")
+val biggestCardinality = ctx.freshName("biggestCardinality")
+val storedArrTypes = ctx.freshName("storedArrTypes")
+val returnNull = ctx.freshName("returnNull")
+val evals = children.map(_.genCode(ctx))
+
+val inputs = evals.zipWithIndex.map { case (eval, index) =>
+  s"""
+|${eval.code}
+|if (!${eval.isNull}) {
+|  $arrVals[$index] = ${eval.value};
+|  $arrCardinality[$index] = ${eval.value}.numElements();
+|} else {
+|  $arrVals[$index] = null;
+|  $arrCardinality[$index] = 0;
+|  $returnNull[0] = true;
+|}
+|$storedArrTypes[$index] = "${arrayElementTypes(index)}";
+|$biggestCardinality = Math.max($biggestCardinality, 
$arrCardinality[$index]);
+  """.stripMargin
+}
+
+val inputsSplitted = ctx.splitExpressions(
+  expressions = inputs,
+  funcName = "getInputAndCardinality",
+  returnType = "int",
+  makeSplitFunction = body =>
+s"""
+  |$body
+  |return $biggestCardinality;
+""".stripMargin,
+  foldFunctions = _.map(funcCall => s"$biggestCardinality = 
$funcCall;").mkString("\n"),
+  arguments =
+("ArrayData[]", arrVals) ::
+("int[]", arrCardinality) ::
+("String[]", storedArrTypes) ::
+("int", biggestCardinality) ::
+("boolean[]", returnNull) :: Nil)
+
+val myobject = ctx.freshName("myobject")
+val j = ctx.freshName("j")
+val i = ctx.freshName("i")
+val args = ctx.freshName("args")
+
+val cases = arrayElementTypes.distinct.map { elementType =>
+  val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", 
elementType, i)
+  s"""
+|case "${elementType}":
+|  $myobject[$j] = $getArrValsItem;
+|  break;
+  """.stripMargin
+}
+
+ev.copy(s"""
+  |ArrayData[] $arrVals = new ArrayData[$numberOfArrays];
+  |int[] $arrCardinality = new int[$numberOfArrays];
+  |int $biggestCardinality = 0;
+  |String[] $storedArrTypes = new String[$numberOfArrays];
+  |boolean[] $returnNull = new boolean[1];
+  |$returnNull[0] = false;
+  |$inputsSplitted
+  |${CodeGenerator.javaType(dataType)} ${ev.value};
+  |boolean ${ev.isNull} = $returnNull[0];
+  |if (${ev.isNull}) {
+  |  ${ev.value} = null;
+  |} else {
+  |  if ($numberOfArrays == 0) {
+  |${ev.value} = new $genericArrayData(new Object[0]);
+  |  } else {
+  |Object[] $args = new Object[$biggestCardinality];
+  |for (int $i = 0; $i < $biggestCardinality; 

[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21413
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21413
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3689/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF should assi...

2018-05-29 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/21427
  
I guess sending configurations is not that difficult.
We can write the configs (as a `Map[String, String]`, to allow further configurations 
in the future?) before `PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)` 
in `ArrowPythonRunner.writeCommand()` (and `PythonUDFRunner.writeCommand()`?), 
and read them back before reading the UDFs in `worker.py`. The `timezone` can be included 
in the configs.
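
A rough sketch of the framing being suggested, shown from both ends in Python only to keep it in one snippet. The wire layout here (entry count, then length-prefixed UTF-8 key/value strings) is an assumption for illustration, not the actual PythonRunner protocol.

```python
# Sketch, assuming a simple length-prefixed layout: the JVM side would write a
# small string-to-string map before the UDFs, and worker.py would read it back
# before deserializing them.
import struct
from io import BytesIO


def write_str(out, s):
    b = s.encode("utf-8")
    out.write(struct.pack("!i", len(b)))  # 4-byte big-endian length prefix
    out.write(b)


def read_str(infile):
    (length,) = struct.unpack("!i", infile.read(4))
    return infile.read(length).decode("utf-8")


def write_conf(out, conf):
    out.write(struct.pack("!i", len(conf)))  # number of entries
    for k, v in conf.items():
        write_str(out, k)
        write_str(out, v)


def read_conf(infile):
    (n,) = struct.unpack("!i", infile.read(4))
    conf = {}
    for _ in range(n):
        key = read_str(infile)
        conf[key] = read_str(infile)
    return conf


buf = BytesIO()
write_conf(buf, {"spark.sql.session.timeZone": "UTC"})
buf.seek(0)
print(read_conf(buf))  # {'spark.sql.session.timeZone': 'UTC'}
```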


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21366
  
Kubernetes integration test status success
URL: 
https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3555/



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF should assi...

2018-05-29 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21427
  
Yup, my impression was that there could be a corner case too, but I wasn't 
sure how much the corner case makes sense, and I haven't checked it closely yet. 
I believe elaborating the case would help us judge whether we should block this 
or not. The current approach looks fine in general to me, though. I think it's 
fine even if there is a bit of a behaviour change, as long as we mention it in the 
migration guide. cc @cloud-fan too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21366
  
Kubernetes integration test starting
URL: 
https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3555/



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21366
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3688/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21437
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...

2018-05-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21437
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91274/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21437
  
**[Test build #91274 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91274/testReport)**
 for PR 21437 at commit 
[`9d95c12`](https://github.com/apache/spark/commit/9d95c12a0ada0520f426723406a7d99aada2760d).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21413
  
**[Test build #91282 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91282/testReport)**
 for PR 21413 at commit 
[`d8f3906`](https://github.com/apache/spark/commit/d8f3906be4d4178d3c41bff41eaeb39f430ade6b).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Pyt...

2018-05-29 Thread huaxingao
Github user huaxingao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21413#discussion_r191611779
  
--- Diff: python/pyspark/ml/regression.py ---
@@ -619,6 +627,22 @@ def getSubsamplingRate(self):
 """
 return self.getOrDefault(self.subsamplingRate)
 
+@since("1.4.0")
+def setFeatureSubsetStrategy(self, value):
+"""
+Sets the value of :py:attr:`featureSubsetStrategy`.
+
+.. note:: Deprecated in 2.1.0 and will be removed in 2.4.0.
--- End diff --

Sorry.  Fixed. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...

2018-05-29 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21366
  
**[Test build #91281 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91281/testReport)**
 for PR 21366 at commit 
[`5b9c00f`](https://github.com/apache/spark/commit/5b9c00fa39d1c83435ca65de5394345e5d6f1f00).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   4   5   6   >