[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19321
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19321
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82101/
Test FAILed.


---




[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19321
  
**[Test build #82101 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82101/testReport)**
 for PR 19321 at commit 
[`45e655f`](https://github.com/apache/spark/commit/45e655f14f9beed6bc6bc584b73619ba46a51c43).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19329: [SPARK-22110][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19329
  
**[Test build #82105 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82105/testReport)**
 for PR 19329 at commit 
[`0f3307d`](https://github.com/apache/spark/commit/0f3307dfc3dd21b3d643d3d58e9202743f958b23).


---




[GitHub] spark issue #9207: [SPARK-11171][SPARK-11237][SPARK-11241][ML] Try adding PM...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/9207
  
Build finished. Test FAILed.


---




[GitHub] spark issue #19329: [SPARK-22110][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19329
  
ok to test


---




[GitHub] spark issue #9207: [SPARK-11171][SPARK-11237][SPARK-11241][ML] Try adding PM...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/9207
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82102/
Test FAILed.


---




[GitHub] spark issue #9207: [SPARK-11171][SPARK-11237][SPARK-11241][ML] Try adding PM...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/9207
  
**[Test build #82102 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82102/consoleFull)**
 for PR 9207 at commit 
[`9cb8994`](https://github.com/apache/spark/commit/9cb899443473eceaad459c5fb550b7a8f3780a9c).
 * This patch **fails Spark unit tests**.
 * This patch **does not merge cleanly**.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140626955
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
   outputIterator.map(new ArrowPayload(_)), context)
 
 // Verify that the output schema is correct
-val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
-  .map { case (attr, i) => attr.withName(s"_$i") })
-assert(schemaOut.equals(outputRowIterator.schema),
-  s"Invalid schema from pandas_udf: expected $schemaOut, got 
${outputRowIterator.schema}")
+if (outputIterator.nonEmpty) {
+  val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
+.map { case (attr, i) => attr.withName(s"_$i") })
+  assert(schemaOut.equals(outputRowIterator.schema),
+s"Invalid schema from pandas_udf: expected $schemaOut, got 
${outputRowIterator.schema}")
--- End diff --

Looks like we don't have a test against this case. We should add a test for 
invalid schema.


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140626292
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
   outputIterator.map(new ArrowPayload(_)), context)
 
 // Verify that the output schema is correct
-val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
-  .map { case (attr, i) => attr.withName(s"_$i") })
-assert(schemaOut.equals(outputRowIterator.schema),
-  s"Invalid schema from pandas_udf: expected $schemaOut, got 
${outputRowIterator.schema}")
+if (outputIterator.nonEmpty) {
--- End diff --

I guess we should use `outputRowIterator`.
Btw how about using `hasNext` instead of `nonEmpty`?


---




[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...

2017-09-22 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19325
  
Mostly looks pretty good. The only main question I have is about the empty partition issue.


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140626184
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3344,6 +3342,22 @@ def test_vectorized_udf_wrong_return_type(self):
 'Invalid.*type.*string'):
 df.select(f(col('x'))).collect()
 
+def test_vectorized_udf_decorator(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.range(10)
+
+@pandas_udf(returnType=LongType())
+def identity(x):
+return x
+res = df.select(identity(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_empty_partition(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
--- End diff --

Oh. I see. One partition is empty and it is related to the added stuff in 
`ArrowEvalPythonExec`.


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140626154
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
   outputIterator.map(new ArrowPayload(_)), context)
 
 // Verify that the output schema is correct
-val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
-  .map { case (attr, i) => attr.withName(s"_$i") })
-assert(schemaOut.equals(outputRowIterator.schema),
-  s"Invalid schema from pandas_udf: expected $schemaOut, got 
${outputRowIterator.schema}")
+if (outputIterator.nonEmpty) {
--- End diff --

After `outputIterator` is consumed by `ArrowConverters`, can `nonEmpty` 
return a meaningful value?
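
A small, self-contained Python analogy of the concern (the PR itself is Scala; this only illustrates why checking emptiness after the data has been consumed is unreliable):

```python
# Once downstream code has drained an iterator, an emptiness check on it no
# longer tells you whether the original data was empty.
it = iter([1, 2, 3])
consumed = list(it)                      # some converter drains the iterator
still_has_data = next(it, None) is not None
print(consumed, still_has_data)          # [1, 2, 3] False -- looks "empty" now
```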


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140626051
  
--- Diff: python/pyspark/worker.py ---
@@ -80,14 +77,12 @@ def wrap_pandas_udf(f, return_type):
 arrow_return_type = toArrowType(return_type)
 
 def verify_result_length(*a):
-kwargs = a[-1]
-result = f(*a[:-1], **kwargs)
-if len(result) != kwargs["length"]:
+result = f(*a)
+if len(result) != len(a[0]):
--- End diff --

Should we verify that the returned value is a pandas.Series?
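
A hypothetical sketch (not the PR's code) of what such a check in the worker-side wrapper could look like, verifying the type before comparing lengths; the helper name and messages are made up:

```python
import pandas as pd

def verify_result(result, expected_len):
    # Fail fast with a clear error if the UDF did not return a pandas.Series.
    if not isinstance(result, pd.Series):
        raise TypeError("pandas_udf should return a pandas.Series, got %s" % type(result))
    # Then apply the existing length check.
    if len(result) != expected_len:
        raise RuntimeError("Result length %d does not match input length %d"
                           % (len(result), expected_len))
    return result

print(verify_result(pd.Series([1, 2, 3]), 3))   # passes and echoes the Series
```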


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140625992
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3344,6 +3342,22 @@ def test_vectorized_udf_wrong_return_type(self):
 'Invalid.*type.*string'):
 df.select(f(col('x'))).collect()
 
+def test_vectorized_udf_decorator(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.range(10)
+
+@pandas_udf(returnType=LongType())
+def identity(x):
+return x
+res = df.select(identity(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+def test_vectorized_udf_empty_partition(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
--- End diff --

Maybe I am missing something, but what is this test intended to test?


---




[GitHub] spark issue #19329: [SPARK-22110][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19329
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #19329: [SPARK-22110][SQL][Documentation] Add usage and i...

2017-09-22 Thread kevinyu98
GitHub user kevinyu98 opened a pull request:

https://github.com/apache/spark/pull/19329

[SPARK-22110][SQL][Documentation] Add usage and improve documentation with 
arguments and examples for trim function

## What changes were proposed in this pull request?

This PR proposes to enhance the documentation for the `trim` functions in the function description section.

- Add more `usage`, `arguments` and `examples` for the trim function
- Adjust spacing in the `usage` section

After the changes, the trim function documentation will look like this:

 

- `trim`

```
trim(str) - Removes the leading and trailing space characters from str.

trim(BOTH trimStr FROM str) - Remove the leading and trailing trimStr 
characters from str

trim(LEADING trimStr FROM str) - Remove the leading trimStr characters from 
str

trim(TRAILING trimStr FROM str) - Remove the trailing trimStr characters 
from str

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single 
space
BOTH, FROM - these are keywords to specify trimming string characters from 
both ends of the string
LEADING, FROM - these are keywords to specify trimming string characters 
from the left end of the string
TRAILING, FROM - these are keywords to specify trimming string characters 
from the right end of the string
Examples:

> SELECT trim('SparkSQL   ');
 SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
 parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
 parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
 parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
 SSparkSQ
```

- `ltrim`

```ltrim

ltrim(str) - Removes the leading space characters from str.

ltrim(trimStr, str) - Removes the leading string contains the characters 
from the trim string

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single 
space
Examples:

> SELECT ltrim('SparkSQL   ');
 SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
 arkSQLS
```

- `rtrim`
```rtrim

rtrim(str) - Removes the trailing space characters from str.

rtrim(trimStr, str) - Removes the trailing string which contains the 
characters from the trim string from the str

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single 
space
Examples:

> SELECT rtrim('SparkSQL   ');
 SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
 SSpark
```

This is the JIRA for the trim characters function: [trim function](https://issues.apache.org/jira/browse/SPARK-14878)

## How was this patch tested?

Manually tested
```
spark-sql> describe function extended trim;
17/09/22 17:03:04 INFO CodeGenerator: Code generated in 153.026533 ms
Function: trim
Class: org.apache.spark.sql.catalyst.expressions.StringTrim
Usage: 
trim(str) - Removes the leading and trailing space characters from 
`str`.

trim(BOTH trimStr FROM str) - Remove the leading and trailing `trimStr` 
characters from `str`

trim(LEADING trimStr FROM str) - Remove the leading `trimStr` 
characters from `str`

trim(TRAILING trimStr FROM str) - Remove the trailing `trimStr` 
characters from `str`
  
Extended Usage:
Arguments:
  * str - a string expression
  * trimStr - the trim string characters to trim, the default value is 
a single space
  * BOTH, FROM - these are keywords to specify trimming string 
characters from both ends of
  the string
  * LEADING, FROM - these are keywords to specify trimming string 
characters from the left
  end of the string
  * TRAILING, FROM - these are keywords to specify trimming string 
characters from the right
  end of the string
  
Examples:
  > SELECT trim('SparkSQL   ');
   SparkSQL
  > SELECT trim('SL', 'SSparkSQLS');
   parkSQ
  > SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
   parkSQ
  > SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
   parkSQLS
  > SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
   SSparkSQ
  ```
```
spark-sql> describe function extended ltrim;
Function: ltrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimLeft
Usage: 
ltrim(str) - Removes the leading space characters from `str`.

ltrim(trimStr, str) - Removes the leading string contains the 
characters from the trim string
  
Extended Usage:
Arguments:
  * str - a string expression
  * trimStr - the trim string characters to trim, the default value is a single space

[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/13599
  
**[Test build #82104 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82104/testReport)**
 for PR 13599 at commit 
[`abdf7b7`](https://github.com/apache/spark/commit/abdf7b7a8a75dfc7b8de597611bbfa0af126e24e).


---




[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19328
  
Why did you close it? You can just edit the PR title.


---




[GitHub] spark pull request #19328: [SPARK-22088][SQL][Documentation] Add usage and i...

2017-09-22 Thread kevinyu98
Github user kevinyu98 closed the pull request at:

https://github.com/apache/spark/pull/19328


---




[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread kevinyu98
Github user kevinyu98 commented on the issue:

https://github.com/apache/spark/pull/19328
  
I am so sorry that I made a mistake on the JIRA number. I created a new JIRA, SPARK-22110, but used the wrong number here. Let me close this PR and open a new one with the correct JIRA number.


---




[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...

2017-09-22 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19317
  
Oh, I get your point. This is different from `RDD.aggregate`: it directly returns a Map and avoids shuffling, which seems useful when numKeys is small.
But, looking at the final `reduce` step, it seems it can be optimized using `treeAggregate`, and we can add a `depth` parameter.
And using `OpenHashSet` instead of `JHashMap` looks better, but we need to test first.
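
A rough PySpark sketch of the `treeAggregate` idea (the PR targets the Scala RDD API; the keys, values, and partition count here are made up for illustration). The `depth` parameter adds intermediate combine levels instead of a single final reduce on the driver.

```python
from pyspark import SparkContext

sc = SparkContext("local[2]", "tree-aggregate-sketch")
pairs = sc.parallelize([("a", 1.0), ("b", 2.0), ("a", 3.0)], 4)

def seq_op(acc, kv):
    # Fold one (key, value) pair into the per-partition map.
    k, v = kv
    acc[k] = acc.get(k, 0.0) + v
    return acc

def comb_op(m1, m2):
    # Merge two per-partition maps.
    for k, v in m2.items():
        m1[k] = m1.get(k, 0.0) + v
    return m1

local_map = pairs.treeAggregate({}, seq_op, comb_op, depth=2)
print(sorted(local_map.items()))   # [('a', 4.0), ('b', 2.0)]
sc.stop()
```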


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19222
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19222
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82100/
Test FAILed.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19222
  
**[Test build #82100 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82100/testReport)**
 for PR 19222 at commit 
[`66bfbfc`](https://github.com/apache/spark/commit/66bfbfcacbcd37b911c6de2289352aebd63d52d6).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #13143: [SPARK-15359] [Mesos] Mesos dispatcher should handle DRI...

2017-09-22 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/13143
  
Hello @devaraj-kavali. Yes. I've been playing around with this because it's 
inconvenient to clean up ZK whenever you uninstall/reinstall the Dispatcher. 
The problem is that the only signal of a re-install vs. a failover is when 
Mesos gives you a `DRIVER_ABORTED` error (signals a re-install). I think a 
solution will involve having different methods for starting `SchedulerDrivers` for Spark Drivers (applications) and the Dispatcher, because they inherently have different requirements with respect to state.


---




[GitHub] spark issue #19320: [SPARK-22099] The 'job ids' list style needs to be chang...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19320
  
**[Test build #82103 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82103/testReport)**
 for PR 19320 at commit 
[`5cb6ea4`](https://github.com/apache/spark/commit/5cb6ea405755fd70ca6f2f7078914cc9dece8c73).


---




[GitHub] spark issue #19320: [SPARK-22099] The 'job ids' list style needs to be chang...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19320
  
ok to test


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-22 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r140625136
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  logWarning("Couldn't broadcast tokens, trying again in an 
hour", e)
+  credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
+  return
+  }
+  scheduleRenewal(this)
+}
+  }
+scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
+// Get new delegation tokens by logging in with a new UGI
+// inspired by AMCredentialRenewer.scala:L174
+val ugi = if (mode == "keytab") {
--- End diff --

Hello @kalvinnchau You are correct, all this does is keep track of when the 
tokens will expire and renew them at that time. Part of my motivation for doing 
this is to avoid writing any files to disk (like new 

[GitHub] spark issue #9207: [SPARK-11171][SPARK-11237][SPARK-11241][ML] Try adding PM...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/9207
  
**[Test build #82102 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82102/consoleFull)**
 for PR 9207 at commit 
[`9cb8994`](https://github.com/apache/spark/commit/9cb899443473eceaad459c5fb550b7a8f3780a9c).


---




[GitHub] spark issue #19326: [SPARK-22107] Change as to alias in python quickstart

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19326
  
How does it relate to https://github.com/apache/spark/pull/19283?


---




[GitHub] spark pull request #19303: [SPARK-22085][CORE]When the application has no co...

2017-09-22 Thread 10110346
Github user 10110346 closed the pull request at:

https://github.com/apache/spark/pull/19303


---




[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19328
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82098/
Test PASSed.


---




[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19328
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19328
  
**[Test build #82098 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82098/testReport)**
 for PR 19328 at commit 
[`0f3307d`](https://github.com/apache/spark/commit/0f3307dfc3dd21b3d643d3d58e9202743f958b23).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19321
  
**[Test build #82101 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82101/testReport)**
 for PR 19321 at commit 
[`45e655f`](https://github.com/apache/spark/commit/45e655f14f9beed6bc6bc584b73619ba46a51c43).


---




[GitHub] spark issue #19295: [SPARK-22080][SQL] Adds support for allowing user to add...

2017-09-22 Thread wzhfy
Github user wzhfy commented on the issue:

https://github.com/apache/spark/pull/19295
  
ping @cloud-fan @gatorsmile 


---




[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19321
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82099/
Test FAILed.


---




[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19321
  
**[Test build #82099 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82099/testReport)**
 for PR 19321 at commit 
[`db2c110`](https://github.com/apache/spark/commit/db2c11056cf973708b10b5e5fdcd8ec705e27d21).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19321
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19295: [SPARK-22080][SQL] Adds support for allowing user to add...

2017-09-22 Thread wzhfy
Github user wzhfy commented on the issue:

https://github.com/apache/spark/pull/19295
  
ok to test


---




[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19325
  
As for me, I am willing to merge this one soon.


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140622826
  
--- Diff: python/pyspark/serializers.py ---
@@ -246,15 +243,9 @@ def cast_series(s, t):
 def loads(self, obj):
 """
 Deserialize an ArrowRecordBatch to an Arrow table and return as a 
list of pandas.Series
--- End diff --

Ugh, this bugged me. `.` at the end ..


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140623893
  
--- Diff: python/pyspark/worker.py ---
@@ -80,14 +77,12 @@ def wrap_pandas_udf(f, return_type):
 arrow_return_type = toArrowType(return_type)
 
 def verify_result_length(*a):
-kwargs = a[-1]
-result = f(*a[:-1], **kwargs)
-if len(result) != kwargs["length"]:
+result = f(*a)
+if len(result) != len(a[0]):
--- End diff --

I guess we are not guaranteed to have `__len__` in `result`, e.g., `pandas_udf(lambda x: 1, LongType())`. Probably, checking this attribute should be done ahead, while we are here.
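
A small illustration of the concern, assuming a UDF that returns a plain scalar instead of a Series:

```python
# e.g. pandas_udf(lambda x: 1, LongType()) would make `result` a plain int.
result = 1
print(hasattr(result, "__len__"))   # False: nothing to compare against
try:
    len(result)
except TypeError as e:
    print("len() failed:", e)       # fails before the length check is reached
```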


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140623560
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
 :param f: python function if used as a standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
-# TODO: doctest
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+>>> @pandas_udf(returnType=StringType())
+... def to_upper(s):
+... return s.str.upper()
+...
+>>> @pandas_udf(returnType="integer")
+... def add_one(x):
+... return x + 1
+...
+>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", 
"age"))
+>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")).show()
++--+--++
+|slen(name)|to_upper(name)|add_one(age)|
++--+--++
+| 8|  JOHN DOE|  22|
++--+--++
 """
+wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
 import inspect
-# If function "f" does not define the optional kwargs, then wrap with 
a kwargs placeholder
-if inspect.getargspec(f).keywords is None:
-return _create_udf(lambda *a, **kwargs: f(*a), 
returnType=returnType, vectorized=True)
-else:
-return _create_udf(f, returnType=returnType, vectorized=True)
+if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

This is totally a personal preference based on my limited experience. I usually avoid the `if not something` expression, because it obscures the expected type: for example, the value could be `None`, `0`, or a 0-length list or tuple, since the check coerces it to a bool. I usually write `is not None` or `len(..) > 0` instead.

I am fine as is too (because I think it's a personal preference) but just wanted to leave a side note (and change it if this persuades you too).
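
A toy illustration (not PySpark code) of why `if not x` can be ambiguous: several very different values all coerce to the same falsy result, while explicit checks state the intent.

```python
for x in (None, 0, [], (), ""):
    print(repr(x), "-> not x is", not x)   # True for every one of them

args = []
print(args is not None)   # True: the list exists ...
print(len(args) > 0)      # False: ... it is just empty
```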


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140624294
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
 :param f: python function if used as a standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
-# TODO: doctest
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+>>> @pandas_udf(returnType=StringType())
+... def to_upper(s):
+... return s.str.upper()
+...
+>>> @pandas_udf(returnType="integer")
+... def add_one(x):
+... return x + 1
+...
+>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", 
"age"))
+>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")).show()
++--+--++
+|slen(name)|to_upper(name)|add_one(age)|
++--+--++
+| 8|  JOHN DOE|  22|
++--+--++
 """
+wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
 import inspect
-# If function "f" does not define the optional kwargs, then wrap with 
a kwargs placeholder
-if inspect.getargspec(f).keywords is None:
-return _create_udf(lambda *a, **kwargs: f(*a), 
returnType=returnType, vectorized=True)
-else:
-return _create_udf(f, returnType=returnType, vectorized=True)
+if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

It looks like `wrapped_udf.func` could be `_udf` within `_create_udf`, which takes a single argument, for example:

```python
@pandas_udf(returnType=LongType())
def add_one():
return 1
```

I tried a rough idea to solve this:

```diff
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2124,11 +2124,14 @@ class UserDefinedFunction(object):
 return wrapper


-def _create_udf(f, returnType, vectorized):
+def _create_udf(f, returnType, vectorized, checker=None):

 def _udf(f, returnType=StringType(), vectorized=vectorized):
 udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
-return udf_obj._wrapped()
+wrapped = udf_obj._wrapped()
+if checker is not None:
+checker(wrapped)
+return wrapped

 # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
 if f is None or isinstance(f, (str, DataType)):
@@ -2201,10 +2204,14 @@ def pandas_udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
-import inspect
-if not inspect.getargspec(wrapped_udf.func).args:
-raise NotImplementedError("0-parameter pandas_udfs are not 
currently supported")
+
+def checker(wrapped):
+import inspect
+if not inspect.getargspec(wrapped.func).args:
+raise NotImplementedError("0-parameter pandas_udfs are not 
currently supported")
+
+wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True, 
checker=checker)
+
 return wrapped_udf
```


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140623030
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3256,11 +3256,9 @@ def test_vectorized_udf_null_string(self):
 
 def test_vectorized_udf_zero_parameter(self):
 from pyspark.sql.functions import pandas_udf
-import pandas as pd
-df = self.spark.range(10)
-f0 = pandas_udf(lambda **kwargs: 
pd.Series(1).repeat(kwargs['length']), LongType())
-res = df.select(f0())
-self.assertEquals(df.select(lit(1)).collect(), res.collect())
+with QuietTest(self.sc):
+with self.assertRaisesRegexp(Exception, '0-parameter 
pandas_udfs.*not.*supported'):
--- End diff --

I believe we could catch a narrower one, `NotImplementedError`.
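
A minimal, self-contained sketch of the narrower assertion being suggested (the raising function here is a stand-in, not PySpark code):

```python
import unittest

def make_zero_param_udf():
    # Stand-in for the code path that rejects 0-parameter pandas_udfs.
    raise NotImplementedError("0-parameter pandas_udfs are not currently supported")

class NarrowAssertExample(unittest.TestCase):
    def test_narrow_exception(self):
        # Catch NotImplementedError specifically rather than a bare Exception.
        with self.assertRaisesRegexp(NotImplementedError,
                                     '0-parameter pandas_udfs.*not.*supported'):
            make_zero_param_udf()

if __name__ == "__main__":
    unittest.main()
```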


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140623282
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
 from pyspark.sql.functions import pandas_udf, col
 import pandas as pd
 df = self.spark.range(10)
-raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
 with QuietTest(self.sc):
 with self.assertRaisesRegexp(
 Exception,
--- End diff --

Here too, while we are here, let's catch a narrower exception type. Looks like `RuntimeError`.


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140623236
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
 from pyspark.sql.functions import pandas_udf, col
 import pandas as pd
 df = self.spark.range(10)
-raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
--- End diff --

Maybe `lambda _: ...`.


---




[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...

2017-09-22 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19295#discussion_r140624191
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -28,12 +28,18 @@ class SparkOptimizer(
 experimentalMethods: ExperimentalMethods)
   extends Optimizer(catalog) {
 
-  override def batches: Seq[Batch] = (preOptimizationBatches ++ 
super.batches :+
+  val experimentalPreOptimizations: Seq[Batch] = Seq(Batch(
+"User Provided Pre Optimizers", fixedPoint, 
experimentalMethods.extraPreOptimizations: _*))
+
+  val experimentalPostOptimizations: Batch = Batch(
+"User Provided Post Optimizers", fixedPoint, 
experimentalMethods.extraOptimizations: _*)
+
+  override def batches: Seq[Batch] = experimentalPreOptimizations ++
+(preOptimizationBatches ++ super.batches :+
--- End diff --

This PR is not about the Analyzer; please also update your description.


---




[GitHub] spark issue #19295: [SPARK-22080][SQL] Adds support for allowing user to add...

2017-09-22 Thread wzhfy
Github user wzhfy commented on the issue:

https://github.com/apache/spark/pull/19295
  
test this please


---




[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...

2017-09-22 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19295#discussion_r140624028
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -28,12 +28,18 @@ class SparkOptimizer(
 experimentalMethods: ExperimentalMethods)
   extends Optimizer(catalog) {
 
-  override def batches: Seq[Batch] = (preOptimizationBatches ++ 
super.batches :+
+  val experimentalPreOptimizations: Seq[Batch] = Seq(Batch(
--- End diff --

Also define this as a `Batch`; then you can use `experimentalPreOptimizations +: preOptimizationBatches` to concatenate it with the other batches.


---




[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...

2017-09-22 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19295#discussion_r140623965
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala ---
@@ -44,11 +44,14 @@ class ExperimentalMethods private[sql]() {
*/
   @volatile var extraStrategies: Seq[Strategy] = Nil
 
+  @volatile var extraPreOptimizations: Seq[Rule[LogicalPlan]] = Nil
+
   @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil
--- End diff --

How about renaming this to `extraPostOptimizations`?


---




[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...

2017-09-22 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19295#discussion_r140623955
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -28,12 +28,18 @@ class SparkOptimizer(
 experimentalMethods: ExperimentalMethods)
   extends Optimizer(catalog) {
 
-  override def batches: Seq[Batch] = (preOptimizationBatches ++ 
super.batches :+
+  val experimentalPreOptimizations: Seq[Batch] = Seq(Batch(
+"User Provided Pre Optimizers", fixedPoint, 
experimentalMethods.extraPreOptimizations: _*))
+
+  val experimentalPostOptimizations: Batch = Batch(
+"User Provided Post Optimizers", fixedPoint, 
experimentalMethods.extraOptimizations: _*)
+
+  override def batches: Seq[Batch] = experimentalPreOptimizations ++
+(preOptimizationBatches ++ super.batches :+
--- End diff --

OK, I see. Then could you add the use case to the PR description? For example:
```
after this PR, we can add both pre/post optimization rules at runtime as 
follows:
...
```


---




[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...

2017-09-22 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19295#discussion_r140624052
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala ---
@@ -78,8 +82,14 @@ class SQLContextSuite extends SparkFunSuite with 
SharedSparkContext {
 
   test("Catalyst optimization passes are modifiable at runtime") {
 val sqlContext = SQLContext.getOrCreate(sc)
-sqlContext.experimental.extraOptimizations = Seq(DummyRule)
-
assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
+sqlContext.experimental.extraOptimizations = 
Seq(DummyPostOptimizationRule)
+sqlContext.experimental.extraPreOptimizations = 
Seq(DummyPreOptimizationRule)
+
+val firstBatch = sqlContext.sessionState.optimizer.batches.head
+val lastBatch = sqlContext.sessionState.optimizer.batches.last // 
.flatMap(_.rules)
--- End diff --

Is the comment useful?


---




[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...

2017-09-22 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19317
  
@jiangxb1987, @WeichenXu123, thanks for your review. This change is inspired by the `TODO` in the following code snippet:
```scala
// TODO: Calling aggregateByKey and collect creates two stages, we can implement something
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
  .map { row =>
    (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
  }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
    seqOp = {
      case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
        requireValues(features)
        BLAS.axpy(weight, features, featureSum)
        (weightSum + weight, featureSum)
    },
    combOp = {
      case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
        BLAS.axpy(1.0, featureSum2, featureSum1)
        (weightSum1 + weightSum2, featureSum1)
    }).collect().sortBy(_._1)
```

- The `aggregateByKeyLocally` code we implemented is similar to `reduceByKeyLocally`.
- I agree with your suggestion to use `OpenHashSet` instead of `JHashMap`. I can change it, and `reduceByKeyLocally` may need the same change too.
- Because here we collect all the aggregated data to the driver and sort it, I think the data should be small, and the amount of data collected by the two implementations should be almost equal.


---




[GitHub] spark issue #18936: [SPARK-21688][ML][MLLIB] make native BLAS the first choi...

2017-09-22 Thread VinceShieh
Github user VinceShieh commented on the issue:

https://github.com/apache/spark/pull/18936
  
Hi Sean, sorry for the late reply. Yeah, actually we do have some performance data on F2J vs. OpenBLAS. It seems there is no performance gain from OpenBLAS, not even at the unit test level. We are thinking we should set up and test on another cluster in case the result is due to environment-related issues; we will sort out the data and post it here later.
It's possible that OpenBLAS may not give us any benefit in this case, but here are my two cents:
1. Like OpenBLAS, MKL BLAS is also free, easy to get, and well documented.
2. It's much easier for users to use MKL BLAS than OpenBLAS: just download it and put it where the system can find it, with no installation required. On the other hand, installing OpenBLAS on each machine is still troublesome; it cost us a while to get it working.
3. MKL BLAS is highly optimized and thus has much better performance.
4. This PR just opens up an opportunity for users to choose a different BLAS implementation; they can still go with F2J, so I think it is more user-friendly than binding all the level-1 BLAS calls to F2J.


---




[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...

2017-09-22 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19317
  
Yes. I guess the perf gain is because this PR uses a local hashmap which can use unlimited memory, while the current Spark aggregation impl will automatically spill the local hashmap when it exceeds a threshold.
Memory management is a difficult thing that we should not try to do in user code, I think, because it is hard to estimate how large the hashmap will grow.


---




[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...

2017-09-22 Thread VinceShieh
Github user VinceShieh commented on the issue:

https://github.com/apache/spark/pull/19317
  
Nice catch, thanks. The perf gain is truly narrow.
I believe this impl just tried to align with the impl of `reduceByKeyLocally`.
@ConeyLiu maybe we should revisit the code, along with the `reduceByKeyLocally` impl.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19222
  
**[Test build #82100 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82100/testReport)**
 for PR 19222 at commit 
[`66bfbfc`](https://github.com/apache/spark/commit/66bfbfcacbcd37b911c6de2289352aebd63d52d6).


---




[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16578
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2017-09-22 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19222
  
Jenkins, retest this please


---




[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16578
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82097/
Test PASSed.


---




[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16578
  
**[Test build #82097 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82097/testReport)**
 for PR 16578 at commit 
[`9fac482`](https://github.com/apache/spark/commit/9fac4826f6289e9e8c02f3799912463bbc8ea046).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140622457
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
 :param f: python function if used as a standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
-# TODO: doctest
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+>>> @pandas_udf(returnType=StringType())
--- End diff --

We could just do `  # doctest: +SKIP` maybe.
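
A minimal illustration of the directive being suggested: the marked example is kept in the docstring but skipped by the doctest runner (useful when pyarrow may not be installed on the build machine).

```python
import doctest

def example():
    """
    >>> 1 + 1
    2
    >>> import pyarrow  # doctest: +SKIP
    """

if __name__ == "__main__":
    doctest.testmod(verbose=True)
```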


---




[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...

2017-09-22 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19317
  
And I have to point out that your impl has a high risk of causing OOM. The current impl will automatically spill when the local hashmap is too large and can take advantage of Spark's automatic memory management mechanism, which you should take a look at.
Another thing is that the JHashMap will have slow performance; it is better to use `org.apache.spark.util.collection.OpenHashSet` in the case where the hashmap is append-only.


---




[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...

2017-09-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19325#discussion_r140622410
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):
 :param f: python function if used as a standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
-# TODO: doctest
+>>> from pyspark.sql.types import IntegerType, StringType
+>>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+>>> @pandas_udf(returnType=StringType())
--- End diff --

Have we installed pyarrow on Jenkins? The failed test complains 
`ImportError: No module named pyarrow`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19328
  
Yea .. let's open a separate JIRA.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...

2017-09-22 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19229
  
@viirya Yeah, for the perf gap I only focused on `mean`, which can take advantage 
of codegen.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19278


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...

2017-09-22 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19229
  
Yeah, I think that fix should work for the strategy `Imputer.mean`, because 
`Imputer.mean` now aggregates many columns at once, and that can produce generated 
aggregation code that is too large.

For the strategy `Imputer.median`, because it uses `approxQuantile`, which 
calls the RDD aggregate API, I think codegen doesn't affect that part.
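
As a rough illustration of "aggregates many columns at once" (toy DataFrame and column names, not the Imputer's actual code): computing every column's mean in a single `agg` call produces one wide aggregation plan, which is where the generated code can grow large.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.avg

object MultiColumnMeanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("multi-col-mean").getOrCreate()
    import spark.implicits._

    val df = Seq((1.0, 10.0, 100.0), (3.0, 30.0, 300.0)).toDF("c1", "c2", "c3")
    val inputCols = Seq("c1", "c2", "c3")

    // One pass over the data; all means come out of a single (potentially large) generated aggregate.
    val meanExprs = inputCols.map(c => avg(df(c)).as(s"mean_$c"))
    val means = df.agg(meanExprs.head, meanExprs.tail: _*).first()

    inputCols.indices.foreach(i => println(s"${inputCols(i)} -> ${means.getDouble(i)}"))
    spark.stop()
  }
}
```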


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...

2017-09-22 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19278
  
LGTM
Merging with master
Thanks @WeichenXu123 for the fix and for testing for backwards 
compatibility!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-22 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r140621871
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala 
---
@@ -160,11 +160,13 @@ class TrainValidationSplitSuite
   .setTrainRatio(0.5)
   .setEstimatorParamMaps(paramMaps)
   .setSeed(42L)
+  .setParallelism(2)
--- End diff --

Ah, you're right, thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

2017-09-22 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140621444
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
 val alpha = this.alpha.asBreeze
 val gammaShape = this.gammaShape
+val optimizeDocConcentration = this.optimizeDocConcentration
+// We calculate logphat in the same pass as other statistics, but we 
only need
+// it if we are optimizing docConcentration
+val logphatPartOptionBase = () => if (optimizeDocConcentration) 
Some(BDV.zeros[Double](k))
+  else None
 
-val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions 
{ docs =>
+val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = 
batch.mapPartitions { docs =>
--- End diff --

Let's use Long for the doc count, since it could overflow for large datasets 
and large miniBatchFraction values.
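
A quick, self-contained illustration of the overflow being guarded against (plain Scala, nothing from the PR): an `Int` count wraps around once it passes `Int.MaxValue`, while a `Long` keeps counting.

```scala
object DocCountOverflowSketch {
  def main(args: Array[String]): Unit = {
    val intCount: Int = Int.MaxValue
    println(intCount + 1)            // -2147483648: the Int count wrapped around
    val longCount: Long = Int.MaxValue.toLong
    println(longCount + 1)           // 2147483648: the Long count is still correct
  }
}
```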


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19286: [SPARK-21338][SQL][FOLLOW-UP] Implement isCascadingTrunc...

2017-09-22 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19286
  
ping @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19328
  
I think you should create a new JIRA instead of using SPARK-22088, which is 
for the style issue, not for this change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19321
  
**[Test build #82099 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82099/testReport)**
 for PR 19321 at commit 
[`db2c110`](https://github.com/apache/spark/commit/db2c11056cf973708b10b5e5fdcd8ec705e27d21).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19325
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19325
  
**[Test build #82096 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82096/testReport)**
 for PR 19325 at commit 
[`7b0da10`](https://github.com/apache/spark/commit/7b0da106fb64a16b77c62953bb12548fda3f7ef3).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19325
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82096/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19328
  
**[Test build #82098 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82098/testReport)**
 for PR 19328 at commit 
[`0f3307d`](https://github.com/apache/spark/commit/0f3307dfc3dd21b3d643d3d58e9202743f958b23).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19328
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...

2017-09-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19328
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19328: [SPARK-22088][SQL][Documentation] Add usage and i...

2017-09-22 Thread kevinyu98
GitHub user kevinyu98 opened a pull request:

https://github.com/apache/spark/pull/19328

[SPARK-22088][SQL][Documentation] Add usage and improve documentation with 
arguments and examples for trim function

## What changes were proposed in this pull request?

This PR proposes to enhance the documentation for the `trim` functions in the 
function description section.

- Add more `usage`, `arguments` and `examples` for the trim function
- Adjust space in the `usage` section

After the changes, the trim function documentation will look like this:

 

- `trim`

```trim(str) - Removes the leading and trailing space characters from str.

trim(BOTH trimStr FROM str) - Remove the leading and trailing trimStr 
characters from str

trim(LEADING trimStr FROM str) - Remove the leading trimStr characters from 
str

trim(TRAILING trimStr FROM str) - Remove the trailing trimStr characters 
from str

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single 
space
BOTH, FROM - these are keywords to specify trimming string characters from 
both ends of the string
LEADING, FROM - these are keywords to specify trimming string characters 
from the left end of the string
TRAILING, FROM - these are keywords to specify trimming string characters 
from the right end of the string
Examples:

> SELECT trim('SparkSQL   ');
 SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
 parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
 parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
 parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
 SSparkSQ
```

- `ltrim`

```ltrim

ltrim(str) - Removes the leading space characters from str.

ltrim(trimStr, str) - Removes the leading string contains the characters 
from the trim string

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single 
space
Examples:

> SELECT ltrim('SparkSQL   ');
 SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
 arkSQLS
```

- `rtrim`
```rtrim

rtrim(str) - Removes the trailing space characters from str.

rtrim(trimStr, str) - Removes the trailing string which contains the 
characters from the trim string from the str

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single 
space
Examples:

> SELECT rtrim('SparkSQL   ');
 SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
 SSpark
```

This is the trim characters function jira: [trim 
function](https://issues.apache.org/jira/browse/SPARK-14878)
## How was this patch tested?
Manually tested
```
spark-sql> describe function extended trim;
17/09/22 17:03:04 INFO CodeGenerator: Code generated in 153.026533 ms
Function: trim
Class: org.apache.spark.sql.catalyst.expressions.StringTrim
Usage: 
trim(str) - Removes the leading and trailing space characters from 
`str`.

trim(BOTH trimStr FROM str) - Remove the leading and trailing `trimStr` 
characters from `str`

trim(LEADING trimStr FROM str) - Remove the leading `trimStr` 
characters from `str`

trim(TRAILING trimStr FROM str) - Remove the trailing `trimStr` 
characters from `str`
  
Extended Usage:
Arguments:
  * str - a string expression
  * trimStr - the trim string characters to trim, the default value is 
a single space
  * BOTH, FROM - these are keywords to specify trimming string 
characters from both ends of
  the string
  * LEADING, FROM - these are keywords to specify trimming string 
characters from the left
  end of the string
  * TRAILING, FROM - these are keywords to specify trimming string 
characters from the right
  end of the string
  
Examples:
  > SELECT trim('SparkSQL   ');
   SparkSQL
  > SELECT trim('SL', 'SSparkSQLS');
   parkSQ
  > SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
   parkSQ
  > SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
   parkSQLS
  > SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
   SSparkSQ
  ```
```
spark-sql> describe function extended ltrim;
Function: ltrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimLeft
Usage: 
ltrim(str) - Removes the leading space characters from `str`.

ltrim(trimStr, str) - Removes the leading string contains the 
characters from the trim string
  
Extended Usage:
Arguments:
  * str - a string expression
  * trimStr - the trim string characters to trim, the default 

[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605142
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -146,7 +146,13 @@ case class StreamingSymmetricHashJoinExec(
   stateWatermarkPredicates = JoinStateWatermarkPredicates(), left, 
right)
   }
 
-  require(joinType == Inner, s"${getClass.getSimpleName} should not take 
$joinType as the JoinType")
+  private lazy val badJoinTypeException =
--- End diff --

This should be a def. I don't think exceptions should be created ahead of time and thrown 
later. I am not sure whether it will capture the stack trace of where it was 
created or where it was thrown. Either way, it's a bad pattern to have. A better 
approach is to make it into a function `def throwBadJoinTypeException() { throw 
new ... }`
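
A minimal, self-contained sketch of the suggested pattern (hypothetical names, not the PR's code). Since a JVM exception records its stack trace when it is constructed (via `fillInStackTrace`), building it inside a `def` at the throw site keeps the trace pointing at the place that actually failed:

```scala
object ThrowHelperSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object FullOuter extends JoinType

  // The suggested shape: construct and throw in one place, at call time.
  private def throwBadJoinTypeException(joinType: JoinType): Nothing =
    throw new IllegalArgumentException(s"Unsupported join type: $joinType")

  def dispatch(joinType: JoinType): String = joinType match {
    case Inner => "inner join path"
    case other => throwBadJoinTypeException(other)
  }

  def main(args: Array[String]): Unit = {
    println(dispatch(Inner))
    // dispatch(FullOuter) would throw, with a stack trace captured at the throw site.
  }
}
```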


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140608418
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
--- End diff --

current -> currentValue


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140615004
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
--- End diff --

This loop can probably be split into two methods, `findNextValueToRemove` 
and `removeValue`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140616618
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
 }
 
 // Filter the joined rows based on the given condition.
-val outputFilterFunction =
-  newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output).eval _
-val filteredOutputIter =
-  (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map 
{ row =>
-numOutputRows += 1
-row
-  }
+val outputFilterFunction = 
newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+val filteredInnerOutputIter = (leftOutputIter ++ 
rightOutputIter).filter(outputFilterFunction)
+
+val outputIter: Iterator[InternalRow] = joinType match {
+  case Inner =>
+filteredInnerOutputIter
+  case LeftOuter =>
+val nullRight = new 
GenericInternalRow(right.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  leftSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
rightSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(value).withRight(nullRight) }
+  case RightOuter =>
+val nullLeft = new 
GenericInternalRow(left.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  rightSideJoiner
--- End diff --

nit: split this into two statements with an intermediate variable 
`removedRowsIter`, and add docs on what this does.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140614146
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
--- End diff --

maybe rename `valueForIndex` -> `currentValue` and `index` -> `nextValueIndex`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605854
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec(
   }
 }
 
-/** Remove old buffered state rows using watermarks for state keys and 
values */
-def removeOldState(): Unit = {
+/**
+ * Builds an iterator over old state key-value pairs, removing them 
lazily as they're produced.
+ *
+ * This iterator is dangerous! It must be consumed fully before any 
other operations are made
--- End diff --

Maybe tone down the language a little :) 
Rather than saying "dangerous", use `@note`.
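
For reference, roughly what the `@note` form could look like (a sketch with a placeholder body, not the PR's code):

```scala
object NoteDocSketch {
  /**
   * Builds an iterator over old state key-value pairs, removing them lazily as they are produced.
   *
   * @note Must be consumed fully before any other operation against this joiner's state manager;
   *       in particular, commits must not happen while this iterator is ongoing, since its
   *       intermediate states leave the state manager in an invalid configuration.
   */
  def removeOldState(): Iterator[(String, String)] = Iterator.empty // placeholder body for the sketch
}
```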


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140617927
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
 }
 
 // Filter the joined rows based on the given condition.
-val outputFilterFunction =
-  newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output).eval _
-val filteredOutputIter =
-  (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map 
{ row =>
-numOutputRows += 1
-row
-  }
+val outputFilterFunction = 
newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+val filteredInnerOutputIter = (leftOutputIter ++ 
rightOutputIter).filter(outputFilterFunction)
+
+val outputIter: Iterator[InternalRow] = joinType match {
+  case Inner =>
+filteredInnerOutputIter
+  case LeftOuter =>
+val nullRight = new 
GenericInternalRow(right.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  leftSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
rightSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(value).withRight(nullRight) }
+  case RightOuter =>
+val nullLeft = new 
GenericInternalRow(left.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  rightSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
leftSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(nullLeft).withRight(value) }
+  case _ => throw badJoinTypeException
+}
+
+val outputIterWithMetrics = outputIter.map { row =>
+  numOutputRows += 1
+  row
+}
+
+// Iterator which must be consumed after output completion before 
committing.
+val cleanupIter = joinType match {
+  case Inner =>
+leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
+  case LeftOuter => rightSideJoiner.removeOldState()
+  case RightOuter => leftSideJoiner.removeOldState()
+  case _ => throw badJoinTypeException
+}
 
 // Function to remove old state after all the input has been consumed 
and output generated
 def onOutputCompletion = {
   allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - 
updateStartTimeNs), 0)
 
-  // Remove old state if needed
+  // TODO: how to get this for removals as part of outer join?
   allRemovalsTimeMs += timeTakenMs {
-leftSideJoiner.removeOldState()
-rightSideJoiner.removeOldState()
+cleanupIter.foreach(_ => ())
--- End diff --

Don't use foreach. Scala's foreach is pretty inefficient; use a while loop.
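
A tiny self-contained illustration of the suggested change (toy iterator, not the PR's `cleanupIter`): draining with a plain `while` loop avoids the closure call per element that `foreach` implies.

```scala
object DrainIteratorSketch {
  def main(args: Array[String]): Unit = {
    val cleanupIter: Iterator[Int] = (1 to 5).iterator

    // Instead of cleanupIter.foreach(_ => ()), drain with a while loop.
    while (cleanupIter.hasNext) {
      cleanupIter.next()
    }

    println(s"drained, hasNext = ${cleanupIter.hasNext}") // drained, hasNext = false
  }
}
```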


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140614325
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
+  if (index < numValues) {
--- End diff --

Add comments for each conditional branch to explain this. 
In retrospect, I probably should have done that too :) 
But the code is now an order of magnitude more complicated, so it requires 
more docs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140608463
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
--- End diff --

Don't use tuples.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140612077
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -329,6 +392,27 @@ class SymmetricHashJoinStateManager(
   }
 }
 
+/** Get all the values for key and all indices, in a (value, index) 
tuple. */
+def getAllWithIndex(key: UnsafeRow, numValues: Long): 
Iterator[(UnsafeRow, Long)] = {
--- End diff --

We can probably convert getAll to this. It does not make sense to have both, 
especially since both are equally efficient if you return 
Iterator[KeyWithIndexAndValue].

Also, I think the iterator() method can be removed. It's effectively not 
being used (it is used only in StateManager.iterator, which is not really being used either).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140607924
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
--- End diff --

Use UnsafeRowPair for return. Same reason as above.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140611178
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -329,6 +392,27 @@ class SymmetricHashJoinStateManager(
   }
 }
 
+/** Get all the values for key and all indices, in a (value, index) 
tuple. */
+def getAllWithIndex(key: UnsafeRow, numValues: Long): 
Iterator[(UnsafeRow, Long)] = {
--- End diff --

Use `KeyWithIndexAndValue` for the return type.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140612202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
--- End diff --

It's not really cleanup; maybe "update"?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec(
   }
 }
 
-/** Remove old buffered state rows using watermarks for state keys and 
values */
-def removeOldState(): Unit = {
+/**
+ * Builds an iterator over old state key-value pairs, removing them 
lazily as they're produced.
+ *
+ * This iterator is dangerous! It must be consumed fully before any 
other operations are made
+ * against this joiner's join state manager, and in particular commits 
must not happen while
+ * this iterator is ongoing. The intermediate states of the iterator 
leave the state manager in
+ * an invalid configuration.
+ *
+ * We do this unsafe thing to avoid requiring either two passes or 
full materialization when
+ * processing the rows for outer join.
+ */
+def removeOldState(): Iterator[(UnsafeRow, UnsafeRow)] = {
--- End diff --

Use `UnsafeRowPair` instead of Tuple2 (i.e. `(x, y)` is shorthand for 
`scala.Tuple2`). It reuses the same object (or an equivalent one), avoiding the creation of a 
lot of short-lived objects and thus reducing GC pressure.
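
A minimal sketch of the reuse pattern being described, with a hypothetical `RowPair` standing in for `UnsafeRowPair` (this is not the real class, just its shape): the iterator hands back the same mutable holder on every step instead of allocating a fresh pair object per element.

```scala
object ReusablePairSketch {
  // Hypothetical stand-in for UnsafeRowPair: a mutable holder reused across next() calls.
  final class RowPair(var key: String = null, var value: String = null) {
    def withRows(k: String, v: String): RowPair = { key = k; value = v; this }
  }

  // Returns the same RowPair instance on every call to next(), so consumers must copy
  // anything they need to keep before advancing the iterator.
  def pairs(data: Seq[(String, String)]): Iterator[RowPair] = {
    val reused = new RowPair()
    data.iterator.map { case (k, v) => reused.withRows(k, v) }
  }

  def main(args: Array[String]): Unit = {
    pairs(Seq("a" -> "1", "b" -> "2")).foreach(p => println(s"${p.key} -> ${p.value}"))
  }
}
```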


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140614816
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
+  if (index < numValues) {
+val current = keyWithIndexToValue.get(currentKey, index)
+if (condition(current)) {
+  valueForIndex = current
+} else {
+  index += 1
+}
+  } else {
+cleanupCurrentKey()
+
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+numValues = currentKeyToNumValue.get.numValue
--- End diff --

numValues -> currentKeyNumValues


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140617841
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
 }
 
 // Filter the joined rows based on the given condition.
-val outputFilterFunction =
-  newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output).eval _
-val filteredOutputIter =
-  (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map 
{ row =>
-numOutputRows += 1
-row
-  }
+val outputFilterFunction = 
newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+val filteredInnerOutputIter = (leftOutputIter ++ 
rightOutputIter).filter(outputFilterFunction)
+
+val outputIter: Iterator[InternalRow] = joinType match {
+  case Inner =>
+filteredInnerOutputIter
+  case LeftOuter =>
+val nullRight = new 
GenericInternalRow(right.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  leftSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
rightSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(value).withRight(nullRight) }
+  case RightOuter =>
+val nullLeft = new 
GenericInternalRow(left.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  rightSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
leftSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(nullLeft).withRight(value) }
+  case _ => throw badJoinTypeException
+}
+
+val outputIterWithMetrics = outputIter.map { row =>
+  numOutputRows += 1
+  row
+}
+
+// Iterator which must be consumed after output completion before 
committing.
+val cleanupIter = joinType match {
+  case Inner =>
+leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
+  case LeftOuter => rightSideJoiner.removeOldState()
+  case RightOuter => leftSideJoiner.removeOldState()
+  case _ => throw badJoinTypeException
+}
--- End diff --

This confused me a lot, but then I got why you need to call removeOldState once 
again. Can you add an explanation that you have to clean the side that has not 
been cleaned?

Also, this can be moved into "onOutputCompletion", because that is where 
it is needed, not before.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


