[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19321 Merged build finished. Test FAILed.
[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19321 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82101/ Test FAILed.
[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19321 **[Test build #82101 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82101/testReport)** for PR 19321 at commit [`45e655f`](https://github.com/apache/spark/commit/45e655f14f9beed6bc6bc584b73619ba46a51c43). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19329: [SPARK-22110][SQL][Documentation] Add usage and improve ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19329 **[Test build #82105 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82105/testReport)** for PR 19329 at commit [`0f3307d`](https://github.com/apache/spark/commit/0f3307dfc3dd21b3d643d3d58e9202743f958b23).
[GitHub] spark issue #9207: [SPARK-11171][SPARK-11237][SPARK-11241][ML] Try adding PM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/9207 Build finished. Test FAILed.
[GitHub] spark issue #19329: [SPARK-22110][SQL][Documentation] Add usage and improve ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19329 ok to test
[GitHub] spark issue #9207: [SPARK-11171][SPARK-11237][SPARK-11241][ML] Try adding PM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/9207 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82102/ Test FAILed.
[GitHub] spark issue #9207: [SPARK-11171][SPARK-11237][SPARK-11241][ML] Try adding PM...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/9207 **[Test build #82102 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82102/consoleFull)** for PR 9207 at commit [`9cb8994`](https://github.com/apache/spark/commit/9cb899443473eceaad459c5fb550b7a8f3780a9c). * This patch **fails Spark unit tests**. * This patch **does not merge cleanly**. * This patch adds no public classes.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626955

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
       outputIterator.map(new ArrowPayload(_)), context)

     // Verify that the output schema is correct
-    val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
-      .map { case (attr, i) => attr.withName(s"_$i") })
-    assert(schemaOut.equals(outputRowIterator.schema),
-      s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
+    if (outputIterator.nonEmpty) {
+      val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
+        .map { case (attr, i) => attr.withName(s"_$i") })
+      assert(schemaOut.equals(outputRowIterator.schema),
+        s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
--- End diff --

Looks like we don't have a test against this case. We should add a test for invalid schema.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626292

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
       outputIterator.map(new ArrowPayload(_)), context)

     // Verify that the output schema is correct
-    val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
-      .map { case (attr, i) => attr.withName(s"_$i") })
-    assert(schemaOut.equals(outputRowIterator.schema),
-      s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
+    if (outputIterator.nonEmpty) {
--- End diff --

I guess we should use `outputRowIterator`. Btw how about using `hasNext` instead of `nonEmpty`?
[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19325 Mostly this looks pretty good. The only main question I have is about the empty partition issue.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626184

--- Diff: python/pyspark/sql/tests.py ---
@@ -3344,6 +3342,22 @@ def test_vectorized_udf_wrong_return_type(self):
                     'Invalid.*type.*string'):
                 df.select(f(col('x'))).collect()

+    def test_vectorized_udf_decorator(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.range(10)
+
+        @pandas_udf(returnType=LongType())
+        def identity(x):
+            return x
+        res = df.select(identity(col('id')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_empty_partition(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
--- End diff --

Oh, I see. One partition is empty, and it is related to the code added in `ArrowEvalPythonExec`.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626154

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -51,10 +51,12 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
       outputIterator.map(new ArrowPayload(_)), context)

     // Verify that the output schema is correct
-    val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
-      .map { case (attr, i) => attr.withName(s"_$i") })
-    assert(schemaOut.equals(outputRowIterator.schema),
-      s"Invalid schema from pandas_udf: expected $schemaOut, got ${outputRowIterator.schema}")
+    if (outputIterator.nonEmpty) {
--- End diff --

After `outputIterator` is consumed by `ArrowConverters`, can `nonEmpty` return a meaningful value?
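For reference, a minimal plain-Scala sketch (editorial, not the PR's code) of why checking a possibly-consumed iterator is fragile: on an `Iterator`, `nonEmpty` is just `hasNext`, so it reports whether elements remain, not whether the iterator ever produced any.

```scala
// Sketch: Iterator.nonEmpty answers "are there elements left?", so a
// drained iterator looks empty even if it originally had rows.
val it = Iterator(1, 2, 3)
assert(it.nonEmpty)   // true: nothing consumed yet (hasNext does not consume)
it.foreach(_ => ())   // drain it, as a downstream consumer would
assert(!it.nonEmpty)  // now false: a guard placed after consumption is unreliable
```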
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140626051

--- Diff: python/pyspark/worker.py ---
@@ -80,14 +77,12 @@ def wrap_pandas_udf(f, return_type):
     arrow_return_type = toArrowType(return_type)

     def verify_result_length(*a):
-        kwargs = a[-1]
-        result = f(*a[:-1], **kwargs)
-        if len(result) != kwargs["length"]:
+        result = f(*a)
+        if len(result) != len(a[0]):
--- End diff --

Should we verify the returned value is a pandas.Series?
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140625992

--- Diff: python/pyspark/sql/tests.py ---
@@ -3344,6 +3342,22 @@ def test_vectorized_udf_wrong_return_type(self):
                     'Invalid.*type.*string'):
                 df.select(f(col('x'))).collect()

+    def test_vectorized_udf_decorator(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.range(10)
+
+        @pandas_udf(returnType=LongType())
+        def identity(x):
+            return x
+        res = df.select(identity(col('id')))
+        self.assertEquals(df.collect(), res.collect())
+
+    def test_vectorized_udf_empty_partition(self):
+        from pyspark.sql.functions import pandas_udf, col
+        df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
--- End diff --

Maybe I'm missing something, but what is this test intended to test?
[GitHub] spark issue #19329: [SPARK-22110][SQL][Documentation] Add usage and improve ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19329 Can one of the admins verify this patch?
[GitHub] spark pull request #19329: [SPARK-22110][SQL][Documentation] Add usage and i...
GitHub user kevinyu98 opened a pull request: https://github.com/apache/spark/pull/19329

[SPARK-22110][SQL][Documentation] Add usage and improve documentation with arguments and examples for trim function

## What changes were proposed in this pull request?

This PR proposes to enhance the documentation for the `trim` functions in the function description section.
- Add more `usage`, `arguments` and `examples` for the trim function
- Adjust spacing in the `usage` section

After the changes, the trim function documentation will look like this:

- `trim`
```
trim(str) - Removes the leading and trailing space characters from str.
trim(BOTH trimStr FROM str) - Remove the leading and trailing trimStr characters from str
trim(LEADING trimStr FROM str) - Remove the leading trimStr characters from str
trim(TRAILING trimStr FROM str) - Remove the trailing trimStr characters from str

Arguments:
  str - a string expression
  trimStr - the trim string characters to trim, the default value is a single space
  BOTH, FROM - these are keywords to specify trimming string characters from both ends of the string
  LEADING, FROM - these are keywords to specify trimming string characters from the left end of the string
  TRAILING, FROM - these are keywords to specify trimming string characters from the right end of the string

Examples:
  > SELECT trim('SparkSQL ');
   SparkSQL
  > SELECT trim('SL', 'SSparkSQLS');
   parkSQ
  > SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
   parkSQ
  > SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
   parkSQLS
  > SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
   SSparkSQ
```

- `ltrim`
```
ltrim(str) - Removes the leading space characters from str.
ltrim(trimStr, str) - Removes the leading string that contains characters from the trim string

Arguments:
  str - a string expression
  trimStr - the trim string characters to trim, the default value is a single space

Examples:
  > SELECT ltrim('SparkSQL ');
   SparkSQL
  > SELECT ltrim('Sp', 'SSparkSQLS');
   arkSQLS
```

- `rtrim`
```
rtrim(str) - Removes the trailing space characters from str.
rtrim(trimStr, str) - Removes the trailing string that contains characters from the trim string from str

Arguments:
  str - a string expression
  trimStr - the trim string characters to trim, the default value is a single space

Examples:
  > SELECT rtrim('SparkSQL ');
   SparkSQL
  > SELECT rtrim('LQSa', 'SSparkSQLS');
   SSpark
```

This is the trim characters function JIRA: [trim function](https://issues.apache.org/jira/browse/SPARK-14878)

## How was this patch tested?

Manually tested:
```
spark-sql> describe function extended trim;
17/09/22 17:03:04 INFO CodeGenerator: Code generated in 153.026533 ms
Function: trim
Class: org.apache.spark.sql.catalyst.expressions.StringTrim
Usage:
  trim(str) - Removes the leading and trailing space characters from `str`.
  trim(BOTH trimStr FROM str) - Remove the leading and trailing `trimStr` characters from `str`
  trim(LEADING trimStr FROM str) - Remove the leading `trimStr` characters from `str`
  trim(TRAILING trimStr FROM str) - Remove the trailing `trimStr` characters from `str`
Extended Usage:
  Arguments:
    * str - a string expression
    * trimStr - the trim string characters to trim, the default value is a single space
    * BOTH, FROM - these are keywords to specify trimming string characters from both ends of the string
    * LEADING, FROM - these are keywords to specify trimming string characters from the left end of the string
    * TRAILING, FROM - these are keywords to specify trimming string characters from the right end of the string
  Examples:
    > SELECT trim('SparkSQL ');
     SparkSQL
    > SELECT trim('SL', 'SSparkSQLS');
     parkSQ
    > SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
     parkSQ
    > SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
     parkSQLS
    > SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
     SSparkSQ
```
```
spark-sql> describe function extended ltrim;
Function: ltrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimLeft
Usage:
  ltrim(str) - Removes the leading space characters from `str`.
  ltrim(trimStr, str) - Removes the leading string that contains characters from the trim string
Extended Usage:
  Arguments:
    * str - a string expression
    * trimStr - the trim string characters to trim, the
```
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/13599 **[Test build #82104 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82104/testReport)** for PR 13599 at commit [`abdf7b7`](https://github.com/apache/spark/commit/abdf7b7a8a75dfc7b8de597611bbfa0af126e24e).
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19328 Why did you close it? You can just edit the PR title.
[GitHub] spark pull request #19328: [SPARK-22088][SQL][Documentation] Add usage and i...
Github user kevinyu98 closed the pull request at: https://github.com/apache/spark/pull/19328
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user kevinyu98 commented on the issue: https://github.com/apache/spark/pull/19328 I am so sorry that I made a mistake with the JIRA number. I created a new JIRA, SPARK-22110, but used the wrong number here; let me close this PR and resubmit with the correct JIRA number.
[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19317 Oh, I get your point. This is different from `RDD.aggregate`: it directly returns a Map and avoids shuffling, which seems useful when numKeys is small. But checking the final `reduce` step, it seems it can be optimized using `treeAggregate`, and we can add a `depth` parameter. Also, using `OpenHashSet` instead of `JHashMap` looks better, but we need to test it first.
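For reference, a hedged sketch of the `treeAggregate` direction suggested above (illustrative only, not the PR's implementation; the key and value types are arbitrary): per-partition maps are merged over `depth` rounds of tree reduction instead of a single reduce on the driver.

```scala
import org.apache.spark.rdd.RDD

// Sketch: build a per-partition key -> sum map with seqOp, then let
// treeAggregate merge the partial maps in `depth` levels so the driver
// does not have to combine every partition's map by itself.
def aggregateByKeyLocallySketch(rdd: RDD[(String, Long)], depth: Int = 2): Map[String, Long] =
  rdd.treeAggregate(Map.empty[String, Long])(
    seqOp = (acc, kv) => acc.updated(kv._1, acc.getOrElse(kv._1, 0L) + kv._2),
    combOp = (m1, m2) => m2.foldLeft(m1) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + v)
    },
    depth = depth)
```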
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Merged build finished. Test FAILed.
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82100/ Test FAILed.
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19222 **[Test build #82100 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82100/testReport)** for PR 19222 at commit [`66bfbfc`](https://github.com/apache/spark/commit/66bfbfcacbcd37b911c6de2289352aebd63d52d6). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #13143: [SPARK-15359] [Mesos] Mesos dispatcher should handle DRI...
Github user ArtRand commented on the issue: https://github.com/apache/spark/pull/13143 Hello @devaraj-kavali. Yes, I've been playing around with this because it's inconvenient to clean up ZK whenever you uninstall/reinstall the Dispatcher. The problem is that the only signal of a re-install vs. a failover is when Mesos gives you a `DRIVER_ABORTED` error (which signals a re-install). I think a solution will involve having different methods of starting `SchedulerDrivers` for Spark Drivers (applications) and the Dispatcher, because they inherently have different requirements with respect to state.
[GitHub] spark issue #19320: [SPARK-22099] The 'job ids' list style needs to be chang...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19320 **[Test build #82103 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82103/testReport)** for PR 19320 at commit [`5cb6ea4`](https://github.com/apache/spark/commit/5cb6ea405755fd70ca6f2f7078914cc9dece8c73).
[GitHub] spark issue #19320: [SPARK-22099] The 'job ids' list style needs to be chang...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19320 ok to test
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140625136

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+    conf: SparkConf,
+    tokenManager: HadoopDelegationTokenManager,
+    nextRenewal: Long,
+    de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+    Executors.newSingleThreadScheduledExecutor(
+      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+    val keytab64 = conf.get("spark.yarn.keytab", null)
+    val tgt64 = System.getenv("KRB5CCNAME")
+    require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+    require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time")
+    val mode = if (keytab64 != null) "keytab" else "tgt"
+    val secretFile = if (keytab64 != null) keytab64 else tgt64
+    logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens")
+    logDebug(s"secretFile is $secretFile")
+    (secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+    def scheduleRenewal(runnable: Runnable): Unit = {
+      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      if (remainingTime <= 0) {
+        logInfo("Credentials have expired, creating new ones now.")
+        runnable.run()
+      } else {
+        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+      }
+    }
+
+    val credentialRenewerRunnable =
+      new Runnable {
+        override def run(): Unit = {
+          try {
+            val creds = getRenewedDelegationTokens(conf)
+            broadcastDelegationTokens(creds)
+          } catch {
+            case e: Exception =>
+              // Log the error and try to write new tokens back in an hour
+              logWarning("Couldn't broadcast tokens, trying again in an hour", e)
+              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
+              return
+          }
+          scheduleRenewal(this)
+        }
+      }
+    scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+    logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}")
+    // Get new delegation tokens by logging in with a new UGI
+    // inspired by AMCredentialRenewer.scala:L174
+    val ugi = if (mode == "keytab") {
--- End diff --

Hello @kalvinnchau You are correct, all this does is keep track of when the tokens will expire and renew them at that time. Part of my motivation for doing this is to avoid writing any files to disk (like new
[GitHub] spark issue #9207: [SPARK-11171][SPARK-11237][SPARK-11241][ML] Try adding PM...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/9207 **[Test build #82102 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82102/consoleFull)** for PR 9207 at commit [`9cb8994`](https://github.com/apache/spark/commit/9cb899443473eceaad459c5fb550b7a8f3780a9c).
[GitHub] spark issue #19326: [SPARK-22107] Change as to alias in python quickstart
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19326 How does it relate to https://github.com/apache/spark/pull/19283?
[GitHub] spark pull request #19303: [SPARK-22085][CORE]When the application has no co...
Github user 10110346 closed the pull request at: https://github.com/apache/spark/pull/19303
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19328 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82098/ Test PASSed.
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19328 Merged build finished. Test PASSed.
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19328 **[Test build #82098 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82098/testReport)** for PR 19328 at commit [`0f3307d`](https://github.com/apache/spark/commit/0f3307dfc3dd21b3d643d3d58e9202743f958b23). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19321 **[Test build #82101 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82101/testReport)** for PR 19321 at commit [`45e655f`](https://github.com/apache/spark/commit/45e655f14f9beed6bc6bc584b73619ba46a51c43).
[GitHub] spark issue #19295: [SPARK-22080][SQL] Adds support for allowing user to add...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19295 ping @cloud-fan @gatorsmile
[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19321 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82099/ Test FAILed.
[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19321 **[Test build #82099 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82099/testReport)** for PR 19321 at commit [`db2c110`](https://github.com/apache/spark/commit/db2c11056cf973708b10b5e5fdcd8ec705e27d21). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19321 Merged build finished. Test FAILed.
[GitHub] spark issue #19295: [SPARK-22080][SQL] Adds support for allowing user to add...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19295 ok to test
[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19325 I am willing to merge this one soon.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140622826

--- Diff: python/pyspark/serializers.py ---
@@ -246,15 +243,9 @@ def cast_series(s, t):

     def loads(self, obj):
         """
         Deserialize an ArrowRecordBatch to an Arrow table and return as a list of pandas.Series
--- End diff --

Ugh, this bugged me. `.` at the end ..
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140623893

--- Diff: python/pyspark/worker.py ---
@@ -80,14 +77,12 @@ def wrap_pandas_udf(f, return_type):
     arrow_return_type = toArrowType(return_type)

     def verify_result_length(*a):
-        kwargs = a[-1]
-        result = f(*a[:-1], **kwargs)
-        if len(result) != kwargs["length"]:
+        result = f(*a)
+        if len(result) != len(a[0]):
--- End diff --

I guess we are not guaranteed to have `__len__` in `result`, e.g., `pandas_udf(lambda x: 1, LongType())`. Probably, checking for this attribute should be done ahead of time, while we are here.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140623560

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):

     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
+    ... def to_upper(s):
+    ...     return s.str.upper()
+    ...
+    >>> @pandas_udf(returnType="integer")
+    ... def add_one(x):
+    ...     return x + 1
+    ...
+    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+    +----------+--------------+------------+
+    |slen(name)|to_upper(name)|add_one(age)|
+    +----------+--------------+------------+
+    |         8|      JOHN DOE|          22|
+    +----------+--------------+------------+
     """
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
     import inspect
-    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
-    if inspect.getargspec(f).keywords is None:
-        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
-    else:
-        return _create_udf(f, returnType=returnType, vectorized=True)
+    if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

This is totally a personal preference based on my little experience. I usually avoid using the `if not something` expression, because it obscures the expected type; for example, the value could be `None`, `0`, or a 0-length list or tuple, since the expression coerces it to a bool. I usually do `is not None` or `len(..) > 0` instead. I am fine as-is too (because I think it's a personal preference) but just wanted to leave a side note (and change it if this persuades you too).
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140624294

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):

     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
+    ... def to_upper(s):
+    ...     return s.str.upper()
+    ...
+    >>> @pandas_udf(returnType="integer")
+    ... def add_one(x):
+    ...     return x + 1
+    ...
+    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+    +----------+--------------+------------+
+    |slen(name)|to_upper(name)|add_one(age)|
+    +----------+--------------+------------+
+    |         8|      JOHN DOE|          22|
+    +----------+--------------+------------+
     """
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
     import inspect
-    # If function "f" does not define the optional kwargs, then wrap with a kwargs placeholder
-    if inspect.getargspec(f).keywords is None:
-        return _create_udf(lambda *a, **kwargs: f(*a), returnType=returnType, vectorized=True)
-    else:
-        return _create_udf(f, returnType=returnType, vectorized=True)
+    if not inspect.getargspec(wrapped_udf.func).args:
--- End diff --

It looks like `wrapped_udf.func` could be `_udf` within `_create_udf`, which takes a single argument, for example:

```python
@pandas_udf(returnType=LongType())
def add_one():
    return 1
```

I tried a rough idea to solve this:

```diff
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2124,11 +2124,14 @@ class UserDefinedFunction(object):
         return wrapper


-def _create_udf(f, returnType, vectorized):
+def _create_udf(f, returnType, vectorized, checker=None):

     def _udf(f, returnType=StringType(), vectorized=vectorized):
         udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
-        return udf_obj._wrapped()
+        wrapped = udf_obj._wrapped()
+        if checker is not None:
+            checker(wrapped)
+        return wrapped

     # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
     if f is None or isinstance(f, (str, DataType)):
@@ -2201,10 +2204,14 @@ def pandas_udf(f=None, returnType=StringType()):
         |         8|      JOHN DOE|          22|
         +----------+--------------+------------+
         """
-    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True)
-    import inspect
-    if not inspect.getargspec(wrapped_udf.func).args:
-        raise NotImplementedError("0-parameter pandas_udfs are not currently supported")
+
+    def checker(wrapped):
+        import inspect
+        if not inspect.getargspec(wrapped.func).args:
+            raise NotImplementedError("0-parameter pandas_udfs are not currently supported")
+
+    wrapped_udf = _create_udf(f, returnType=returnType, vectorized=True, checker=checker)
+
     return wrapped_udf
```
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140623030

--- Diff: python/pyspark/sql/tests.py ---
@@ -3256,11 +3256,9 @@ def test_vectorized_udf_null_string(self):

     def test_vectorized_udf_zero_parameter(self):
         from pyspark.sql.functions import pandas_udf
-        import pandas as pd
-        df = self.spark.range(10)
-        f0 = pandas_udf(lambda **kwargs: pd.Series(1).repeat(kwargs['length']), LongType())
-        res = df.select(f0())
-        self.assertEquals(df.select(lit(1)).collect(), res.collect())
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(Exception, '0-parameter pandas_udfs.*not.*supported'):
--- End diff --

I believe we could catch a narrower one, `NotImplementedError`.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140623282

--- Diff: python/pyspark/sql/tests.py ---
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
         from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
-        raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+        raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                 Exception,
--- End diff --

Here too, while we are here, let's catch a narrower exception type. Looks like `RuntimeError`.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140623236

--- Diff: python/pyspark/sql/tests.py ---
@@ -3308,12 +3306,12 @@ def test_vectorized_udf_invalid_length(self):
         from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
-        raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
+        raise_exception = pandas_udf(lambda i: pd.Series(1), LongType())
--- End diff --

Maybe `lambda _: ...`.
[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19295#discussion_r140624191

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -28,12 +28,18 @@ class SparkOptimizer(
     experimentalMethods: ExperimentalMethods)
   extends Optimizer(catalog) {

-  override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
+  val experimentalPreOptimizations: Seq[Batch] = Seq(Batch(
+    "User Provided Pre Optimizers", fixedPoint, experimentalMethods.extraPreOptimizations: _*))
+
+  val experimentalPostOptimizations: Batch = Batch(
+    "User Provided Post Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
+
+  override def batches: Seq[Batch] = experimentalPreOptimizations ++
+    (preOptimizationBatches ++ super.batches :+
--- End diff --

This PR is not about the Analyzer; please also update your description.
[GitHub] spark issue #19295: [SPARK-22080][SQL] Adds support for allowing user to add...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/19295 test this please
[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19295#discussion_r140624028

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -28,12 +28,18 @@ class SparkOptimizer(
     experimentalMethods: ExperimentalMethods)
   extends Optimizer(catalog) {

-  override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
+  val experimentalPreOptimizations: Seq[Batch] = Seq(Batch(
--- End diff --

Also define this as a `Batch`; then you can use `experimentalPreOptimizations +: preOptimizationBatches` to concatenate it with the other batches.
[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19295#discussion_r140623965

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala ---
@@ -44,11 +44,14 @@ class ExperimentalMethods private[sql]() {
    */
   @volatile var extraStrategies: Seq[Strategy] = Nil

+  @volatile var extraPreOptimizations: Seq[Rule[LogicalPlan]] = Nil
+
   @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil
--- End diff --

How about renaming this to `extraPostOptimizations`?
[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19295#discussion_r140623955

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala ---
@@ -28,12 +28,18 @@ class SparkOptimizer(
     experimentalMethods: ExperimentalMethods)
   extends Optimizer(catalog) {

-  override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+
+  val experimentalPreOptimizations: Seq[Batch] = Seq(Batch(
+    "User Provided Pre Optimizers", fixedPoint, experimentalMethods.extraPreOptimizations: _*))
+
+  val experimentalPostOptimizations: Batch = Batch(
+    "User Provided Post Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
+
+  override def batches: Seq[Batch] = experimentalPreOptimizations ++
+    (preOptimizationBatches ++ super.batches :+
--- End diff --

OK, I see. Then could you add the use case to the PR description? Like:

```
after this PR, we can add both pre/post optimization rules at runtime as follows:
...
```
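For reference, a hedged sketch of what such a usage section could show, assuming this PR's API: `extraPreOptimizations` is the field proposed here (only `extraOptimizations` exists today), and the no-op rule is purely illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative no-op rule; a real rule would rewrite the plan.
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder().master("local").getOrCreate()
spark.experimental.extraPreOptimizations = Seq(NoopRule) // runs before all other batches (proposed)
spark.experimental.extraOptimizations = Seq(NoopRule)    // runs after all other batches (existing)
```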
[GitHub] spark pull request #19295: [SPARK-22080][SQL] Adds support for allowing user...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19295#discussion_r140624052

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala ---
@@ -78,8 +82,14 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {

   test("Catalyst optimization passes are modifiable at runtime") {
     val sqlContext = SQLContext.getOrCreate(sc)
-    sqlContext.experimental.extraOptimizations = Seq(DummyRule)
-    assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
+    sqlContext.experimental.extraOptimizations = Seq(DummyPostOptimizationRule)
+    sqlContext.experimental.extraPreOptimizations = Seq(DummyPreOptimizationRule)
+
+    val firstBatch = sqlContext.sessionState.optimizer.batches.head
+    val lastBatch = sqlContext.sessionState.optimizer.batches.last // .flatMap(_.rules)
--- End diff --

Is the comment useful?
[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...
Github user ConeyLiu commented on the issue: https://github.com/apache/spark/pull/19317

@jiangxb1987, @WeichenXu123, thanks for reviewing. This change is inspired by the `TODO` list. You can see the following code snippet:

```scala
// TODO: Calling aggregateByKey and collect creates two stages, we can implement something
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
  .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
  }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
  seqOp = {
    case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
      requireValues(features)
      BLAS.axpy(weight, features, featureSum)
      (weightSum + weight, featureSum)
  },
  combOp = {
    case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
      BLAS.axpy(1.0, featureSum2, featureSum1)
      (weightSum1 + weightSum2, featureSum1)
  }).collect().sortBy(_._1)
```

- The `aggregateByKeyLocally` we implemented is similar to `reduceByKeyLocally`.
- I agree with your suggestion to use `OpenHashSet` instead of `JHashMap`. I can change it, and `reduceByKeyLocally` may need the same change too.
- Because here we collect all the aggregated data to the driver and sort it, I think the data should be small, and the data collected by the two implementations should be almost equal.
[GitHub] spark issue #18936: [SPARK-21688][ML][MLLIB] make native BLAS the first choi...
Github user VinceShieh commented on the issue: https://github.com/apache/spark/pull/18936

Hi Sean, sorry for the late reply. Yeah, actually we do have some performance data on F2J vs. OpenBLAS. It seems there is no performance gain from OpenBLAS, not even at the unit-test level. We are thinking maybe we should set up and test on another cluster in case the result is due to environment-related issues; we will sort out the data and post it here later. It's possible that OpenBLAS may not give us any benefit in this case, but still, here are my two cents:
1. Like OpenBLAS, MKL BLAS is also free, easy to get, and much better documented.
2. It's much easier for users to use MKL BLAS than OpenBLAS: just download it and put it where the system can find it, no installation required. On the other hand, installing OpenBLAS on each machine is still troublesome; it took us a while to get it working.
3. MKL BLAS is highly optimized and thus has much better performance.
4. This PR just opens up an opportunity for users to choose a different BLAS impl; they can still go with F2J, so I think it'd be more user-friendly than binding all the level-1 BLAS calls to F2J.
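For context, a hedged sketch of how the BLAS backend is switched through netlib-java, which MLlib delegates to at this time; the property and class names are from the netlib-java documentation, not from this PR, and the property must take effect before the BLAS class first loads (e.g. via `spark.driver.extraJavaOptions`).

```scala
// Sketch: pin netlib-java to the system-installed native BLAS
// (OpenBLAS, MKL, ...) instead of the pure-Java F2J fallback.
System.setProperty("com.github.fommil.netlib.BLAS",
  "com.github.fommil.netlib.NativeSystemBLAS")
```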
[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19317 Yes. I guess the perf gain is because this PR uses a local hashmap that can use unlimited memory, while the current Spark aggregation impl will auto-spill the local hashmap when it exceeds a threshold. Memory management is a difficult thing we should not try to do in user code, I think, because it is hard to estimate how large the hashmap will grow.
[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...
Github user VinceShieh commented on the issue: https://github.com/apache/spark/pull/19317 Nice catch, thanks. The perf gain is truly narrow. I believe this impl just tried to align with the impl of `reduceByKeyLocally`. @ConeyLiu maybe we should revisit the code, along with the `reduceByKeyLocally` impl.
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19222 **[Test build #82100 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82100/testReport)** for PR 19222 at commit [`66bfbfc`](https://github.com/apache/spark/commit/66bfbfcacbcd37b911c6de2289352aebd63d52d6).
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16578 Merged build finished. Test PASSed.
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19222 Jenkins, retest this please
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16578 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82097/ Test PASSed.
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16578 **[Test build #82097 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82097/testReport)** for PR 16578 at commit [`9fac482`](https://github.com/apache/spark/commit/9fac4826f6289e9e8c02f3799912463bbc8ea046). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140622457

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):

     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

We could just do ` # doctest: +SKIP` maybe.
[GitHub] spark issue #19317: [SPARK-22098][CORE] Add new method aggregateByKeyLocally...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19317 And I have to point out that your impl has a high risk of causing OOM. The current impl will auto-spill when the local hashmap is too large, taking advantage of Spark's automatic memory management mechanism, which you should take a look at. Another thing: `JHashMap` will perform slowly; it is better to use `org.apache.spark.util.collection.OpenHashSet`, since the hashmap here is append-only.
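For illustration, a hedged sketch of the append-only pattern with Spark's own collections (usable only inside Spark, since they are `private[spark]`; `OpenHashMap` is the map counterpart of the `OpenHashSet` mentioned above):

```scala
import org.apache.spark.util.collection.OpenHashMap

// Sketch: changeValue either inserts the default or merges into the
// existing value in a single probe, avoiding java.util.HashMap's boxing
// and Entry allocation for the update-or-insert pattern.
val sums = new OpenHashMap[String, Long]()
Seq("a" -> 1L, "b" -> 2L, "a" -> 3L).foreach { case (k, v) =>
  sums.changeValue(k, v, existing => existing + v)
}
```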
[GitHub] spark pull request #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter p...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19325#discussion_r140622410

--- Diff: python/pyspark/sql/functions.py ---
@@ -2183,14 +2183,29 @@ def pandas_udf(f=None, returnType=StringType()):

     :param f: python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object

-    # TODO: doctest
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
--- End diff --

Have we installed pyarrow on Jenkins? The failed test complains `ImportError: No module named pyarrow`.
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19328 Yea .. let's open a separate JIRA.
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19229 @viirya Yeah, for the perf gap I only focused on `mean`, which can take advantage of codegen.
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19278
[GitHub] spark issue #19229: [SPARK-22001][ML][SQL] ImputerModel can do withColumn fo...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19229 Yeah, I think that fix should work for the `Imputer.mean` strategy, because `Imputer.mean` now aggregates many columns at once, and that can produce overly large generated code for the aggregation. For the `Imputer.median` strategy, because it uses `approxQuantile`, which calls the RDD aggregate API, I think codegen doesn't affect that part.
[GitHub] spark issue #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSpli...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19278 LGTM. Merging with master. Thanks @WeichenXu123 for the fix and for testing backwards compatibility!
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r140621871

--- Diff: mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala ---
@@ -160,11 +160,13 @@ class TrainValidationSplitSuite
       .setTrainRatio(0.5)
       .setEstimatorParamMaps(paramMaps)
       .setSeed(42L)
+      .setParallelism(2)
--- End diff --

Ah, you're right, thanks.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/18924#discussion_r140621444

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // We calculate logphat in the same pass as other statistics, but we only need
+    // it if we are optimizing docConcentration
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
+                                      else None

-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions { docs =>
--- End diff --

Let's use a Long for the doc count, since an Int could overflow for large datasets with a high miniBatchFraction.

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
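To make the overflow concern concrete, a self-contained illustration; the wraparound is standard JVM Int behavior, not something from the patch:

```scala
// An Int counter silently wraps past Int.MaxValue (about 2.1e9), which a large
// corpus combined with a high miniBatchFraction can reach; a Long does not.
val intCount: Int = Int.MaxValue
println(intCount + 1)             // -2147483648: silent overflow

val longCount: Long = Int.MaxValue.toLong
println(longCount + 1L)           // 2147483648: correct
```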
[GitHub] spark issue #19286: [SPARK-21338][SQL][FOLLOW-UP] Implement isCascadingTrunc...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19286 ping @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19328 I think you should create a new JIRA instead of reusing SPARK-22088, which is for the earlier wrong-style issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19321: [SPARK-22100] [SQL] Make percentile_approx support numer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19321 **[Test build #82099 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82099/testReport)** for PR 19321 at commit [`db2c110`](https://github.com/apache/spark/commit/db2c11056cf973708b10b5e5fdcd8ec705e27d21). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19325 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19325 **[Test build #82096 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82096/testReport)** for PR 19325 at commit [`7b0da10`](https://github.com/apache/spark/commit/7b0da106fb64a16b77c62953bb12548fda3f7ef3). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19325: [SPARK-22106][PYSPARK][SQL] Disable 0-parameter pandas_u...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19325 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82096/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19328 **[Test build #82098 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82098/testReport)** for PR 19328 at commit [`0f3307d`](https://github.com/apache/spark/commit/0f3307dfc3dd21b3d643d3d58e9202743f958b23). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19328 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19328: [SPARK-22088][SQL][Documentation] Add usage and improve ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19328 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19328: [SPARK-22088][SQL][Documentation] Add usage and i...
GitHub user kevinyu98 opened a pull request:

https://github.com/apache/spark/pull/19328

[SPARK-22088][SQL][Documentation] Add usage and improve documentation with arguments and examples for trim function

## What changes were proposed in this pull request?

This PR proposes to enhance the documentation for the `trim` functions in the function description section.

- Add more `usage`, `arguments` and `examples` for the trim function
- Adjust spacing in the `usage` section

After the changes, the trim function documentation will look like this:

- `trim`

```
trim(str) - Removes the leading and trailing space characters from str.

trim(BOTH trimStr FROM str) - Removes the leading and trailing trimStr characters from str

trim(LEADING trimStr FROM str) - Removes the leading trimStr characters from str

trim(TRAILING trimStr FROM str) - Removes the trailing trimStr characters from str

Arguments:
  str - a string expression
  trimStr - the trim string characters to trim, the default value is a single space
  BOTH, FROM - these are keywords to specify trimming string characters from both ends of the string
  LEADING, FROM - these are keywords to specify trimming string characters from the left end of the string
  TRAILING, FROM - these are keywords to specify trimming string characters from the right end of the string

Examples:
  > SELECT trim('SparkSQL ');
   SparkSQL
  > SELECT trim('SL', 'SSparkSQLS');
   parkSQ
  > SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
   parkSQ
  > SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
   parkSQLS
  > SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
   SSparkSQ
```

- `ltrim`

```
ltrim(str) - Removes the leading space characters from str.

ltrim(trimStr, str) - Removes the leading string that contains the characters from the trim string

Arguments:
  str - a string expression
  trimStr - the trim string characters to trim, the default value is a single space

Examples:
  > SELECT ltrim('SparkSQL ');
   SparkSQL
  > SELECT ltrim('Sp', 'SSparkSQLS');
   arkSQLS
```

- `rtrim`

```
rtrim(str) - Removes the trailing space characters from str.

rtrim(trimStr, str) - Removes the trailing string that contains the characters from the trim string from str

Arguments:
  str - a string expression
  trimStr - the trim string characters to trim, the default value is a single space

Examples:
  > SELECT rtrim('SparkSQL ');
   SparkSQL
  > SELECT rtrim('LQSa', 'SSparkSQLS');
   SSpark
```

This is the JIRA for the trim-characters function: [trim function](https://issues.apache.org/jira/browse/SPARK-14878)

## How was this patch tested?

Manually tested:

```
spark-sql> describe function extended trim;
17/09/22 17:03:04 INFO CodeGenerator: Code generated in 153.026533 ms
Function: trim
Class: org.apache.spark.sql.catalyst.expressions.StringTrim
Usage:
  trim(str) - Removes the leading and trailing space characters from `str`.

  trim(BOTH trimStr FROM str) - Remove the leading and trailing `trimStr` characters from `str`

  trim(LEADING trimStr FROM str) - Remove the leading `trimStr` characters from `str`

  trim(TRAILING trimStr FROM str) - Remove the trailing `trimStr` characters from `str`

Extended Usage:
  Arguments:
    * str - a string expression
    * trimStr - the trim string characters to trim, the default value is a single space
    * BOTH, FROM - these are keywords to specify trimming string characters from both ends of the string
    * LEADING, FROM - these are keywords to specify trimming string characters from the left end of the string
    * TRAILING, FROM - these are keywords to specify trimming string characters from the right end of the string

  Examples:
    > SELECT trim('SparkSQL ');
     SparkSQL
    > SELECT trim('SL', 'SSparkSQLS');
     parkSQ
    > SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
     parkSQ
    > SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
     parkSQLS
    > SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
     SSparkSQ
```

```
spark-sql> describe function extended ltrim;
Function: ltrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimLeft
Usage:
  ltrim(str) - Removes the leading space characters from `str`.

  ltrim(trimStr, str) - Removes the leading string contains the characters from the trim string

Extended Usage:
  Arguments:
    * str - a string expression
    * trimStr - the trim string characters to trim, the default
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605142

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -146,7 +146,13 @@ case class StreamingSymmetricHashJoinExec(
         stateWatermarkPredicates = JoinStateWatermarkPredicates(), left, right)
   }

-  require(joinType == Inner, s"${getClass.getSimpleName} should not take $joinType as the JoinType")
+  private lazy val badJoinTypeException =
--- End diff --

This should be a def. I don't think exceptions should be created and thrown later; I am not sure whether it will capture the stack trace of when it was created or when it was thrown. Either way, it's a bad pattern to have. A better approach is to make it into a function: `def throwBadJoinTypeException() { throw new ... }`

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
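A minimal sketch of the suggested pattern, reusing the message from the removed `require`; the exception type is an assumption, since the diff is truncated before it:

```scala
// Constructing the exception at the throw site keeps the stack trace honest:
// on the JVM, a Throwable captures its stack trace when it is constructed,
// not when it is thrown.
private def throwBadJoinTypeException(): Nothing =
  throw new IllegalArgumentException(
    s"${getClass.getSimpleName} should not take $joinType as the JoinType")
```

Returning `Nothing` lets the call be used directly in match arms that would otherwise need a value.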
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140608418 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() --- End diff -- current -> currentValue --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140615004 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. See class docs for more context and implementation details. 
*/ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { --- End diff -- This loop can probably be split into two methods `findNextValueToRemove`, and `removeValue` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140616618

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
     }

     // Filter the joined rows based on the given condition.
-    val outputFilterFunction =
-      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _
-    val filteredOutputIter =
-      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row =>
-        numOutputRows += 1
-        row
-      }
+    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+    val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction)
+
+    val outputIter: Iterator[InternalRow] = joinType match {
+      case Inner =>
+        filteredInnerOutputIter
+      case LeftOuter =>
+        val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
+        filteredInnerOutputIter ++
+          leftSideJoiner
+            .removeOldState()
+            .filterNot { case (key, value) => rightSideJoiner.containsKey(key) }
+            .map { case (key, value) => joinedRow.withLeft(value).withRight(nullRight) }
+      case RightOuter =>
+        val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
+        filteredInnerOutputIter ++
+          rightSideJoiner
--- End diff --

nit: split this into two statements with an intermediate variable `removedRowsIter`, and add docs on what this does.

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
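A sketch of how the nit might be applied to the branch in question; every name comes from the diff except `removedRowsIter`, which the comment proposes:

```scala
case RightOuter =>
  val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
  // Rows evicted from the right side's state whose key never matched a left
  // row; each is emitted once as an outer result, null-padded on the left.
  val removedRowsIter = rightSideJoiner
    .removeOldState()
    .filterNot { case (key, _) => leftSideJoiner.containsKey(key) }
    .map { case (_, value) => joinedRow.withLeft(nullLeft).withRight(value) }
  filteredInnerOutputIter ++ removedRowsIter
```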
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140614146 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. See class docs for more context and implementation details. 
*/ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { --- End diff -- maybe rename`valueForIndex -> currentValue` and `index -> nextValueIndex ` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605854

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec(
       }
     }

-    /** Remove old buffered state rows using watermarks for state keys and values */
-    def removeOldState(): Unit = {
+    /**
+     * Builds an iterator over old state key-value pairs, removing them lazily as they're produced.
+     *
+     * This iterator is dangerous! It must be consumed fully before any other operations are made
--- End diff --

Maybe tone down the language a little :) Rather than saying "dangerous", use `@note`.

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
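A sketch of the toned-down Scaladoc with `@note`, reusing the wording from the patch; the method body is elided:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

/**
 * Builds an iterator over old state key-value pairs, removing them lazily
 * as they're produced.
 *
 * @note This iterator must be consumed fully before any other operations are
 *       made against this joiner's join state manager, and commits must not
 *       happen while it is ongoing; intermediate states of the iterator leave
 *       the state manager in an invalid configuration.
 */
def removeOldState(): Iterator[(UnsafeRow, UnsafeRow)] = ???  // body as in the patch
```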
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140617927

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
     }

     // Filter the joined rows based on the given condition.
-    val outputFilterFunction =
-      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _
-    val filteredOutputIter =
-      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row =>
-        numOutputRows += 1
-        row
-      }
+    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+    val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction)
+
+    val outputIter: Iterator[InternalRow] = joinType match {
+      case Inner =>
+        filteredInnerOutputIter
+      case LeftOuter =>
+        val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
+        filteredInnerOutputIter ++
+          leftSideJoiner
+            .removeOldState()
+            .filterNot { case (key, value) => rightSideJoiner.containsKey(key) }
+            .map { case (key, value) => joinedRow.withLeft(value).withRight(nullRight) }
+      case RightOuter =>
+        val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
+        filteredInnerOutputIter ++
+          rightSideJoiner
+            .removeOldState()
+            .filterNot { case (key, value) => leftSideJoiner.containsKey(key) }
+            .map { case (key, value) => joinedRow.withLeft(nullLeft).withRight(value) }
+      case _ => throw badJoinTypeException
+    }
+
+    val outputIterWithMetrics = outputIter.map { row =>
+      numOutputRows += 1
+      row
+    }
+
+    // Iterator which must be consumed after output completion before committing.
+    val cleanupIter = joinType match {
+      case Inner =>
+        leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
+      case LeftOuter => rightSideJoiner.removeOldState()
+      case RightOuter => leftSideJoiner.removeOldState()
+      case _ => throw badJoinTypeException
+    }

     // Function to remove old state after all the input has been consumed and output generated
     def onOutputCompletion = {
       allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - updateStartTimeNs), 0)

-      // Remove old state if needed
+      // TODO: how to get this for removals as part of outer join?
       allRemovalsTimeMs += timeTakenMs {
-        leftSideJoiner.removeOldState()
-        rightSideJoiner.removeOldState()
+        cleanupIter.foreach(_ => ())
--- End diff --

Don't use foreach; Scala's foreach is pretty inefficient. Use a while loop.

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
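A minimal sketch of the requested while-loop drain over the `cleanupIter` from the diff:

```scala
// Drain the cleanup iterator without the per-element closure dispatch that
// Iterator.foreach incurs; the elements themselves are discarded.
while (cleanupIter.hasNext) {
  cleanupIter.next()
}
```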
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140614325 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. See class docs for more context and implementation details. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { + if (index < numValues) { --- End diff -- add comments for each conditional branch to explain this. 
in retrospect, probably I should have done that too :) but the code is an order of magnitude more complicated now, so it requires more docs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140608463 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) --- End diff -- dont use tuples. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140612077

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -329,6 +392,27 @@ class SymmetricHashJoinStateManager(
     }
   }

+    /** Get all the values for key and all indices, in a (value, index) tuple. */
+    def getAllWithIndex(key: UnsafeRow, numValues: Long): Iterator[(UnsafeRow, Long)] = {
--- End diff --

We can probably convert getAll to this. It does not make sense to have both, especially since both are equally efficient if you return Iterator[KeyWithIndexAndValue]. Also, I think the iterator() method can be removed; it's effectively not being used (used only in StateManager.iterator, which is not really used).

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
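A sketch of the consolidated accessor the comment asks for; the shape of `KeyWithIndexAndValue` is inferred from its name, and `get(key, index)` stands for the store's existing point lookup:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// A single mutable carrier reused across elements keeps this as cheap as the
// tuple-returning variant while exposing key, index, and value together.
case class KeyWithIndexAndValue(
    var key: UnsafeRow = null, var valueIndex: Long = -1, var value: UnsafeRow = null) {
  def withNew(newKey: UnsafeRow, newIndex: Long, newValue: UnsafeRow): this.type = {
    this.key = newKey; this.valueIndex = newIndex; this.value = newValue; this
  }
}

def getAll(key: UnsafeRow, numValues: Long): Iterator[KeyWithIndexAndValue] = {
  val reused = KeyWithIndexAndValue()
  var index = 0L
  new Iterator[KeyWithIndexAndValue] {
    override def hasNext: Boolean = index < numValues
    override def next(): KeyWithIndexAndValue = {
      val value = get(key, index)  // hypothetical point lookup on this store
      val result = reused.withNew(key, index, value)
      index += 1
      result
    }
  }
}
```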
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140607924 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { --- End diff -- Use UnsafeRowPair for return. Same reason as above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140611178

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -329,6 +392,27 @@ class SymmetricHashJoinStateManager(
     }
   }

+    /** Get all the values for key and all indices, in a (value, index) tuple. */
+    def getAllWithIndex(key: UnsafeRow, numValues: Long): Iterator[(UnsafeRow, Long)] = {
--- End diff --

Use `KeyWithIndexAndValue` for the return value.

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140612202 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. See class docs for more context and implementation details. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { --- End diff -- its not really cleanup, maybe "update"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605588

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec(
       }
     }

-    /** Remove old buffered state rows using watermarks for state keys and values */
-    def removeOldState(): Unit = {
+    /**
+     * Builds an iterator over old state key-value pairs, removing them lazily as they're produced.
+     *
+     * This iterator is dangerous! It must be consumed fully before any other operations are made
+     * against this joiner's join state manager, and in particular commits must not happen while
+     * this iterator is ongoing. The intermediate states of the iterator leave the state manager in
+     * an invalid configuration.
+     *
+     * We do this unsafe thing to avoid requiring either two passes or full materialization when
+     * processing the rows for outer join.
+     */
+    def removeOldState(): Iterator[(UnsafeRow, UnsafeRow)] = {
--- End diff --

Use `UnsafeRowPair` instead of Tuple2 (i.e. `(UnsafeRow, UnsafeRow)` is shorthand for scala.Tuple2). It reuses the pair (or an equivalent object), avoiding the creation of a lot of short-term objects and thus reducing GC pressure.

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
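For illustration, a sketch of the reuse pattern being recommended; `RowPair` is a simplified stand-in for Spark's `UnsafeRowPair`, whose definition is not shown in this thread, and `underlyingRemovals` is a hypothetical source iterator:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// One mutable pair per iterator instead of one Tuple2 per element, so a long
// state-cleanup scan allocates no per-row garbage.
class RowPair(var key: UnsafeRow = null, var value: UnsafeRow = null) {
  def withRows(k: UnsafeRow, v: UnsafeRow): RowPair = {
    key = k
    value = v
    this
  }
}

def removeOldState(underlyingRemovals: Iterator[(UnsafeRow, UnsafeRow)]): Iterator[RowPair] = {
  val reusedPair = new RowPair()
  underlyingRemovals.map { case (k, v) => reusedPair.withRows(k, v) }
}
```

The usual contract for a reused carrier like this is that callers must copy or fully consume each pair before advancing the iterator.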
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140614816 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. See class docs for more context and implementation details. 
*/ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { + if (index < numValues) { +val current = keyWithIndexToValue.get(currentKey, index) +if (condition(current)) { + valueForIndex = current +} else { + index += 1 +} + } else { +cleanupCurrentKey() + +currentKeyToNumValue = Some(allKeyToNumValues.next()) +numValues = currentKeyToNumValue.get.numValue --- End diff -- numValues -> currentKeyNumValues --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140617841

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
     }

     // Filter the joined rows based on the given condition.
-    val outputFilterFunction =
-      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _
-    val filteredOutputIter =
-      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row =>
-        numOutputRows += 1
-        row
-      }
+    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+    val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction)
+
+    val outputIter: Iterator[InternalRow] = joinType match {
+      case Inner =>
+        filteredInnerOutputIter
+      case LeftOuter =>
+        val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
+        filteredInnerOutputIter ++
+          leftSideJoiner
+            .removeOldState()
+            .filterNot { case (key, value) => rightSideJoiner.containsKey(key) }
+            .map { case (key, value) => joinedRow.withLeft(value).withRight(nullRight) }
+      case RightOuter =>
+        val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
+        filteredInnerOutputIter ++
+          rightSideJoiner
+            .removeOldState()
+            .filterNot { case (key, value) => leftSideJoiner.containsKey(key) }
+            .map { case (key, value) => joinedRow.withLeft(nullLeft).withRight(value) }
+      case _ => throw badJoinTypeException
+    }
+
+    val outputIterWithMetrics = outputIter.map { row =>
+      numOutputRows += 1
+      row
+    }
+
+    // Iterator which must be consumed after output completion before committing.
+    val cleanupIter = joinType match {
+      case Inner =>
+        leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
+      case LeftOuter => rightSideJoiner.removeOldState()
+      case RightOuter => leftSideJoiner.removeOldState()
+      case _ => throw badJoinTypeException
+    }
--- End diff --

This confused me a lot, but then I got why you need to call removeOldState once again. Can you add the explanation that you have to clean the side that has not been cleaned? Also, this can be moved into "onOutputCompletion", because that is where this is needed, not before.

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
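Combining this comment with the earlier one about draining via a while loop, a sketch of the relocation being suggested; all names come from the patch, and the metric bookkeeping is kept as-is:

```scala
def onOutputCompletion = {
  allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - updateStartTimeNs), 0)

  allRemovalsTimeMs += timeTakenMs {
    // Clean whichever side was not already cleaned while generating the
    // outer-join rows; for inner joins, neither side has been cleaned yet.
    val cleanupIter = joinType match {
      case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
      case LeftOuter => rightSideJoiner.removeOldState()
      case RightOuter => leftSideJoiner.removeOldState()
      case _ => throw badJoinTypeException
    }
    while (cleanupIter.hasNext) cleanupIter.next()
  }
}
```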