[GitHub] spark issue #20391: [SPARK-23208][SQL] Fix code generation for complex creat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20391 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86623/ Test PASSed.
[GitHub] spark issue #20391: [SPARK-23208][SQL] Fix code generation for complex creat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20391 Merged build finished. Test PASSed.
[GitHub] spark issue #20391: [SPARK-23208][SQL] Fix code generation for complex creat...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20391

**[Test build #86623 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86623/testReport)** for PR 20391 at commit [`1e8449a`](https://github.com/apache/spark/commit/1e8449a5cbd2d97607883f75d7c3982b547dd214).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #20224: [SPARK-23032][SQL] Add a per-query codegenStageId...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20224#discussion_r163769201

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---

@@ -228,4 +229,35 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
      }
    }
  }
+
+  test("codegen stage IDs should be preserved in transformations after CollapseCodegenStages") {
+    // test case adapted from DataFrameSuite to trigger ReuseExchange
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2") {
+      val df = spark.range(100)
+      val join = df.join(df, "id")
+      val plan = join.queryExecution.executedPlan
+      assert(!plan.find(p =>
+        p.isInstanceOf[WholeStageCodegenExec] &&
+          p.asInstanceOf[WholeStageCodegenExec].codegenStageId == 0).isDefined,
+        "codegen stage IDs should be preserved through ReuseExchange")
+      checkAnswer(join, df.toDF)
+    }
+  }
+
+  test("including codegen stage ID in generated class name should not regress codegen caching") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME.key -> "true") {
+      val bytecodeSizeHisto = CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE
+      spark.range(3).select('id + 2).collect
+      val after1 = bytecodeSizeHisto.getCount
+      spark.range(3).select('id + 2).collect
+      val after2 = bytecodeSizeHisto.getCount // same query shape as above, deliberately
+      assert(after1 == after2, "the same query run twice should hit the codegen cache")
+
+      spark.range(5).select('id * 2).collect
+      val after3 = bytecodeSizeHisto.getCount
+      assert(after3 >= after2, "a different query can result in codegen cache miss, that's okay")

--- End diff --

nit: `a different query can result in codegen cache miss, that's okay` seems a misleading error message for the assert.
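A rewording along these lines might address the nit; this is only a sketch of the idea, not a committed fix:

```scala
// hypothetical clearer message: state the invariant being checked,
// rather than the behavior that is merely tolerated
assert(after3 >= after2,
  "the generated-method bytecode-size metric count should never decrease; " +
    "a new query shape may add codegen cache entries but never removes them")
```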
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19285 **[Test build #86630 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86630/testReport)** for PR 19285 at commit [`40bdcac`](https://github.com/apache/spark/commit/40bdcacfc14b24c913c5979e0b2cf8b90154c543).
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163768689

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---

@@ -232,78 +236,93 @@ private[spark] class MemoryStore(
      elementsUnrolled += 1
    }

+    val valuesBuilder = if (keepUnrolling) {
+      Some(valuesHolder.getBuilder())
+    } else {
+      None
+    }
+
+    // Make sure that we have enough memory to store the block. By this point, it is possible that
+    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
+    // perform one final call to attempt to allocate additional memory if necessary.
    if (keepUnrolling) {
-      // We successfully unrolled the entirety of this block
-      val arrayValues = vector.toArray
-      vector = null
-      val entry =
-        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
-      val size = entry.size
-      def transferUnrollToStorage(amount: Long): Unit = {
-        // Synchronize so that transfer is atomic
-        memoryManager.synchronized {
-          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
-          val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
-          assert(success, "transferring unroll memory to storage memory failed")
+      val size = valuesBuilder.get.preciseSize
+      if (size > unrollMemoryUsedByThisBlock) {
+        val amountToRequest = size - unrollMemoryUsedByThisBlock
+        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
+        if (keepUnrolling) {
+          unrollMemoryUsedByThisBlock += amountToRequest
        }
      }
-      // Acquire storage memory if necessary to store this block in memory.
-      val enoughStorageMemory = {
-        if (unrollMemoryUsedByThisBlock <= size) {
-          val acquiredExtra =
-            memoryManager.acquireStorageMemory(
-              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
-          if (acquiredExtra) {
-            transferUnrollToStorage(unrollMemoryUsedByThisBlock)
-          }
-          acquiredExtra
-        } else { // unrollMemoryUsedByThisBlock > size
-          // If this task attempt already owns more unroll memory than is necessary to store the
-          // block, then release the extra memory that will not be used.
-          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
-          transferUnrollToStorage(size)
-          true
-        }
+    }
+
+    if (keepUnrolling) {

--- End diff --

updated
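The comment added in this hunk describes a final "top-up" allocation once the precise size is known. A self-contained toy model of that pattern (hypothetical names, not MemoryStore's real API):

```scala
// toy model: after unrolling, compare the precise size against the memory
// reserved so far and try to reserve only the difference
def topUpReservation(
    preciseSize: Long,
    reservedSoFar: Long,
    tryReserve: Long => Boolean): (Boolean, Long) = {
  if (preciseSize <= reservedSoFar) {
    (true, reservedSoFar) // already covered, possibly with some excess
  } else {
    val extra = preciseSize - reservedSoFar
    if (tryReserve(extra)) (true, reservedSoFar + extra) else (false, reservedSoFar)
  }
}
```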
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19285 **[Test build #86629 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86629/testReport)** for PR 19285 at commit [`9e0759f`](https://github.com/apache/spark/commit/9e0759fb49eb4994099c10c8f8ec3a05637c915b).
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18931 LGTM, pending jenkins
[GitHub] spark issue #20224: [SPARK-23032][SQL] Add a per-query codegenStageId to Who...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20224 LGTM, pending jenkins
[GitHub] spark pull request #20224: [SPARK-23032][SQL] Add a per-query codegenStageId...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20224#discussion_r163767104

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -629,6 +629,13 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

+  val WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME =
+    buildConf("spark.sql.codegen.wholeStage.useIdInClassName")

--- End diff --

Logically it's better to have the `wholeStage` prefix. Let's start here by always adding the `wholeStage` prefix to whole-stage-related configs (keeping the existing ones unchanged). cc @viirya
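For illustration, under the proposed convention the full definition would look roughly like this sketch (only the key and the two lines quoted above come from the PR; the `doc` text is hypothetical):

```scala
val WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME =
  buildConf("spark.sql.codegen.wholeStage.useIdInClassName")
    .doc("Whether to embed the per-query codegen stage ID into " +
      "the name of the generated class")
    .booleanConf
    .createWithDefault(true)
```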
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19285 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86619/ Test PASSed.
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19285 Merged build finished. Test PASSed.
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19285

**[Test build #86619 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86619/testReport)** for PR 19285 at commit [`ded080d`](https://github.com/apache/spark/commit/ded080d364faf8395f33f2bb7a4eb2d5332f570f).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #20292: [SPARK-23129][CORE] Make deserializeStream of Dis...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20292
[GitHub] spark issue #20292: [SPARK-23129][CORE] Make deserializeStream of DiskMapIte...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20292 thanks, merging to master/2.3!
[GitHub] spark issue #20372: Improved block merging logic for partitions
Github user ash211 commented on the issue: https://github.com/apache/spark/pull/20372

Tagging folks who have touched this code recently: @vgankidi @ericl @davies

This seems to provide a more compact packing in every scenario, which should improve execution times. One risk is that individual partitions are no longer always contiguous ranges of files in order, but rather sometimes they have a gap. In the test this is the `(file1, file6)` partition. If something depends on this past behavior it could now break, though I don't think anything should be requiring this partition ordering.
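To make the `(file1, file6)` remark concrete, here is a toy first-fit packing (illustrative only; not the PR's actual algorithm, and the file sizes are made up) showing how a partition can end up holding non-adjacent files:

```scala
import scala.collection.mutable.ArrayBuffer

case class F(name: String, size: Long)

// illustrative sizes with a max partition size of 10 bytes
val maxSplitBytes = 10L
val files = Seq(F("file1", 6), F("file2", 8), F("file3", 8), F("file6", 4))

val partitions = ArrayBuffer.empty[ArrayBuffer[F]]
for (f <- files) {
  // put the file into the first partition that still has room, else open a new one
  partitions.find(p => p.map(_.size).sum + f.size <= maxSplitBytes) match {
    case Some(p) => p += f
    case None => partitions += ArrayBuffer(f)
  }
}
// partitions: (file1, file6), (file2), (file3) -- file1 and file6 share a
// partition even though other files sit between them in the input order
```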
[GitHub] spark issue #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20390 Merged build finished. Test PASSed.
[GitHub] spark issue #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVectorizer
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20367

**[Test build #4077 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4077/testReport)** for PR 20367 at commit [`daa0870`](https://github.com/apache/spark/commit/daa08705b2cb8e097c037171b809f0cbfab3f52c).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20390

**[Test build #86626 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86626/testReport)** for PR 20390 at commit [`92ee53a`](https://github.com/apache/spark/commit/92ee53a8af720cb107dd0da7e1ea6eaaf32f0c06).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20390 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86626/ Test PASSed.
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18931 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86622/ Test PASSed.
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18931 Merged build finished. Test PASSed.
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18931

**[Test build #86622 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86622/testReport)** for PR 18931 at commit [`c859d53`](https://github.com/apache/spark/commit/c859d53dd909cce87056e3fee9fe42b2d4d5acdb).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18931 Merged build finished. Test PASSed.
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18931 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86621/ Test PASSed.
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18931

**[Test build #86621 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86621/testReport)** for PR 18931 at commit [`2fdf6e7`](https://github.com/apache/spark/commit/2fdf6e707bb8e234e3ef635eb0d421ec93aafd79).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/13599

**[Test build #86628 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86628/testReport)** for PR 13599 at commit [`0a5eb38`](https://github.com/apache/spark/commit/0a5eb388e6e7d4714153d632139f8d1dc5567e14).

* This patch **fails to build**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean)`
  * `class DriverEndpoint(override val rpcEnv: RpcEnv)`
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19285 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86620/ Test FAILed.
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86628/ Test FAILed.
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19285 Merged build finished. Test FAILed.
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Merged build finished. Test FAILed.
[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18906 BTW, what do we get if it returns `None` but it's non-nullable, an NPE?
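For context, the scenario being asked about would look something like this sketch (`asNonNullable` is the API proposed in this PR, not part of released PySpark):

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# declares the output non-nullable, but the function can still return None;
# downstream code that trusts the declared nullability (e.g. generated code on
# the JVM side) could then fail, for example with a NullPointerException
broken_udf = udf(lambda: None, StringType()).asNonNullable()
```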
[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19285

**[Test build #86620 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86620/testReport)** for PR 19285 at commit [`b41f1bb`](https://github.com/apache/spark/commit/b41f1bbb5e774205b321554af1376c4683582a0e).

* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20292: [SPARK-23129][CORE] Make deserializeStream of DiskMapIte...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/20292 LGTM
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Merged build finished. Test PASSed.
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/223/ Test PASSed.
[GitHub] spark issue #20292: [SPARK-23129][CORE] Make deserializeStream of DiskMapIte...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20292 LGTM.
[GitHub] spark pull request #18906: [SPARK-21692][PYSPARK][SQL] Add nullability suppo...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18906#discussion_r163759284

--- Diff: python/pyspark/sql/functions.py ---

@@ -2105,6 +2105,14 @@ def udf(f=None, returnType=StringType()):

    >>> import random
    >>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
+
+    .. note:: The user-defined functions are considered to be able to return null values by default.
+        If your function is not nullable, call `asNonNullable` on the user defined function.
+        E.g.:
+
+        >>> from pyspark.sql.types import StringType
+        >>> import getpass
+        >>> getuser_udf = udf(lambda: getpass.getuser(), StringType()).asNonNullable()

--- End diff --

The default return type is string, so I think we can omit `StringType`:

```python
getuser_udf = udf(lambda: getpass.getuser(), "string").asNonNullable()
```
[GitHub] spark pull request #18906: [SPARK-21692][PYSPARK][SQL] Add nullability suppo...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18906#discussion_r163757006

--- Diff: python/pyspark/sql/functions.py ---

@@ -2264,6 +2272,16 @@ def pandas_udf(f=None, returnType=None, functionType=None):
    ...     return pd.Series(np.random.randn(len(v))
    >>> random = random.asNondeterministic()  # doctest: +SKIP
+
+    .. note:: The user-defined functions are considered to be able to return null values by default.
+        If your function is not nullable, call `asNonNullable` on the user defined function.
+        E.g.:
+
+        >>> @pandas_udf('string', PandasUDFType.SCALAR)  # doctest: +SKIP
+        ... def get_user(v):
+        ...     import getpass as gp
+        ...     return gp.getuser()

--- End diff --

I don't think this is quite the right example. A correct and better one would look like this:

```python
@pandas_udf("string")
def foo(s):
    import getpass
    import pandas
    return pandas.Series(getpass.getuser()).repeat(s.size)
```
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/13599 **[Test build #86628 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86628/testReport)** for PR 13599 at commit [`0a5eb38`](https://github.com/apache/spark/commit/0a5eb388e6e7d4714153d632139f8d1dc5567e14).
[GitHub] spark pull request #20384: [SPARK-23195] [SQL] Keep the Hint of Cached Data
Github user gatorsmile closed the pull request at: https://github.com/apache/spark/pull/20384
[GitHub] spark issue #20292: [SPARK-23129][CORE] Make deserializeStream of DiskMapIte...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20292 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86618/ Test PASSed.
[GitHub] spark issue #20292: [SPARK-23129][CORE] Make deserializeStream of DiskMapIte...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20292 Merged build finished. Test PASSed.
[GitHub] spark issue #20292: [SPARK-23129][CORE] Make deserializeStream of DiskMapIte...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20292

**[Test build #86618 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86618/testReport)** for PR 20292 at commit [`a443531`](https://github.com/apache/spark/commit/a443531eeaa2a20a79a640cb6ecca9737164cfd5).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle rea...
Github user gczsjdy closed the pull request at: https://github.com/apache/spark/pull/19862
[GitHub] spark issue #19862: [SPARK-22671][SQL] Make SortMergeJoin shuffle read less ...
Github user gczsjdy commented on the issue: https://github.com/apache/spark/pull/19862 @cloud-fan Ok, thanks for your time, I will close this.
[GitHub] spark issue #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20390 **[Test build #86626 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86626/testReport)** for PR 20390 at commit [`92ee53a`](https://github.com/apache/spark/commit/92ee53a8af720cb107dd0da7e1ea6eaaf32f0c06).
[GitHub] spark issue #20224: [SPARK-23032][SQL] Add a per-query codegenStageId to Who...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20224 **[Test build #86627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86627/testReport)** for PR 20224 at commit [`a11232e`](https://github.com/apache/spark/commit/a11232e162c50a1b9312410debb9fb7c4766f9a2).
[GitHub] spark issue #20224: [SPARK-23032][SQL] Add a per-query codegenStageId to Who...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20224 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/222/ Test PASSed.
[GitHub] spark issue #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20390 Merged build finished. Test PASSed.
[GitHub] spark issue #20224: [SPARK-23032][SQL] Add a per-query codegenStageId to Who...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20224 Merged build finished. Test PASSed.
[GitHub] spark issue #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20390 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/221/ Test PASSed.
[GitHub] spark pull request #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/20390#discussion_r163759921

--- Diff: python/pyspark/sql/tests.py ---

@@ -2855,6 +2855,10 @@ def test_create_dataframe_from_old_pandas(self):
        with self.assertRaisesRegexp(ImportError, 'Pandas >= .* must be installed'):
            self.spark.createDataFrame(pdf)

+    def test_colRegex(self):
+        df = self.spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)])
+        self.assertEqual(df.select(df.colRegex("`(_1)?+.+`")).collect(), df.select("_2").collect())

--- End diff --

@HyukjinKwon Thanks! I will make the changes.
[GitHub] spark pull request #20224: [SPARK-23032][SQL] Add a per-query codegenStageId...
Github user rednaxelafx commented on a diff in the pull request: https://github.com/apache/spark/pull/20224#discussion_r163757952

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---

@@ -325,6 +326,28 @@ object WholeStageCodegenExec {
  }
}

+object WholeStageCodegenId {
+  private val codegenStageCounter = ThreadLocal.withInitial(new Supplier[Integer] {
+    override def get() = 0 // TODO: change to Scala lambda syntax when upgraded to Scala 2.12+

--- End diff --

With the updated PR that uses the secondary constructor in `WholeStageCodegenExec`, yes, you're making a good point. All the places that create temporary `WholeStageCodegenExec` objects are explicitly passing in `0` as the codegen stage ID now, so we can indeed simplify the counter logic here. Will address in the next update.
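The TODO in the hunk refers to Scala 2.12's SAM conversion, which lets a plain lambda stand in for the anonymous `java.util.function.Supplier`; roughly (a sketch, assuming a Scala 2.12+ build):

```scala
// on Scala 2.12+, a function literal converts to the Supplier SAM automatically
private val codegenStageCounter: ThreadLocal[Integer] =
  ThreadLocal.withInitial(() => Integer.valueOf(0))
```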
[GitHub] spark pull request #20224: [SPARK-23032][SQL] Add a per-query codegenStageId...
Github user rednaxelafx commented on a diff in the pull request: https://github.com/apache/spark/pull/20224#discussion_r163757687

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -629,6 +629,13 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

+  val WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME =
+    buildConf("spark.sql.codegen.wholeStage.useIdInClassName")

--- End diff --

I'm open to suggestions for the config option name. Do you have any concrete suggestions? Looks like you're suggesting `spark.sql.codegen.useIdInClassName`, right?

I chose the current name (prefix) for two reasons:

1. the config option right before mine is named `spark.sql.codegen.wholeStage`, and I just used it as a prefix;
2. this option only affects whole-stage codegen and not the other (expression/predicate/ordering/encoder) codegens.

But you're making a good point that all the other whole-stage codegen config options (the ones following this one) only use `spark.sql.codegen` as the prefix. So if you'd confirm that I understood your suggestion correctly, I'll update the PR to address it. Thanks!
[GitHub] spark issue #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVectorizer
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20367 **[Test build #4077 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4077/testReport)** for PR 20367 at commit [`daa0870`](https://github.com/apache/spark/commit/daa08705b2cb8e097c037171b809f0cbfab3f52c).
[GitHub] spark pull request #20389: [SPARK-23205][ML] Update ImageSchema.readImages t...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20389#discussion_r163756853

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---

@@ -169,8 +169,7 @@ object ImageSchema {
    var offset = 0
    for (h <- 0 until height) {
      for (w <- 0 until width) {
-        val color = new Color(img.getRGB(w, h))
-
+        val color = new Color(img.getRGB(w, h), nChannels == 4)
        decoded(offset) = color.getBlue.toByte
        decoded(offset + 1) = color.getGreen.toByte
        decoded(offset + 2) = color.getRed.toByte

--- End diff --

At line 177,

```scala
if (nChannels == 4) {
  decoded(offset + 3) = color.getAlpha.toByte
}
```

We can directly use `hasAlpha` here too, instead of indirectly comparing `nChannels`.
[GitHub] spark pull request #20389: [SPARK-23205][ML] Update ImageSchema.readImages t...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20389#discussion_r163756581

--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala ---

@@ -169,8 +169,7 @@ object ImageSchema {
    var offset = 0
    for (h <- 0 until height) {
      for (w <- 0 until width) {
-        val color = new Color(img.getRGB(w, h))
-
+        val color = new Color(img.getRGB(w, h), nChannels == 4)

--- End diff --

Why not directly use `hasAlpha`?
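Taking this comment and the previous one together, the suggested simplification might look like the following sketch (it assumes `hasAlpha` is computed once before the loop; not the committed code):

```scala
val hasAlpha = nChannels == 4 // assumed to be hoisted out of the pixel loop
// java.awt.Color(int rgba, boolean hasalpha) interprets bits 24-31 as alpha
val color = new Color(img.getRGB(w, h), hasAlpha)
decoded(offset) = color.getBlue.toByte
decoded(offset + 1) = color.getGreen.toByte
decoded(offset + 2) = color.getRed.toByte
if (hasAlpha) {
  decoded(offset + 3) = color.getAlpha.toByte
}
```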
[GitHub] spark pull request #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20390#discussion_r163755876

--- Diff: python/pyspark/sql/tests.py ---

@@ -2855,6 +2855,10 @@ def test_create_dataframe_from_old_pandas(self):
        with self.assertRaisesRegexp(ImportError, 'Pandas >= .* must be installed'):
            self.spark.createDataFrame(pdf)

+    def test_colRegex(self):
+        df = self.spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)])
+        self.assertEqual(df.select(df.colRegex("`(_1)?+.+`")).collect(), df.select("_2").collect())

--- End diff --

I think this is actually being tested in the doctest. Seems we can remove it.
[GitHub] spark pull request #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20390#discussion_r163755064

--- Diff: python/pyspark/sql/dataframe.py ---

@@ -1881,6 +1881,28 @@ def toDF(self, *cols):
        jdf = self._jdf.toDF(self._jseq(cols))
        return DataFrame(jdf, self.sql_ctx)

+    @since(2.4)

--- End diff --

Could we put this API between `def columns(self):` and `def alias(self, alias):`?
[GitHub] spark pull request #20390: [SPARK-23081][PYTHON]Add colRegex API to PySpark
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20390#discussion_r163756176

--- Diff: python/pyspark/sql/dataframe.py ---

@@ -1881,6 +1881,28 @@ def toDF(self, *cols):
        jdf = self._jdf.toDF(self._jseq(cols))
        return DataFrame(jdf, self.sql_ctx)

+    @since(2.4)
+    def colRegex(self, colName):
+        """
+        Selects column based on the column name specified as a regex and return it
+        as :class:`Column`.
+
+        :param colName: string, column name specified as a regex.
+
+        >>> df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)])
+        >>> df.select(df.colRegex("`(_1)?+.+`")).show()
+        +---+
+        | _2|
+        +---+
+        |  1|
+        |  2|
+        |  3|
+        +---+
+        """
+        assert isinstance(colName, basestring), "colName should be a string"

--- End diff --

I think a `TypeError` with an if could be more correct.
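A sketch of what the suggested change could look like (method body adapted from this PR; the error message wording is illustrative, and `Column` is the import already used inside `pyspark/sql/dataframe.py`):

```python
def colRegex(self, colName):
    """Selects column based on the column name specified as a regex."""
    if not isinstance(colName, basestring):
        raise TypeError("colName should be provided as a string")
    jc = self._jdf.colRegex(colName)
    return Column(jc)
```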
[GitHub] spark issue #20375: [SPARK-23199][SQL]improved Removes repetition from group...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20375 **[Test build #86625 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86625/testReport)** for PR 20375 at commit [`5050e50`](https://github.com/apache/spark/commit/5050e50aedb25463ff690b54356211dd859af690).
[GitHub] spark issue #20388: [SPARK-23020][core] Fix race in SparkAppHandle cleanup, ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20388 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86616/ Test PASSed.
[GitHub] spark issue #20388: [SPARK-23020][core] Fix race in SparkAppHandle cleanup, ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20388 Merged build finished. Test PASSed.
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163755819

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---

@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util._
+import java.util.{List => JList}
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+
+object TextSocketSourceProviderV2 {
+  val HOST = "host"
+  val PORT = "port"
+  val INCLUDE_TIMESTAMP = "includeTimestamp"
+  val NUM_PARTITIONS = "numPartitions"
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
+    StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+}
+
+class TextSocketSourceProviderV2 extends DataSourceV2
+    with MicroBatchReadSupport with DataSourceRegister with Logging {

--- End diff --

I see, thanks for the clarification. Let me change it.
[GitHub] spark issue #20388: [SPARK-23020][core] Fix race in SparkAppHandle cleanup, ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20388

**[Test build #86616 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86616/testReport)** for PR 20388 at commit [`fb14eaa`](https://github.com/apache/spark/commit/fb14eaa918509f124a2a75155f8199b28de9a183).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class ServerConnection extends LauncherConnection`
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163755636

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---

[same hunk as quoted above, ending at:]

+class TextSocketSourceProviderV2 extends DataSourceV2
+    with MicroBatchReadSupport with DataSourceRegister with Logging {

--- End diff --

The idea is that the existing TextSocketSourceProvider will have the MicroBatchReadSupport implementation here, in addition to the StreamSourceProvider implementation it already has.
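In other words, the suggested shape is roughly the following sketch (interface and class names as they appear in this PR's snapshot of the V2 API; the reader class name is hypothetical, and imports are omitted):

```scala
class TextSocketSourceProvider extends DataSourceV2
    with StreamSourceProvider      // existing V1 code path, kept as-is
    with MicroBatchReadSupport     // new V2 micro-batch code path
    with DataSourceRegister with Logging {

  override def shortName(): String = "socket"

  override def createMicroBatchReader(
      schema: Optional[StructType],
      checkpointLocation: String,
      options: DataSourceV2Options): MicroBatchReader = {
    new TextSocketMicroBatchReader(options) // hypothetical reader implementation
  }

  // createSource / sourceSchema from the V1 implementation remain unchanged
}
```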
[GitHub] spark issue #20375: [SPARK-23199][SQL]improved Removes repetition from group...
Github user heary-cao commented on the issue: https://github.com/apache/spark/pull/20375 retest this please
[GitHub] spark issue #20377: [SPARK-17088] [FOLLOW-UP] Fix 'sharesHadoopClasses' opti...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20377 The original test covers the original scenario. I think the one pointed out by @vanzin is another issue. However, I do not have time to try it.
[GitHub] spark pull request #20384: [SPARK-23195] [SQL] Keep the Hint of Cached Data
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20384#discussion_r163755332

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala ---

@@ -110,15 +110,39 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
  }

  test("broadcast hint is retained after using the cached data") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-      val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
-      val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
-      df2.cache()
-      val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
-      val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
-        case b: BroadcastHashJoinExec => b
-      }.size
-      assert(numBroadCastHashJoin === 1)
+    try {
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
+        val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
+        df2.cache()
+        val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
+        val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+          case b: BroadcastHashJoinExec => b
+        }.size
+        assert(numBroadCastHashJoin === 1)
+      }
+    } finally {
+      spark.catalog.clearCache()

--- End diff --

Yeah. That should be a separate bug.
[GitHub] spark issue #20226: [SPARK-23034][SQL] Override `nodeName` for all *ScanExec...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20226 Merged build finished. Test PASSed.
[GitHub] spark issue #20226: [SPARK-23034][SQL] Override `nodeName` for all *ScanExec...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20226 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/220/ Test PASSed.
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18931 **[Test build #86624 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86624/testReport)** for PR 18931 at commit [`11946e7`](https://github.com/apache/spark/commit/11946e7a62928304560c0602d71b3064789086d6).
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163753088

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---

[same hunk as quoted above, ending at:]

+class TextSocketSourceProviderV2 extends DataSourceV2
+    with MicroBatchReadSupport with DataSourceRegister with Logging {

--- End diff --

@jose-torres, you mean that instead of creating a new V2 socket source, we should modify the current V1 socket source to make it work with V2? Am I understanding correctly?
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18931 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/219/ Test PASSed.
[GitHub] spark pull request #20392: Update ApplicationMaster.scala
Github user Sangrho closed the pull request at: https://github.com/apache/spark/pull/20392
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18931 Merged build finished. Test PASSed.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163752999

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---

@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
    ctx.INPUT_ROW = null
    ctx.freshNamePrefix = parent.variablePrefix
    val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+    // Under certain conditions, we can put the logic to consume the rows of this operator into
+    // another function. So we can prevent a generated function too long to be optimized by JIT.
+    // The conditions:
+    // 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is enabled.
+    // 2. `inputVars` are all materialized. That is guaranteed to be true if the parent plan uses
+    //    all variables in output (see `requireAllOutput`).
+    // 3. The number of output variables must less than maximum number of parameters in Java method
+    //    declaration.
+    val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+    val consumeFunc =
+      if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&
+          ctx.isValidParamLength(output)) {
+        constructDoConsumeFunction(ctx, inputVars, row)
+      } else {
+        parent.doConsume(ctx, inputVars, rowVar)
+      }
    s"""
       |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
       |$evaluated
-       |${parent.doConsume(ctx, inputVars, rowVar)}
+       |$consumeFunc
     """.stripMargin
  }

+  /**
+   * To prevent concatenated function growing too long to be optimized by JIT. We can separate the
+   * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode],
+      row: String): String = {
+    val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row)
+    val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)
+
+    val doConsume = ctx.freshName("doConsume")
+    ctx.currentVars = inputVarsInFunc
+    ctx.INPUT_ROW = null
+
+    val doConsumeFuncName = ctx.addNewFunction(doConsume,
+      s"""
+         | private void $doConsume(${params.mkString(", ")}) throws java.io.IOException {
+         |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
+         | }
+       """.stripMargin)
+
+    s"""
+       | $doConsumeFuncName(${args.mkString(", ")});
     """.stripMargin
  }

+  /**
+   * Returns arguments for calling method and method definition parameters of the consume function.
+   * And also returns the list of `ExprCode` for the parameters.
+   */
+  private def constructConsumeParameters(
+      ctx: CodegenContext,
+      attributes: Seq[Attribute],
+      variables: Seq[ExprCode],
+      row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
+    val arguments = mutable.ArrayBuffer[String]()
+    val parameters = mutable.ArrayBuffer[String]()
+    val paramVars = mutable.ArrayBuffer[ExprCode]()
+
+    if (row != null) {
+      arguments += row

--- End diff --

Added an extra unit for `row` if needed.
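For intuition, the split changes the shape of the generated Java from one monolithic consume body to a small named method per operator; a hand-written approximation (not actual codegen output, and the names are illustrative):

```java
// before: the parent's consume logic was inlined at the call site, which can
// grow the enclosing method past HotSpot's 8000-byte bytecode limit for JIT
// compilation. after: it is emitted as its own method and invoked with the
// operator's output variables as parameters.
private void doConsume_0(long value_0, boolean isNull_0) throws java.io.IOException {
  // parent operator's consume logic goes here, kept small enough for the JIT
}

// at the original call site inside processNext():
//   doConsume_0(value_0, isNull_0);
```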
[GitHub] spark issue #20392: Update ApplicationMaster.scala
Github user srowen commented on the issue: https://github.com/apache/spark/pull/20392 Close this please
[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20369#discussion_r163751348 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java --- @@ -48,7 +48,7 @@ * @param options the options for the returned data source writer, which is an immutable *case-insensitive string-to-string map. */ -Optional createContinuousWriter( +Optional createStreamWriter( --- End diff -- Do we still need to return `Optional`? In which case should an implementation return `None`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20369#discussion_r163751286 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala --- @@ -281,11 +281,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { trigger = trigger) } else { val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) - val sink = (ds.newInstance(), trigger) match { -case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w -case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException( -s"Data source $source does not support continuous writing") -case (w: MicroBatchWriteSupport, _) => w + val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",") --- End diff -- OK, so this is only useful for built-in stream sources, as the v1 source API is not public. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
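To make the fallback concrete, a sketch of the conf-driven gate being discussed; the object, method, and class names below are hypothetical stand-ins, not Spark API.

```
object DisabledV2WritersSketch {
  // Mirrors splitting a comma-separated conf value and checking membership.
  def useV2Writer(sourceClassName: String, disabledWritersConf: String): Boolean = {
    val disabled = disabledWritersConf.split(",").map(_.trim).filter(_.nonEmpty).toSet
    !disabled.contains(sourceClassName)
  }

  def main(args: Array[String]): Unit = {
    val conf = "com.example.SourceA,com.example.SourceB"
    println(useV2Writer("com.example.SourceA", conf)) // false: fall back to the v1 sink
    println(useV2Writer("com.example.SourceC", conf)) // true: use the v2 StreamWriter
  }
}
```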
[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20369#discussion_r163751198 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java --- @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming; - -import java.util.Optional; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.execution.streaming.BaseStreamingSink; -import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.DataSourceV2Options; -import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; -import org.apache.spark.sql.streaming.OutputMode; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data writing ability and save the data from a microbatch to the data source. - */ -@InterfaceStability.Evolving -public interface MicroBatchWriteSupport extends BaseStreamingSink { - - /** - * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data - * sources can return None if there is no writing needed to be done. - * - * @param queryId A unique string for the writing query. It's possible that there are many writing - *queries running at the same time, and the returned {@link DataSourceV2Writer} - *can use this id to distinguish itself from others. - * @param epochId The unique numeric ID of the batch within this writing query. This is an - *incrementing counter representing a consistent set of data; the same batch may - *be started multiple times in failure recovery scenarios, but it will always - *contain the same records. - * @param schema the schema of the data to be written. - * @param mode the output mode which determines what successive batch output means to this - * sink, please refer to {@link OutputMode} for more details. - * @param options the options for the returned data source writer, which is an immutable - *case-insensitive string-to-string map. - */ - Optional createMicroBatchWriter( --- End diff -- agreed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20369: [SPARK-23196] Unify continuous and microbatch V2 ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20369#discussion_r163751107 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -62,7 +62,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan) try { val runTask = writer match { -case w: ContinuousWriter => +case w: StreamWriter => --- End diff -- I don't have a better idea, but at least we should add some comments here to explain this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20377: [SPARK-17088] [FOLLOW-UP] Fix 'sharesHadoopClasses' opti...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20377 I'm a little confused: I think [this test](https://github.com/apache/spark/pull/20169/files#diff-0456ca985f0d885d5b72654e10be77ccR204) should have helped us detect the wrong fix, but this PR passed all tests. Does that indicate the test actually can't expose the original bug? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20384: [SPARK-23195] [SQL] Keep the Hint of Cached Data
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20384#discussion_r163750722 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala --- @@ -110,15 +110,39 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } test("broadcast hint is retained after using the cached data") { -withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value") - val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value") - df2.cache() - val df3 = df1.join(broadcast(df2), Seq("key"), "inner") - val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect { -case b: BroadcastHashJoinExec => b - }.size - assert(numBroadCastHashJoin === 1) +try { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { +val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value") +val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value") +df2.cache() +val df3 = df1.join(broadcast(df2), Seq("key"), "inner") +val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect { + case b: BroadcastHashJoinExec => b +}.size +assert(numBroadCastHashJoin === 1) + } +} finally { + spark.catalog.clearCache() --- End diff -- do you mean we can remove the cache cleaning here after you fix that bug? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
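The cleanup pattern in the diff above, as a hedged sketch: `clearCache()` runs in a `finally` block so a failing assertion cannot leak cached plans (and their hints) into later tests. An active `SparkSession` is assumed, as in Spark's shared test contexts, and `withClearedCache` is a made-up helper name.

```
import org.apache.spark.sql.SparkSession

object CacheTestUtilSketch {
  def withClearedCache[T](spark: SparkSession)(testBody: => T): T = {
    try {
      testBody
    } finally {
      spark.catalog.clearCache() // runs whether or not the body throws
    }
  }
}
```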
[GitHub] spark pull request #20359: [SPARK-23186][SQL] Initialize DriverManager first...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20359#discussion_r163750551 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala --- @@ -32,6 +32,9 @@ import org.apache.spark.util.Utils */ object DriverRegistry extends Logging { + // Initialize DriverManager first to prevent potential deadlocks between DriverManager and Driver + DriverManager.getDrivers --- End diff -- If it's too hard to write a UT, I think a manual test is also fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20359: [SPARK-23186][SQL] Initialize DriverManager first...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20359#discussion_r163750452 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala --- @@ -32,6 +32,9 @@ import org.apache.spark.util.Utils */ object DriverRegistry extends Logging { + // Initialize DriverManager first to prevent potential deadlocks between DriverManager and Driver --- End diff -- we can copy something from the storm PR: https://github.com/apache/storm/pull/2134/files --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20359: [SPARK-23186][SQL] Initialize DriverManager first...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20359#discussion_r163750346 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala --- @@ -32,6 +32,9 @@ import org.apache.spark.util.Utils */ object DriverRegistry extends Logging { + // Initialize DriverManager first to prevent potential deadlocks between DriverManager and Driver --- End diff -- We need to say more about why this can avoid deadlock. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
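A sketch of what such a comment could spell out, assuming the deadlock is the DriverManager-versus-Driver static-initializer cycle described in the linked storm PR; `DriverRegistrySketch` and `register` are illustrative names.

```
import java.sql.DriverManager

object DriverRegistrySketch {
  // Touching DriverManager here forces its static initializer (which scans for
  // drivers via ServiceLoader) to finish before any JDBC Driver class is
  // loaded. A Driver's own static block calls DriverManager.registerDriver, so
  // if the two class initializations start concurrently on different threads,
  // each can end up waiting on the other's class-init lock. Fixing the order
  // on this code path means the cycle can no longer form.
  DriverManager.getDrivers // side effect only: triggers DriverManager's <clinit>

  def register(className: String): Unit = {
    // Loading the driver class runs its static block, which self-registers;
    // this is safe now that DriverManager's <clinit> has already completed.
    Class.forName(className)
  }
}
```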
[GitHub] spark issue #20392: Update ApplicationMaster.scala
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20392 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18931 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86615/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18931 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20367: [SPARK-23166][ML] Add maxDF Parameter to CountVectorizer
Github user ymazari commented on the issue: https://github.com/apache/spark/pull/20367 @srowen It seems that this PR needs Admin approval. Could you please help getting it to the next step? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163750079 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -702,6 +641,87 @@ private[spark] class MemoryStore( } } +private trait ValuesBuilder[T] { + def preciseSize: Long + def build(): MemoryEntry[T] +} + +private trait ValuesHolder[T] { + def storeValue(value: T): Unit + def estimatedSize(): Long + def getBuilder(): ValuesBuilder[T] --- End diff -- add a comment to say that, after `getBuilder` is called, this `ValuesHolder` becomes invalid. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
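A sketch of the two traits with the requested comment in place; the types are simplified and `MemoryEntry` is stubbed here, while the real ones live in MemoryStore.scala.

```
final case class MemoryEntry[T](values: Array[T], size: Long)

trait ValuesBuilder[T] {
  def preciseSize: Long
  def build(): MemoryEntry[T]
}

trait ValuesHolder[T] {
  def storeValue(value: T): Unit
  def estimatedSize(): Long

  /**
   * Note: after this method is called, the `ValuesHolder` becomes invalid.
   * Callers must not call `storeValue` or `estimatedSize` again, and should
   * read the precise size and build the entry through the returned builder.
   */
  def getBuilder(): ValuesBuilder[T]
}
```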
[GitHub] spark issue #20392: Update ApplicationMaster.scala
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20392 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163749987 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -702,6 +641,87 @@ private[spark] class MemoryStore( } } +private trait ValuesBuilder[T] { + def preciseSize: Long + def build(): MemoryEntry[T] +} + +private trait ValuesHolder[T] { + def storeValue(value: T): Unit + def estimatedSize(): Long + def getBuilder(): ValuesBuilder[T] +} + +/** + * A holder for storing the deserialized values. + */ +private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] { + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[T]()(classTag) + var arrayValues: Array[T] = null + + override def storeValue(value: T): Unit = { +vector += value + } + + override def estimatedSize(): Long = { +vector.estimateSize() + } + + override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] { +// We successfully unrolled the entirety of this block +arrayValues = vector.toArray +vector = null + +override val preciseSize: Long = SizeEstimator.estimate(arrayValues) + +override def build(): MemoryEntry[T] = + DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) + } +} + +/** + * A holder for storing the serialized values. + */ +private class SerializedValuesHolder[T]( +blockId: BlockId, +chunkSize: Int, +classTag: ClassTag[T], +memoryMode: MemoryMode, +serializerManager: SerializerManager) extends ValuesHolder[T] { + val allocator = memoryMode match { +case MemoryMode.ON_HEAP => ByteBuffer.allocate _ +case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ + } + + val redirectableStream = new RedirectableOutputStream + val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) + redirectableStream.setOutputStream(bbos) + val serializationStream: SerializationStream = { +val autoPick = !blockId.isInstanceOf[StreamBlockId] +val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() +ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) + } + + override def storeValue(value: T): Unit = { +serializationStream.writeObject(value)(classTag) + } + + override def estimatedSize(): Long = { +bbos.size + } + + override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] { +// We successfully unrolled the entirety of this block +serializationStream.close() + +override val preciseSize: Long = bbos.size --- End diff -- this can be a `def`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
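The difference in a tiny, self-contained illustration: a `val` snapshots the value when the object is constructed, while a `def` re-reads it on each access. Since the serialization stream is closed before the builder is created, `bbos.size` is stable at that point, so a `def` would return the same number without storing an extra field. The names below are made up for the example.

```
object ValVsDefSketch {
  class GrowingSink { private var n = 0L; def add(k: Long): Unit = n += k; def size: Long = n }

  trait Sized { def preciseSize: Long }

  def main(args: Array[String]): Unit = {
    val sink = new GrowingSink
    sink.add(10)
    val snapshot: Sized = new Sized { val preciseSize: Long = sink.size } // evaluated once, stored
    val onDemand: Sized = new Sized { def preciseSize: Long = sink.size } // evaluated per access
    sink.add(5)
    println(snapshot.preciseSize) // 10: fixed at construction time
    println(onDemand.preciseSize) // 15: reflects the current size
  }
}
```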
[GitHub] spark issue #18931: [SPARK-21717][SQL] Decouple consume functions of physica...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18931 **[Test build #86615 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86615/testReport)** for PR 18931 at commit [`0c4173e`](https://github.com/apache/spark/commit/0c4173e5fffaa2dead09f184b301355a40e6118f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20392: Update ApplicationMaster.scala
GitHub user Sangrho opened a pull request: https://github.com/apache/spark/pull/20392 Update ApplicationMaster.scala I have one question. I think that when maxNumExecutorFailures is calculated, MAX_EXECUTOR_FAILURES is already defined as specified by the Spark documentation (as numExecutors * 2, with a minimum of 3), so the annotation I added in the code is not valid. Please give me an answer. Thank you. ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Sangrho/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20392.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20392 commit 2eb87e032582f3b398997f3877d6f27ec2b1653e Author: Josh LEE Date: 2018-01-25T04:53:41Z Update ApplicationMaster.scala I have one question. I think that when maxNumExecutorFailures is calculated, MAX_EXECUTOR_FAILURES is already defined as specified by the Spark documentation (as numExecutors * 2, with a minimum of 3), so the annotation I added in the code is not valid. Please give me an answer. Thank you. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
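For reference, a sketch of the documented default being asked about. Hedged: the real computation lives in ApplicationMaster and reads the actual conf keys; the helper below is illustrative.

```
object MaxExecutorFailuresSketch {
  // An explicit conf value wins; otherwise the documented default of
  // numExecutors * 2 applies, with a floor of 3.
  def maxNumExecutorFailures(explicitSetting: Option[Int], numExecutors: Int): Int =
    explicitSetting.getOrElse(math.max(numExecutors * 2, 3))

  def main(args: Array[String]): Unit = {
    println(maxNumExecutorFailures(None, 10))    // 20
    println(maxNumExecutorFailures(None, 1))     // 3 (the floor kicks in)
    println(maxNumExecutorFailures(Some(8), 10)) // 8 (explicit conf wins)
  }
}
```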
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r163749065 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -232,78 +236,93 @@ private[spark] class MemoryStore( elementsUnrolled += 1 } +val valuesBuilder = if (keepUnrolling) { + Some(valuesHolder.getBuilder()) +} else { + None +} + +// Make sure that we have enough memory to store the block. By this point, it is possible that +// the block's actual memory usage has exceeded the unroll memory by a small amount, so we +// perform one final call to attempt to allocate additional memory if necessary. if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = -new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) - val size = entry.size - def transferUnrollToStorage(amount: Long): Unit = { -// Synchronize so that transfer is atomic -memoryManager.synchronized { - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount) - val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) - assert(success, "transferring unroll memory to storage memory failed") + val size = valuesBuilder.get.preciseSize + if (size > unrollMemoryUsedByThisBlock) { +val amountToRequest = size - unrollMemoryUsedByThisBlock +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) +if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest } } - // Acquire storage memory if necessary to store this block in memory. - val enoughStorageMemory = { -if (unrollMemoryUsedByThisBlock <= size) { - val acquiredExtra = -memoryManager.acquireStorageMemory( - blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) - if (acquiredExtra) { -transferUnrollToStorage(unrollMemoryUsedByThisBlock) - } - acquiredExtra -} else { // unrollMemoryUsedByThisBlock > size - // If this task attempt already owns more unroll memory than is necessary to store the - // block, then release the extra memory that will not be used. - val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) - transferUnrollToStorage(size) - true -} +} +
+if (keepUnrolling) { --- End diff -- a little improvement:
```
if (keepUnrolling) {
  val builder = valuesHolder.getBuilder()
  ...
  if (keepUnrolling) {
    val entry = builder.build()
    ...
    Right(entry.size)
  } else {
    ...
    logUnrollFailureMessage(blockId, builder.preciseSize)
    Left(unrollMemoryUsedByThisBlock)
  }
} else {
  ...
  logUnrollFailureMessage(blockId, valuesHolder.estimatedSize)
  Left(unrollMemoryUsedByThisBlock)
}
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20224: [SPARK-23032][SQL] Add a per-query codegenStageId to Who...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20224 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20224: [SPARK-23032][SQL] Add a per-query codegenStageId...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20224#discussion_r163748166 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -325,6 +326,28 @@ object WholeStageCodegenExec { } } +object WholeStageCodegenId { + private val codegenStageCounter = ThreadLocal.withInitial(new Supplier[Integer] { +override def get() = 0 // TODO: change to Scala lambda syntax when upgraded to Scala 2.12+ --- End diff -- Shall we just use 1 as the initial value and add a comment saying that 0 is reserved for temporary WholeStageCodegenExec objects? Then we only need a `reset` method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
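A sketch of that suggestion; the method names `getNextStageId` and `reset` are assumptions mirroring the discussion, not the merged API.

```
import java.util.function.Supplier

object CodegenStageIdSketch {
  // Seed the per-thread counter with 1; 0 is reserved for temporary
  // WholeStageCodegenExec objects created before numbering starts.
  private val counter = ThreadLocal.withInitial(new Supplier[Integer] {
    override def get(): Integer = 1
  })

  def getNextStageId(): Int = {
    val id = counter.get()
    counter.set(id + 1)
    id
  }

  def reset(): Unit = counter.remove() // drops back to the initial value of 1
}
```

With the initial value at 1, no separate initializer state is needed: `reset()` alone restores the counter between queries, and any plan object still carrying 0 is recognizably temporary.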