[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19160 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17819 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82016/ Test PASSed.
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17819 Merged build finished. Test PASSed.
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17819 **[Test build #82016 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82016/testReport)** for PR 17819 at commit [`f70fc2a`](https://github.com/apache/spark/commit/f70fc2a956082a83859313f663649a9b17dbbf36).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...
Github user logannc commented on the issue: https://github.com/apache/spark/pull/18945 Hm. Where would I add tests?
[GitHub] spark issue #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory usage to...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19160 Thanks all for your review, let me merge to master.
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user logannc commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r140153330

--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1761,12 +1761,37 @@ def toPandas(self):
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
             dtype = {}
+            columns_with_null_int = {}
+            def null_handler(rows, columns_with_null_int):
+                for row in rows:
+                    row = row.asDict()
+                    for column in columns_with_null_int:
+                        val = row[column]
+                        dt = dtype[column]
+                        if val is not None:
+                            if abs(val) > 16777216: # Max value before np.float32 loses precision.
+                                val = np.float64(val)
+                                if np.float64 != dt:
+                                    dt = np.float64
+                                    dtype[column] = np.float64
+                            else:
+                                val = np.float32(val)
+                                if dt not in (np.float32, np.float64):
+                                    dt = np.float32
+                                    dtype[column] = np.float32
+                        row[column] = val
+                    row = Row(**row)
+                    yield row
+            row_handler = lambda x,y: x
             for field in self.schema:
                 pandas_type = _to_corrected_pandas_type(field.dataType)
+                if pandas_type in (np.int8, np.int16, np.int32) and field.nullable:
+                    columns_with_null_int.add(field.name)
--- End diff --

Fixed...
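The `16777216` threshold in the hunk above is 2**24, the point past which `np.float32` can no longer represent every integer exactly (its significand holds 24 bits). A quick NumPy check of that boundary:

```python
import numpy as np

limit = 16777216  # 2**24, the threshold used in the hunk above

# Up to the limit, the round-trip through float32 is exact.
assert int(np.float32(limit)) == limit

# One past the limit, 16777217 is not representable in float32 and rounds away.
assert int(np.float32(limit + 1)) != limit + 1

# float64, with a 53-bit significand, still holds it exactly.
assert int(np.float64(limit + 1)) == limit + 1
```

This is why the handler promotes values with `abs(val) > 16777216` to `np.float64` instead of `np.float32`.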
[GitHub] spark issue #19257: [SPARK-22042] [SQL] ReorderJoinPredicates can break when...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19257 Maybe we need to rethink the planning phase for adding shuffles. How about we add a placeholder for the shuffle node and then replace the placeholder with the actual shuffle node in `EnsureRequirements`? Then we can make sure the plan tree is always resolved.
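The placeholder idea can be sketched in a few lines of Python (illustrative only; these are not Spark's actual classes, and the rule name here just mirrors `EnsureRequirements`): planning inserts a placeholder node wherever a shuffle will be needed, and a later rule swaps each placeholder for a concrete shuffle node, so the tree is fully resolved at every step.

```python
# Illustrative plan-tree sketch, not Spark's actual planner classes.
class Node:
    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)

    def transform(self, rule):
        # Bottom-up transformation, in the spirit of Catalyst's transformUp.
        self.children = [c.transform(rule) for c in self.children]
        return rule(self)

def ensure_requirements(node):
    # Hypothetical EnsureRequirements-like rule: materialize placeholders.
    if node.name == "ShufflePlaceholder":
        return Node("ShuffleExchange", node.children)
    return node

plan = Node("Join", [Node("ShufflePlaceholder", [Node("ScanA")]),
                     Node("ShufflePlaceholder", [Node("ScanB")])])
resolved = plan.transform(ensure_requirements)
names = [c.name for c in resolved.children]
```

Because the placeholder is itself a valid node, every intermediate tree remains well-formed, which is the property the comment is after.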
[GitHub] spark issue #19290: [WIP][SPARK-22063][R] Upgrades lintr to latest commit sh...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/19290 I think this is great to have, thanks for solving the mystery.
- 5 min: this is mildly concerning; is it possible this is caused by new checks in lintr? Perhaps we could exclude them or something?
- lintr version: I see what you say about 1.0.1. According to this commit list https://github.com/jimhester/lintr/commits/master the tag for 1.0.1 and the latest (the git tag you are using) differ only by some insignificant changes?
[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user bikassaha commented on a diff in the pull request: https://github.com/apache/spark/pull/19294#discussion_r140151855

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -130,17 +130,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
       .foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    for ((src, dst) <- filesToMove) {
-      fs.rename(new Path(src), new Path(dst))
+    if (addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty) {
--- End diff --

Please consider using a common method instead of duplicating the code in the two `if` statements.
[GitHub] spark pull request #19290: [WIP][SPARK-22063][R] Upgrades lintr to latest co...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19290#discussion_r140151179

--- Diff: dev/lint-r.R ---
@@ -28,6 +28,7 @@ if (! library(SparkR, lib.loc = LOCAL_LIB_LOC, logical.return = TRUE)) {
 # NOTE: The CRAN's version is too old to adapt to our rules.
 if ("lintr" %in% row.names(installed.packages()) == FALSE) {
   devtools::install_github("jimhester/lintr@a769c0b")
+  # devtools::install_github("jimhester/lintr@5431140")
--- End diff --

because of `if ("lintr" %in% row.names(installed.packages()) == FALSE) {`? It might be the case that the Jenkins boxes already have a version installed and this won't update it.
[GitHub] spark pull request #19290: [WIP][SPARK-22063][R] Upgrades lintr to latest co...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19290#discussion_r140150499

--- Diff: R/pkg/R/DataFrame.R ---
@@ -2649,15 +2651,15 @@ setMethod("merge",
 #' @return list of columns
 #'
 #' @note generateAliasesForIntersectedCols since 1.6.0
-generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
+generateAliasesForIntersectedCols <- function(x, intersectedColNames, suffix) { # nolint
--- End diff --

wait... this isn't public though; it's not in https://github.com/apache/spark/blob/master/R/pkg/NAMESPACE. It looks like this shouldn't be in the doc (`@noRd` or `#` instead of `#'`).
[GitHub] spark pull request #19290: [WIP][SPARK-22063][R] Upgrades lintr to latest co...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19290#discussion_r140150229

--- Diff: R/pkg/R/DataFrame.R ---
@@ -2594,12 +2596,12 @@ setMethod("merge",
         } else {
           # if by or both by.x and by.y have length 0, use Cartesian Product
           joinRes <- crossJoin(x, y)
-          return (joinRes)
+          return(joinRes)
         }
 
         # sets alias for making colnames unique in dataframes 'x' and 'y'
-        colsX <- generateAliasesForIntersectedCols(x, by, suffixes[1])
-        colsY <- generateAliasesForIntersectedCols(y, by, suffixes[2])
+        colsX <- generateAliasesForIntersectedCols(x, by, suffixes[1]) # nolint
+        colsY <- generateAliasesForIntersectedCols(y, by, suffixes[2]) # nolint
--- End diff --

what's the problem here? Name too long, I think?
[GitHub] spark pull request #19290: [WIP][SPARK-22063][R] Upgrades lintr to latest co...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19290#discussion_r140150600

--- Diff: R/pkg/R/context.R ---
@@ -329,7 +329,7 @@ spark.addFile <- function(path, recursive = FALSE) {
 #' spark.getSparkFilesRootDirectory()
 #'}
 #' @note spark.getSparkFilesRootDirectory since 2.1.0
-spark.getSparkFilesRootDirectory <- function() {
+spark.getSparkFilesRootDirectory <- function() { # nolint
--- End diff --

this one is an API...
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r140149449

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -354,63 +401,30 @@ private[spark] class MemoryStore(
       ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
     }
 
-    // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
-
-    if (!keepUnrolling) {
-      logWarning(s"Failed to reserve initial memory threshold of " +
-        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
-    } else {
-      unrollMemoryUsedByThisBlock += initialMemoryThreshold
+    def storeValue(value: T): Unit = {
+      serializationStream.writeObject(value)(classTag)
     }
 
-    def reserveAdditionalMemoryIfNecessary(): Unit = {
-      if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
-        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
-        if (keepUnrolling) {
-          unrollMemoryUsedByThisBlock += amountToRequest
-        }
-      }
-    }
-
-    // Unroll this block safely, checking whether we have exceeded our threshold
-    while (values.hasNext && keepUnrolling) {
-      serializationStream.writeObject(values.next())(classTag)
-      elementsUnrolled += 1
-      if (elementsUnrolled % memoryCheckPeriod == 0) {
-        reserveAdditionalMemoryIfNecessary()
+    def estimateSize(precise: Boolean): Long = {
+      if (precise) {
+        serializationStream.flush()
--- End diff --

OK, I'll do it tomorrow.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/18659 @BryanCutler Hmm, I'm not exactly sure why it doesn't work (or why mine works), but we can use `fillna(0)` before casting, like:
```
pa.Array.from_pandas(s.fillna(0).astype(t.to_pandas_dtype(), copy=False), mask=s.isnull(), type=t)
```
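The same trick can be shown with NumPy alone (a sketch of the idea, not the pyarrow call above): NaN has no meaningful integer representation, so the nulls are filled with a dummy value before the cast and the null positions are remembered separately in a mask, which is the role pyarrow's `mask=` argument plays.

```python
import numpy as np

# A float series with a null (NaN) in it, as pandas would hand it over.
s = np.array([1.0, np.nan, 3.0])

# Remember where the nulls were...
mask = np.isnan(s)

# ...then fill them with a dummy value so the integer cast is well-defined.
filled = np.where(mask, 0.0, s).astype(np.int64)

# filled carries valid integers; mask says which positions are really null.
```

Downstream, a consumer that honors the mask never looks at the dummy zeros, so the fill value itself does not matter.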
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user logannc commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r140148777

--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1761,12 +1761,37 @@ def toPandas(self):
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
             dtype = {}
+            columns_with_null_int = {}
+            def null_handler(rows, columns_with_null_int):
+                for row in rows:
+                    row = row.asDict()
+                    for column in columns_with_null_int:
+                        val = row[column]
+                        dt = dtype[column]
+                        if val is not None:
+                            if abs(val) > 16777216: # Max value before np.float32 loses precision.
+                                val = np.float64(val)
+                                if np.float64 != dt:
--- End diff --

No, not strictly necessary, but also hardly harmful, and it may future-proof a bit...? Anyway, it can be removed if you think it should be.
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user logannc commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r140148164

--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1761,12 +1761,37 @@ def toPandas(self):
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
             dtype = {}
+            columns_with_null_int = {}
+            def null_handler(rows, columns_with_null_int):
+                for row in rows:
+                    row = row.asDict()
+                    for column in columns_with_null_int:
+                        val = row[column]
+                        dt = dtype[column]
+                        if val is not None:
+                            if abs(val) > 16777216: # Max value before np.float32 loses precision.
+                                val = np.float64(val)
+                                if np.float64 != dt:
+                                    dt = np.float64
+                                    dtype[column] = np.float64
+                            else:
+                                val = np.float32(val)
+                                if dt not in (np.float32, np.float64):
+                                    dt = np.float32
+                                    dtype[column] = np.float32
+                        row[column] = val
+                    row = Row(**row)
+                    yield row
+            row_handler = lambda x,y: x
             for field in self.schema:
                 pandas_type = _to_corrected_pandas_type(field.dataType)
+                if pandas_type in (np.int8, np.int16, np.int32) and field.nullable:
+                    columns_with_null_int.add(field.name)
--- End diff --

Ack, I noticed I did that then forgot to change it. On it...
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user logannc commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r140147933

--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1761,12 +1761,37 @@ def toPandas(self):
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
             dtype = {}
+            columns_with_null_int = {}
+            def null_handler(rows, columns_with_null_int):
+                for row in rows:
+                    row = row.asDict()
+                    for column in columns_with_null_int:
+                        val = row[column]
+                        dt = dtype[column]
+                        if val is not None:
--- End diff --

They are handled by Pandas already, so I am just letting them pass through.
[GitHub] spark issue #18994: [SPARK-21784][SQL] Adds support for defining information...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18994 **[Test build #82019 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82019/testReport)** for PR 18994 at commit [`ea39601`](https://github.com/apache/spark/commit/ea39601829a8a1e4c5642e68ef0fab68310bdae1).
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19294 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82015/ Test FAILed.
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19294 Merged build finished. Test FAILed.
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19294 **[Test build #82015 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82015/testReport)** for PR 19294 at commit [`621c337`](https://github.com/apache/spark/commit/621c337f16725fe9be83ae974efd2eaa83eb8762).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19298: [SPARK-22076][SQL][followup] Expand.projections s...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19298
[GitHub] spark issue #19298: [SPARK-22076][SQL][followup] Expand.projections should n...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19298 LGTM. Merging to master/2.2.
[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/18945 @HyukjinKwon I can take over this if @logannc can't find time to continue it.
[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/18945 We also need a proper test for this.
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r140144263

--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1761,12 +1761,37 @@ def toPandas(self):
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
             dtype = {}
+            columns_with_null_int = {}
+            def null_handler(rows, columns_with_null_int):
+                for row in rows:
+                    row = row.asDict()
+                    for column in columns_with_null_int:
+                        val = row[column]
+                        dt = dtype[column]
+                        if val is not None:
--- End diff --

Don't we want to change the data type for `None` values? I don't see where you do it.
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r140143875

--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1761,12 +1761,37 @@ def toPandas(self):
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
             dtype = {}
+            columns_with_null_int = {}
+            def null_handler(rows, columns_with_null_int):
+                for row in rows:
+                    row = row.asDict()
+                    for column in columns_with_null_int:
+                        val = row[column]
+                        dt = dtype[column]
+                        if val is not None:
+                            if abs(val) > 16777216: # Max value before np.float32 loses precision.
+                                val = np.float64(val)
+                                if np.float64 != dt:
+                                    dt = np.float64
+                                    dtype[column] = np.float64
+                            else:
+                                val = np.float32(val)
+                                if dt not in (np.float32, np.float64):
+                                    dt = np.float32
+                                    dtype[column] = np.float32
+                        row[column] = val
+                    row = Row(**row)
+                    yield row
+            row_handler = lambda x,y: x
             for field in self.schema:
                 pandas_type = _to_corrected_pandas_type(field.dataType)
+                if pandas_type in (np.int8, np.int16, np.int32) and field.nullable:
+                    columns_with_null_int.add(field.name)
--- End diff --

>>> columns_with_null_int = {}
>>> columns_with_null_int.add("test")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'dict' object has no attribute 'add'

Am I missing something?
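A minimal reproduction of the bug being pointed out: `{}` is an empty dict in Python, not an empty set, and only sets have an `.add` method, so the collection needs to be initialized with `set()`.

```python
# `{}` creates a dict; dicts have no .add, which is the AttributeError above.
try:
    {}.add("test")
    reached = False
except AttributeError as e:
    reached = "no attribute 'add'" in str(e)

# The fix: an explicit set literal/constructor supports .add as intended.
columns_with_null_int = set()
columns_with_null_int.add("test")
```

(An empty set has no literal syntax in Python; `{"a"}` is a set, but `{}` is always a dict.)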
[GitHub] spark issue #19287: [SPARK-22074][Core] Task killed by other attempt task sh...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19287 **[Test build #82018 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82018/testReport)** for PR 19287 at commit [`81ac4dc`](https://github.com/apache/spark/commit/81ac4dc75f91c888a9fefa805915f9420d71b761).
[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19287#discussion_r140143293

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala ---
@@ -66,6 +66,12 @@ class TaskInfo(
    */
   var finishTime: Long = 0
 
+  /**
+   * Set this tag when this task killed by other attempt. This kind of task should not resubmit
+   * while executor lost.
+   */
--- End diff --

Thanks for reviewing; I've rewritten this comment.
[GitHub] spark issue #19300: [SPARK-22082][SparkR]Spelling mistake: "choosen" in API ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19300 I am fixing some code around here: https://github.com/apache/spark/pull/19290/files#diff-d9f92e07db6424e2527a7f9d7caa9013R328. If this one is the only one, let me fold this into mine.
[GitHub] spark issue #19219: [SPARK-21993][SQL][WIP] Close sessionState when finish
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19219 **[Test build #82017 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82017/testReport)** for PR 19219 at commit [`e421113`](https://github.com/apache/spark/commit/e4211137bdc72c3e94d7bce2944d108e5cb70b55).
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user a10y commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r140142458

--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1761,12 +1761,37 @@ def toPandas(self):
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
             dtype = {}
+            columns_with_null_int = {}
+            def null_handler(rows, columns_with_null_int):
+                for row in rows:
+                    row = row.asDict()
+                    for column in columns_with_null_int:
+                        val = row[column]
+                        dt = dtype[column]
+                        if val is not None:
+                            if abs(val) > 16777216: # Max value before np.float32 loses precision.
+                                val = np.float64(val)
+                                if np.float64 != dt:
--- End diff --

(Also, thanks for solving the precision issue!)
[GitHub] spark pull request #19106: [SPARK-21770][ML] ProbabilisticClassificationMode...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19106#discussion_r140142353

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala ---
@@ -230,21 +230,19 @@ private[ml] object ProbabilisticClassificationModel {
    * Normalize a vector of raw predictions to be a multinomial probability vector, in place.
    *
    * The input raw predictions should be nonnegative.
-   * The output vector sums to 1, unless the input vector is all-0 (in which case the output is
-   * all-0 too).
+   * The output vector sums to 1, when the input vector is all-0, exception will be thrown.
--- End diff --

This isn't grammatical. You might just move the new note to a proper throws tag in the scaladoc here anyway.
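A sketch of the semantics under discussion (plain Python for illustration, not the MLlib implementation): normalize a nonnegative raw-prediction vector so it sums to 1, and raise on an all-zero input instead of silently returning an all-zero vector.

```python
def normalize_to_probabilities(raw):
    """Normalize nonnegative raw predictions into a probability vector.

    Raises ValueError on an all-zero input, mirroring the behavior change
    the diff above describes (previously an all-zero vector passed through).
    """
    total = sum(raw)
    if total == 0.0:
        raise ValueError("raw prediction vector is all-zero; cannot normalize")
    return [v / total for v in raw]

probs = normalize_to_probabilities([1.0, 3.0])
```

Raising makes the failure explicit at the point where the invalid prediction is produced, rather than letting a degenerate "probability" vector propagate downstream.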
[GitHub] spark issue #19300: [SPARK-22082][SparkR]Spelling mistake: "choosen" in API ...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19300 Please review and fix typos in a whole batch of code, or don't bother and close this.
[GitHub] spark pull request #18945: Add option to convert nullable int columns to flo...
Github user a10y commented on a diff in the pull request: https://github.com/apache/spark/pull/18945#discussion_r140141889

--- Diff: python/pyspark/sql/dataframe.py ---
@@ -1761,12 +1761,37 @@ def toPandas(self):
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
             dtype = {}
+            columns_with_null_int = {}
+            def null_handler(rows, columns_with_null_int):
+                for row in rows:
+                    row = row.asDict()
+                    for column in columns_with_null_int:
+                        val = row[column]
+                        dt = dtype[column]
+                        if val is not None:
+                            if abs(val) > 16777216: # Max value before np.float32 loses precision.
+                                val = np.float64(val)
+                                if np.float64 != dt:
--- End diff --

Is this `if` totally necessary, or can we just move the two assignments up?
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r140141918

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -354,63 +401,30 @@ private[spark] class MemoryStore(
       ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
     }
 
-    // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
-
-    if (!keepUnrolling) {
-      logWarning(s"Failed to reserve initial memory threshold of " +
-        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
-    } else {
-      unrollMemoryUsedByThisBlock += initialMemoryThreshold
+    def storeValue(value: T): Unit = {
+      serializationStream.writeObject(value)(classTag)
     }
 
-    def reserveAdditionalMemoryIfNecessary(): Unit = {
-      if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
-        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
-        if (keepUnrolling) {
-          unrollMemoryUsedByThisBlock += amountToRequest
-        }
-      }
-    }
-
-    // Unroll this block safely, checking whether we have exceeded our threshold
-    while (values.hasNext && keepUnrolling) {
-      serializationStream.writeObject(values.next())(classTag)
-      elementsUnrolled += 1
-      if (elementsUnrolled % memoryCheckPeriod == 0) {
-        reserveAdditionalMemoryIfNecessary()
+    def estimateSize(precise: Boolean): Long = {
+      if (precise) {
+        serializationStream.flush()
--- End diff --

can you send a PR to fix this issue for `putIteratorAsBytes` first? It will make this PR easier to review.
[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17819 **[Test build #82016 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82016/testReport)** for PR 17819 at commit [`f70fc2a`](https://github.com/apache/spark/commit/f70fc2a956082a83859313f663649a9b17dbbf36).
[GitHub] spark issue #19286: [SPARK-21338][SQL][FOLLOW-UP] Implement isCascadingTrunc...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19286 @gatorsmile Does the added test look good to you? Thanks.
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r140140577

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala ---
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions.aggregate

import java.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression}
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper}
import org.apache.spark.sql.types._

/**
 * This function counts the approximate number of distinct values (ndv) in
 * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be
 * sorted into ascending order. E.g., given an array of endpoints
 * (endpoint_1, endpoint_2, ... endpoint_N), returns the approximate ndv's for intervals
 * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N].
 * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them.
 * @param child to estimate the ndv's of.
 * @param endpointsExpression to construct the intervals, should be sorted into ascending order.
 * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm.
 */
case class ApproxCountDistinctForIntervals(
    child: Expression,
    endpointsExpression: Expression,
    relativeSD: Double = 0.05,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0)
  extends ImperativeAggregate with ExpectsInputTypes {

  def this(child: Expression, endpointsExpression: Expression) = {
    this(
      child = child,
      endpointsExpression = endpointsExpression,
      relativeSD = 0.05,
      mutableAggBufferOffset = 0,
      inputAggBufferOffset = 0)
  }

  def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = {
    this(
      child = child,
      endpointsExpression = endpointsExpression,
      relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD),
      mutableAggBufferOffset = 0,
      inputAggBufferOffset = 0)
  }

  override def inputTypes: Seq[AbstractDataType] = {
    Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType)
  }

  // Mark as lazy so that endpointsExpression is not evaluated during tree transformation.
  lazy val endpoints: Array[Double] =
    (endpointsExpression.dataType, endpointsExpression.eval()) match {
      case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) =>
        val numericArray = arrayData.toObjectArray(baseType)
        numericArray.map { x =>
          baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType])
        }
    }

  override def checkInputDataTypes(): TypeCheckResult = {
    val defaultCheck = super.checkInputDataTypes()
    if (defaultCheck.isFailure) {
      defaultCheck
    } else if (!endpointsExpression.foldable) {
      TypeCheckFailure("The intervals provided must be constant literals")
    } else if (endpoints.length < 2) {
      TypeCheckFailure("The number of endpoints must be >= 2 to construct intervals")
    } else {
      TypeCheckSuccess
    }
  }

  // N endpoints construct N-1 intervals, creating a HLLPP for each interval
  private lazy val hllppArray = {
    val array = new Array[HyperLogLogPlusPlusHelper](endpoints.length - 1)
    for (i <- array.indices) {
      array(i) = new
[GitHub] spark issue #19298: [SPARK-22076][SQL][followup] Expand.projections should n...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19298 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82014/ Test PASSed.
[GitHub] spark issue #19298: [SPARK-22076][SQL][followup] Expand.projections should n...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19298 Merged build finished. Test PASSed.
[GitHub] spark issue #19298: [SPARK-22076][SQL][followup] Expand.projections should n...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19298 **[Test build #82014 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82014/testReport)** for PR 19298 at commit [`aa263d7`](https://github.com/apache/spark/commit/aa263d7b16630ad134e80fbf68a42165667446b5). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...
Github user logannc commented on the issue: https://github.com/apache/spark/pull/18945 Sorry I fell off the face of the earth. I finally had some time to sit down and do this. I took your suggestions but implemented them a little differently. Unless I've made a dumb mistake, I think I improved on it a bit.
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19280 > Looks ok to me, assuming the "default serializer" in SerializerManager is configured correctly through other means. I think that part is fine. The serializer is created here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkEnv.scala#L279 The same instance is assigned to `SparkEnv.serializer`: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkEnv.scala#L374 Which has its default classloader set in Executor.scala, right by the part I'm changing: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L131
[GitHub] spark pull request #19277: [SPARK-22058][CORE]the BufferedInputStream will n...
Github user zuotingbing commented on a diff in the pull request: https://github.com/apache/spark/pull/19277#discussion_r140137420
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -351,14 +351,14 @@ private[spark] object EventLoggingListener extends Logging {
    // Since we sanitize the app ID to not include periods, it is safe to split on it
    val logName = log.getName.stripSuffix(IN_PROGRESS)
    val codecName: Option[String] = logName.split("\\.").tail.lastOption
-   val codec = codecName.map { c =>
-     codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
-   }
-
+   try {
+     val codec = codecName.map { c =>
+       codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
+     }
      codec.map(_.compressedInputStream(in)).getOrElse(in)
    } catch {
-     case e: Exception =>
+     case e: Throwable =>
--- End diff --
Everything, including exceptions and errors.
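For context on the trade-off being debated above, here is a minimal standalone sketch (hypothetical helper, not the actual EventLoggingListener code): `case e: Throwable` also traps fatal JVM errors such as `OutOfMemoryError`, whereas Scala's `NonFatal` extractor matches only recoverable exceptions and lets fatal errors propagate.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch: fall back to a default value when the supplied
// action fails for a recoverable reason. NonFatal matches ordinary
// exceptions but deliberately does not match fatal errors
// (OutOfMemoryError, StackOverflowError, ...), unlike `case e: Throwable`.
def openOrFallback(open: () => String, fallback: String): String =
  try {
    open()
  } catch {
    case NonFatal(_) => fallback
  }
```

A call such as `openOrFallback(() => throw new RuntimeException("bad codec"), "raw")` returns `"raw"`, while an `OutOfMemoryError` thrown inside `open` would still propagate to the caller.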
[GitHub] spark pull request #19300: [SPARK-22082][SparkR]Spelling mistake: choosen in...
GitHub user zuotingbing opened a pull request: https://github.com/apache/spark/pull/19300 [SPARK-22082][SparkR]Spelling mistake: choosen in API doc of R. ## What changes were proposed in this pull request? "choosen" should be "chosen" in API doc of R. http://spark.apache.org/docs/latest/api/R/index.html , see `spark.kmeans` ## How was this patch tested? NA You can merge this pull request into a Git repository by running: $ git pull https://github.com/zuotingbing/spark SPARK-22082 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19300.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19300 commit 87023af4807352843954ff251fed0dad69349916 Author: zuotingbing Date: 2017-09-21T02:19:42Z [SPARK-22082][SparkR]Spelling mistake: choosen in API doc of R.
[GitHub] spark issue #14325: [SPARK-16692] [ML] Add multi label classification evalua...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/14325 ping @gatorsmile Add this to test.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140136101
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
    // place the executors.
    private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+   override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+     jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+     var jobGroupId = if (jobStart.properties != null) {
+       jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+     } else {
+       null
+     }
+
+     val maxConTasks = if (jobGroupId != null &&
+       conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+       conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+     } else {
+       Int.MaxValue
+     }
+
+     if (maxConTasks <= 0) {
+       throw new IllegalArgumentException(
+         "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
+     }
+
+     if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+       jobGroupId = DEFAULT_JOB_GROUP
+     }
+
+     jobIdToJobGroup(jobStart.jobId) = jobGroupId
+     if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --
lemme put my concern another way: why don't you remove the `if (!jobGroupToMaxConTasks.contains(jobGroupId))`, and just unconditionally always make the assignment `jobGroupToMaxConTasks(jobGroupId) = maxConTasks`? that is simpler to reason about, and has all the properties we want. I agree the scenario I'm describing is pretty weird, but the only difference I see between your version and this is in that scenario. And it's probably not the behavior we want in that scenario.
[GitHub] spark issue #14325: [SPARK-16692] [ML] Add multi label classification evalua...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/14325 You should override `override def evaluate(dataset: Dataset[_])` (without the label param).
[GitHub] spark issue #18029: [SPARK-20168] [DStream] Add changes to use kinesis fetch...
Github user yssharma commented on the issue: https://github.com/apache/spark/pull/18029 @budde could you please do one last review of this one.
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19106 ping @srowen Any other comments?
[GitHub] spark issue #19288: [SPARK-22075][ML] GBTs unpersist datasets cached by Chec...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19288 OK I agree.
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19294 **[Test build #82015 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82015/testReport)** for PR 19294 at commit [`621c337`](https://github.com/apache/spark/commit/621c337f16725fe9be83ae974efd2eaa83eb8762).
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19294 cc @jiangxb1987 who I believe is interested in this. Without a super close look, it seems to make sense.
[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19294 ok to test
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r140131809 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... 
endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. 
+ lazy val endpoints: Array[Double] = +(endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => +val numericArray = arrayData.toObjectArray(baseType) +numericArray.map { x => + baseType.numeric.toDouble(x.asInstanceOf[baseType.InternalType]) +} +} + + override def checkInputDataTypes(): TypeCheckResult = { +val defaultCheck = super.checkInputDataTypes() +if (defaultCheck.isFailure) { + defaultCheck +} else if (!endpointsExpression.foldable) { + TypeCheckFailure("The intervals provided must be constant literals") --- End diff -- I'll check element type of `endpointsExpression`, thanks.
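As a side note on the `lazy val endpoints` pattern quoted above, a simplified standalone sketch (hypothetical class, not the Catalyst code): marking the field lazy defers the numeric-to-double conversion until the endpoints are first needed, so an unevaluable expression does not blow up during tree transformation.

```scala
// Simplified stand-in for the Catalyst pattern: the supplied thunk plays
// the role of endpointsExpression.eval(), and is only invoked on first
// access of `endpoints`, not at construction time.
class IntervalEndpoints(eval: () => Array[Any]) {
  lazy val endpoints: Array[Double] = eval().map {
    case i: Int    => i.toDouble
    case l: Long   => l.toDouble
    case f: Float  => f.toDouble
    case d: Double => d
  }
}
```

Constructing `new IntervalEndpoints(() => sys.error("not resolved yet"))` succeeds; the error only surfaces if `endpoints` is actually read.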
[GitHub] spark pull request #19299: update from upstream
Github user wankunde closed the pull request at: https://github.com/apache/spark/pull/19299
[GitHub] spark pull request #19299: update from upstream
GitHub user wankunde opened a pull request: https://github.com/apache/spark/pull/19299 update from upstream ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wankunde/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19299.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19299 commit 234a3aab1390d3e7004913d234d8452994a16f16 Author: wankun <316945...@qq.com> Date: 2016-10-21T03:07:06Z Merge pull request #1 from apache/master pull upstream commit f74f8f4d0b680384a719740ef70b4bf5d1a2 Author: wankun <316945...@qq.com> Date: 2016-10-21T04:18:11Z Merge pull request #3 from apache/master upstream commit 16b2a597bfe1f98cefb817c5d60d143ed1af5591 Author: wankun603 Date: 2017-09-07T01:07:18Z Merge branch 'master' of https://github.com/apache/spark commit 24fbc04cfd459f5598941968c44f11adde64a5dc Author: wankun603 Date: 2017-09-21T01:00:04Z Merge branch 'master' of https://github.com/apache/spark
[GitHub] spark issue #19288: [SPARK-22075][ML] GBTs unpersist datasets cached by Chec...
Github user zhengruifeng commented on the issue: https://github.com/apache/spark/pull/19288 @WeichenXu123 It may be better to destroy intermediate objects ASAP
[GitHub] spark pull request #15544: [SPARK-17997] [SQL] Add an aggregation function f...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/15544#discussion_r140129058 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import java.util + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, HyperLogLogPlusPlusHelper} +import org.apache.spark.sql.types._ + +/** + * This function counts the approximate number of distinct values (ndv) in + * intervals constructed from endpoints specified in `endpointsExpression`. The endpoints should be + * sorted into ascending order. E.g., given an array of endpoints + * (endpoint_1, endpoint_2, ... 
endpoint_N), returns the approximate ndv's for intervals + * [endpoint_1, endpoint_2], (endpoint_2, endpoint_3], ... (endpoint_N-1, endpoint_N]. + * To count ndv's in these intervals, apply the HyperLogLogPlusPlus algorithm in each of them. + * @param child to estimate the ndv's of. + * @param endpointsExpression to construct the intervals, should be sorted into ascending order. + * @param relativeSD The maximum estimation error allowed in the HyperLogLogPlusPlus algorithm. + */ +case class ApproxCountDistinctForIntervals( +child: Expression, +endpointsExpression: Expression, +relativeSD: Double = 0.05, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends ImperativeAggregate with ExpectsInputTypes { + + def this(child: Expression, endpointsExpression: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = 0.05, + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, endpointsExpression: Expression, relativeSD: Expression) = { +this( + child = child, + endpointsExpression = endpointsExpression, + relativeSD = HyperLogLogPlusPlus.validateDoubleLiteral(relativeSD), + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def inputTypes: Seq[AbstractDataType] = { +Seq(TypeCollection(NumericType, TimestampType, DateType), ArrayType) + } + + // Mark as lazy so that endpointsExpression is not evaluated during tree transformation. + lazy val endpoints: Array[Double] = +(endpointsExpression.dataType, endpointsExpression.eval()) match { + case (ArrayType(baseType: NumericType, _), arrayData: ArrayData) => --- End diff -- This aggregate function is only used internally, the expression will use internal types, so there is no case using date/timestamp endpoints.
[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...
Github user fjh100456 commented on the issue: https://github.com/apache/spark/pull/19218 @gatorsmile @dongjoon-hyun I've fixed it. Could you help me review it again? Thanks.
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Github user ConeyLiu commented on a diff in the pull request: https://github.com/apache/spark/pull/19285#discussion_r140126755 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -354,63 +401,30 @@ private[spark] class MemoryStore( ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } -// Request enough memory to begin unrolling -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - -if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + -s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") -} else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold +def storeValue(value: T): Unit = { + serializationStream.writeObject(value)(classTag) } -def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { -val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong -keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) -if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest -} - } -} - -// Unroll this block safely, checking whether we have exceeded our threshold -while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - elementsUnrolled += 1 - if (elementsUnrolled % memoryCheckPeriod == 0) { -reserveAdditionalMemoryIfNecessary() +def estimateSize(precise: Boolean): Long = { + if (precise) { +serializationStream.flush() --- End diff -- Because some data is cached in the serializationStream, we can't get the precise size if we don't call `flush`. Previously we didn't check again after unrolling the block, and directly called `serializationStream.close()`. But here we may need the `serializationStream` again if we can't get another chunk of unroll memory, so we should only call `flush`.
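The flush-before-measuring point above can be demonstrated with plain JDK streams (an illustration only, not the MemoryStore code): a buffered wrapper under-reports the downstream size until `flush()` pushes the buffered bytes through, which is exactly why `estimateSize(precise = true)` flushes rather than closes the stream.

```scala
import java.io.{BufferedOutputStream, ByteArrayOutputStream}

// 100 bytes written through an 8 KiB buffer: nothing reaches the sink
// until flush() is called, so sizing the sink before flushing is wrong.
val sink = new ByteArrayOutputStream()
val buffered = new BufferedOutputStream(sink, 8192)
buffered.write(Array.fill[Byte](100)(1))
val sizeBeforeFlush = sink.size() // 0: bytes still sit in the buffer
buffered.flush()
val sizeAfterFlush = sink.size()  // 100: now visible downstream
```

Flushing (rather than closing) keeps the stream usable for further writes, matching the comment's point that the stream may be needed again if no additional unroll memory can be reserved.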
[GitHub] spark issue #17412: [SPARK-20084][Core] Remove internal.metrics.updatedBlock...
Github user zhouyejoe commented on the issue: https://github.com/apache/spark/pull/17412 @rdblue Hi, why are the block status updates not filtered out in executorMetricsUpdate? This line https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala#L245 While working on SPARK-21961, I filtered those block status updates while reading from logs in the Spark History Server, but it caused some unit test failures. Should they be filtered out in both executorMetricsUpdateFromJson and executorMetricsUpdateToJson? Thanks.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140122744 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { --- End diff -- The previous test covers all the necessary checks introduced by the change. I added this to cover the default scenario when no job group is specified. Can do away with this.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140124886 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +null + } + + val maxConTasks = if (jobGroupId != null && +conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { +Int.MaxValue + } + + if (maxConTasks <= 0) { +throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- From my understanding and in context of the current code, you would group jobs together when you want their concurrency to be limited. If you want different concurrency limits for different jobs, you would set them in a different jobgroup altogether. If there are multiple jobs in the same job group which run concurrently and one of them sets a value different, then which one wins for the existing jobs and the new job? If we want to have a different value for every job then the user would need a way to know and identify a spark job in his application code , probably by a job id. Only by means of identifying a job, would the user be able to set the config for that job. 
This cannot be known a priori, and I don't know of an easy way for the user to learn which underlying Spark job corresponds to the action. Hence we apply the setting at the job-group level, which lets the user control concurrency without knowing Spark-internal job details. Let me know if anything is unclear here or if you have more questions.
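A hedged sketch of the lookup being discussed (hypothetical helper using a plain `Map` in place of `SparkConf`, names loosely following the diff): jobs fall back to a shared default group when no per-group limit is configured, and a non-positive limit is rejected, mirroring the `onJobStart` logic quoted earlier in the thread.

```scala
val DefaultJobGroup = "default"

// Resolve the (effective job group, max concurrent tasks) pair for a job.
// A job group with no configured limit collapses into the default group,
// which is effectively unlimited (Int.MaxValue).
def resolveLimit(conf: Map[String, String],
                 jobGroupId: Option[String]): (String, Int) =
  jobGroupId.flatMap(id => conf.get(s"spark.job.$id.maxConcurrentTasks")) match {
    case Some(v) =>
      val max = v.toInt
      require(max > 0,
        "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
      (jobGroupId.get, max)
    case None =>
      (DefaultJobGroup, Int.MaxValue)
  }
```

With `spark.job.etl.maxConcurrentTasks=8` set, a job in group `etl` resolves to a limit of 8; any job without a configured group limit shares the unlimited default group.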
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140122769 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true) + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) + +// make 5 offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 3 cores per executor) --- End diff -- okay. Won't be needed, as I will be removing the test case.
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140123047

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumSpeculativeTasks(stageId) = stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+        maxConcurrentTasks = getMaxConTasks
+        logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.")
         allocationManager.onSchedulerBacklogged()
       }
     }

     /**
+     * Calculate the maximum no. of concurrent tasks that can run currently.
+     */
+    def getMaxConTasks(): Int = {
+      // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs
+      // with multiple stages. We need to get all the active stages belonging to a job group to
+      // calculate the total no. of pending + running tasks to decide the maximum no. of executors
+      // we need at that time to serve the outstanding tasks. This is capped by the minimum no. of
+      // outstanding tasks and the max concurrent limit specified for the job group if any.
+
+      def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+        totalPendingTasks(stageId) + totalRunningTasks(stageId)
+      }
+
+      def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (totalTasks, stageToNumTasks) => {
+        val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, stageToNumTasks._2)
+        sumOrMax(totalTasks, activeTasks)
+      }
+
+      // Get the total running & pending tasks for all stages in a job group.
+      def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = {
+        stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+      }
+
+      def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, Int])) => Int = {
+        (maxConTasks, x) => {
+          val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2)
+          val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), totalIncompleteTasksForJobGroup)
+          sumOrMax(maxConTasks, maxTasks)
+        }
+      }
+
+      def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) Int.MaxValue else (a + b)
+
+      def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
+
+      val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1)))
--- End diff --

I like the idea. I think this can be done. Will update the PR.
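The `sumOrMax`/`doesSumOverflow` helpers in the diff implement a saturating addition: the sum is clamped at `Int.MaxValue` instead of wrapping around. For illustration only, the same check can be sketched in Python with an explicit 32-bit bound (Python integers do not overflow, so the bound is simulated; inputs are assumed non-negative, as task counts are):

```python
INT_MAX = 2**31 - 1  # Scala's Int.MaxValue

def does_sum_overflow(a: int, b: int) -> bool:
    # Same test as the Scala code: b > (Int.MaxValue - a).
    # Written as a subtraction so neither side leaves the 32-bit range.
    return b > INT_MAX - a

def sum_or_max(a: int, b: int) -> int:
    """Saturating addition: clamp to INT_MAX instead of wrapping."""
    return INT_MAX if does_sum_overflow(a, b) else a + b

assert sum_or_max(1, 2) == 3
assert sum_or_max(INT_MAX, 1) == INT_MAX        # would wrap in 32-bit arithmetic
assert sum_or_max(INT_MAX - 5, 10) == INT_MAX   # clamped, not wrapped
```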
[GitHub] spark issue #19298: [SPARK-22076][SQL][followup] Expand.projections should n...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19298 **[Test build #82014 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82014/testReport)** for PR 19298 at commit [`aa263d7`](https://github.com/apache/spark/commit/aa263d7b16630ad134e80fbf68a42165667446b5).
[GitHub] spark issue #19298: [SPARK-22076][SQL][followup] Expand.projections should n...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19298 cc @gatorsmile
[GitHub] spark pull request #19298: [SPARK-22076][SQL][followup] Expand.projections s...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/19298

[SPARK-22076][SQL][followup] Expand.projections should not be a Stream

## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/19289 ; we missed another place: `rollup`. `Seq.init.toSeq` also returns a `Stream`, so we should fix it too.

## How was this patch tested?

manually

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark bug

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19298.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19298

commit aa263d7b16630ad134e80fbf68a42165667446b5
Author: Wenchen Fan
Date: 2017-09-21T00:12:25Z

    Expand.projections should not be a Stream
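Why an unexpectedly lazy sequence (a Scala `Stream` where an eager `Seq` is expected) is a bug class worth a follow-up: elements are not computed until the sequence is traversed, so any state that changes in between alters the results. A rough Python analogy, offered only as illustration (Python 3's `map` is lazy, much like a `Stream`):

```python
# Lazy vs. eager evaluation: the lazy sequence sees state as it is at
# traversal time, not at construction time.
factor = 2
lazy_seq = map(lambda x: x * factor, [1, 2, 3])  # nothing computed yet
eager_seq = [x * factor for x in [1, 2, 3]]      # computed immediately

factor = 10  # state changes before the lazy sequence is forced

assert list(lazy_seq) == [10, 20, 30]  # forced now, with factor == 10
assert eager_seq == [2, 4, 6]          # was computed with factor == 2
```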
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r140121928

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
+ *
+ * Python evaluation works by sending the necessary (projected) input data via a socket to an
+ * external Python process, and combine the result from the Python process with the original row.
+ *
+ * For each row we send to Python, we also put it in a queue first. For each output row from
+ * Python, we drain the queue to find the original input row. Note that if the Python process is
+ * way too slow, this could lead to the queue growing unbounded and spill into disk when run out
+ * of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *            Downstream (for parent)
+ *             /      \
+ *            /     socket  (output of UDF)
+ *           /         \
+ *        RowQueue    Python
+ *           \         /
+ *            \     socket  (input of UDF)
+ *             \      /
+ *          upstream (from child)
--- End diff --

That's fine, but either looks fine and it's not a big deal.
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140118143

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala ---
@@ -63,7 +63,8 @@ class MesosCredentialRenewer(
   def scheduleTokenRenewal(): Unit = {
     def scheduleRenewal(runnable: Runnable): Unit = {
-      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      // val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      val remainingTime = 5000
--- End diff --

Well, that's embarrassing; just a debugging tool that I forgot to remove.
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140117834

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -198,16 +198,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
     )

-    if (principal != null) {
+    // check that the credentials are defined, even though it's likely that auth would have failed
+    // already if you've made it this far
+    if (principal != null && hadoopDelegationCreds.isDefined) {
       logDebug(s"Principal found ($principal) starting token renewer")
       val credentialRenewerThread = new Thread {
         setName("MesosCredentialRenewer")
         override def run(): Unit = {
+          val dummy: Option[Array[Byte]] = None
--- End diff --

Whoops!
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user susanxhuynh commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140117253

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala ---
@@ -63,7 +63,8 @@ class MesosCredentialRenewer(
   def scheduleTokenRenewal(): Unit = {
     def scheduleRenewal(runnable: Runnable): Unit = {
-      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      // val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      val remainingTime = 5000
--- End diff --

Why 5000?
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user susanxhuynh commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r140117055

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -198,16 +198,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
     )

-    if (principal != null) {
+    // check that the credentials are defined, even though it's likely that auth would have failed
+    // already if you've made it this far
+    if (principal != null && hadoopDelegationCreds.isDefined) {
       logDebug(s"Principal found ($principal) starting token renewer")
       val credentialRenewerThread = new Thread {
         setName("MesosCredentialRenewer")
         override def run(): Unit = {
+          val dummy: Option[Array[Byte]] = None
--- End diff --

What is this for?
[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18945 @BryanCutler, @a10y and @viirya, would you guys be interested in this and have some time to take over this with the different approach we discussed above - https://github.com/apache/spark/pull/18945#issuecomment-323917328 and https://github.com/apache/spark/pull/18945#discussion_r134033952? I could take over this too if you guys are currently busy.
[GitHub] spark pull request #19141: [SPARK-21384] [YARN] Spark + YARN fails with Loca...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19141
[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19141 (Also merging to 2.2.)
[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19141 LGTM, merging to master.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18659 @ueshin I haven't had much luck with the casting workaround: ``` pa.Array.from_pandas(s.astype(t.to_pandas_dtype(), copy=False), mask=s.isnull(), type=t) ``` It appears that it forces a copy for floating point -> integer and then checks if any NaNs, so I get the error `ValueError: Cannot convert non-finite values (NA or inf) to integer`. I'm using Pandas 0.20.1, but also tried 0.19.4 with the same result, any ideas?
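For context on the error being discussed (this is an illustrative sketch in plain Python, not the actual pandas/pyarrow internals): a float column holding NaN cannot be cast directly to an integer type; the nulls have to be separated out first, which is what the `mask` argument is intended to allow.

```python
import math

# A nullable "int" column stored as floats, NaN marking the null.
values = [1.0, 2.0, float("nan"), 4.0]
mask = [math.isnan(v) for v in values]  # True where the value is null

# A direct cast fails on the NaN entry; this mirrors the
# "Cannot convert non-finite values (NA or inf) to integer" error above.
try:
    as_ints = [int(v) for v in values]
except ValueError:
    as_ints = None
assert as_ints is None

# Masking first, then casting only the valid entries, works.
safe = [None if is_null else int(v) for v, is_null in zip(values, mask)]
assert safe == [1, 2, None, 4]
```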
[GitHub] spark pull request #19297: [SPARK-18838][hotfix][yarn] Check internal contex...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19297
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19297 Merging to master to unbreak the build.
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19297 **[Test build #82013 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82013/testReport)** for PR 19297 at commit [`003dd5a`](https://github.com/apache/spark/commit/003dd5a70a31d9579dce483814edf46c399e7c49).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19297 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82013/ Test PASSed.
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19297 Merged build finished. Test PASSed.
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19271 Merged build finished. Test PASSed.
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19271 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82011/ Test PASSed.
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19271 **[Test build #82011 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82011/testReport)** for PR 19271 at commit [`d8116d0`](https://github.com/apache/spark/commit/d8116d0577bba516320c976debba9eb708fe2ce2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18924#discussion_r140111453

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   }

   /**
-   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
-   * current mini-batch. Uses Newton-Rhapson method.
+   * Update alpha based on `logphat`.
--- End diff --

There is a small difference between the old `N` and `batchSize`: `N` in the old code does not count non-empty docs.
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19297 **[Test build #82013 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82013/testReport)** for PR 19297 at commit [`003dd5a`](https://github.com/apache/spark/commit/003dd5a70a31d9579dce483814edf46c399e7c49).
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19297 retest this please
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19280 Looks ok to me, assuming the "default serializer" in SerializerManager is configured correctly through other means. Title would sound better with a possessive: "SerializerManager's private kryo"
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19297 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82012/ Test PASSed.
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19297 Merged build finished. Test PASSed.
[GitHub] spark issue #19297: [SPARK-18838][hotfix][yarn] Check internal context state...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19297 **[Test build #82012 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82012/testReport)** for PR 19297 at commit [`003dd5a`](https://github.com/apache/spark/commit/003dd5a70a31d9579dce483814edf46c399e7c49).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19271 Merged build finished. Test PASSed.
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19271 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82010/ Test PASSed.
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19271 **[Test build #82010 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82010/testReport)** for PR 19271 at commit [`5198ef3`](https://github.com/apache/spark/commit/5198ef3d2bf38ce827f0fd5a026724f0825dd7dd).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.