[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/19487 @HyukjinKwon That was exactly the initial solution I tested locally when we saw the problem with Phoenix. The reason to expand it was twofold: a) this change preserves the existing behavior from 2.1; b) there are three cases where `new Path` can throw an exception, and I would be special-casing only two of them (null and empty string). My understanding is that Hadoop does not require creating a Path out of `mapred.output.dir`/`mapreduce.output.fileoutputformat.outputdir`; it is up to the individual OutputFormat/Committer to handle it appropriately (unless I am missing something here; @steveloughran would be able to opine better on this). Expecting `mapred.output.dir`/`mapreduce.output.fileoutputformat.outputdir` to be a valid path is a behavior change we introduced in 2.2 in `HadoopMapReduceCommitProtocol`, since we now directly handle promotion of files to their final location for some cases in `commitJob`, or cleanup in `abortJob`. This is done in addition to what the committer does (the committer is invoked before our code). For committers which are not `Path` based, this promotion of output by Spark does not apply (there is no path to promote!), which is what @szhem's patch was fixing, except that it handled only null. Having said all this, if the expectation in Hadoop is that `mapred.output.dir`/`mapreduce.output.fileoutputformat.outputdir`, when specified, should be a valid `Path`, I will happily change it to special-case only `null` and `""`. Unless we get that clarity, IMO we should preserve behavior and be defensive about when we try to do manual promotion. Please note that if an invalid value is provided for `mapred.output.dir`/`mapreduce.output.fileoutputformat.outputdir`, the corresponding output format or committer will throw an appropriate exception (as in the case of MR, Pig or Hive).
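A minimal sketch of the defensive approach argued for above, assuming only the Scala standard library: rather than special-casing `null` and `""`, any failure to turn the configured output directory into a path is read as "no path", and Spark-side promotion would then be skipped. `OutputDirGuard` and `usablePath` are hypothetical names, and the `toPath` parameter stands in for Hadoop's `new Path(_)` so the example runs without Hadoop on the classpath.

```scala
import scala.util.Try

// Hypothetical helper, not Spark's actual code.
object OutputDirGuard {
  // Returns Some(path) only when the raw setting is non-null, non-empty and accepted by
  // `toPath`; any non-fatal exception thrown by `toPath` is read as "not a path-based output".
  def usablePath[P](rawDir: String, toPath: String => P): Option[P] =
    Option(rawDir).filter(_.nonEmpty).flatMap(dir => Try(toPath(dir)).toOption)
}
```

When `usablePath` returns `None`, commit and cleanup are left entirely to the OutputFormat's own committer, which, per the comment above, is where an invalid value is expected to be rejected.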
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19464 Merged build finished. Test PASSed.
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19464 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82752/ Test PASSed.
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19464 **[Test build #82752 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82752/testReport)** for PR 19464 at commit [`a6818b6`](https://github.com/apache/spark/commit/a6818b60adef7bec35b002846a3a504ae53dd9f9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19487 @mridulm, what do you think about dealing with the empty string for now, and with the other cases later if we can't be sure about them yet? I guess the actual issue found is about the empty string anyway. BTW, we might have to log a warning when an empty string is set.
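The narrower alternative proposed here, handling only the empty string (plus `null`) and logging a warning, could look roughly like this. It is a sketch only: `EmptyOutputDirCheck`, `outputDirOrWarn` and the warning text are hypothetical, and the `warn` parameter stands in for a logger.

```scala
// Hypothetical sketch, not Spark's actual code.
object EmptyOutputDirCheck {
  // Treat null and "" as "no output directory"; warn only on "", since an explicitly
  // empty setting is more likely a misconfiguration than an intentional choice.
  def outputDirOrWarn(rawDir: String, warn: String => Unit): Option[String] = rawDir match {
    case null => None
    case "" =>
      warn("Output directory is set to an empty string; skipping Spark-side output promotion.")
      None
    case dir => Some(dir)
  }
}
```

Any other value, including a malformed one, is passed through unchanged, so it would still fail later when it is parsed into a path; that is the trade-off debated in this thread.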
[GitHub] spark issue #19496: [SPARK-22271][SQL]mean overflows and returns null for so...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19496 Can one of the admins verify this patch?
[GitHub] spark pull request #19496: [SPARK-22271][SQL]mean overflows and returns null...
GitHub user huaxingao opened a pull request: https://github.com/apache/spark/pull/19496 [SPARK-22271][SQL]mean overflows and returns null for some decimal variables

## What changes were proposed in this pull request?

In Average.scala, it has

```
override lazy val evaluateExpression = child.dataType match {
  case DecimalType.Fixed(p, s) =>
    // increase the precision and scale to prevent precision loss
    val dt = DecimalType.bounded(p + 14, s + 4)
    Cast(Cast(sum, dt) / Cast(count, dt), resultType)
  case _ =>
    Cast(sum, resultType) / Cast(count, resultType)
}

def setChild(newchild: Expression) = {
  child = newchild
}
```

It is possible that `Cast(count, dt)` will make the precision of the decimal number bigger than 38, and this causes overflow. Since count is an integer and doesn't need a scale, I will cast it using `DecimalType.bounded(38, 0)`.

## How was this patch tested?

In DataFrameSuite, I will add a test case.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/huaxingao/spark spark-22271 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19496.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19496 commit a3437ee4a87d1f51b362adeb20d4fcc264085ba7 Author: Huaxin Gao Date: 2017-10-14T04:45:27Z [SPARK-22271][SQL]mean overflows and returns null for some decimal variables
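The clamping inside `DecimalType.bounded` is easy to check by hand. The sketch below re-implements only that rule in plain Scala (precision and scale each capped at 38, matching Spark's `MAX_PRECISION` and `MAX_SCALE`); it is not Spark code and does not model the result-type rules of decimal division. `DecimalBounds` and `integerDigits` are hypothetical names.

```scala
// Hypothetical stand-alone model of DecimalType.bounded's clamping; not Spark code.
object DecimalBounds {
  val MaxPrecision = 38
  val MaxScale = 38

  // Precision and scale are each capped independently, as in DecimalType.bounded.
  def bounded(precision: Int, scale: Int): (Int, Int) =
    (math.min(precision, MaxPrecision), math.min(scale, MaxScale))

  // Digits left for the integer part of a decimal with the given (precision, scale).
  def integerDigits(t: (Int, Int)): Int = t._1 - t._2
}
```

For a `DECIMAL(38, 18)` input, `bounded(38 + 14, 18 + 4)` clamps to `(38, 22)`, so every value cast to `dt`, `count` included, gets only 16 integer digits, whereas `bounded(38, 0)`, the type the PR proposes for `count`, keeps all 38.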
[GitHub] spark pull request #19480: [SPARK-22226][SQL] splitExpression can create too...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19480#discussion_r144685307 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2103,4 +2103,35 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)), Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2))) } + + test("SPARK-6: splitExpressions should not generate codes beyond 64KB") { +val colNumber = 1 +val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) +val df = sqlContext.createDataFrame(input, StructType( + (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false +val newCols = (1 to colNumber).flatMap { colIndex => + Seq(expr(s"if(1000 < _$colIndex, 1000, _$colIndex)"), +expr(s"sqrt(_$colIndex)")) +} +df.select(newCols: _*).collect() + } + + test("SPARK-6: too many splitted expressions should not exceed constant pool limit") { --- End diff -- It seems hard to make an end-to-end test for the reported issue after several tries.
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19464 LGTM
[GitHub] spark pull request #19480: [SPARK-22226][SQL] splitExpression can create too...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19480#discussion_r144684909 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2103,4 +2103,35 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)), Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2))) } + + test("SPARK-6: splitExpressions should not generate codes beyond 64KB") { +val colNumber = 1 +val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) +val df = sqlContext.createDataFrame(input, StructType( + (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false +val newCols = (1 to colNumber).flatMap { colIndex => + Seq(expr(s"if(1000 < _$colIndex, 1000, _$colIndex)"), +expr(s"sqrt(_$colIndex)")) +} +df.select(newCols: _*).collect() + } + + test("SPARK-6: too many splitted expressions should not exceed constant pool limit") { --- End diff -- But when I actually ran this committed test after lowering the threshold from 160 to 100, it passed on the current master branch too.
[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19495 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82751/ Test PASSed.
[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19495 Merged build finished. Test PASSed.
[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19495 **[Test build #82751 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82751/testReport)** for PR 19495 at commit [`67114ab`](https://github.com/apache/spark/commit/67114ab59f5a8d79fbe66b7deb93869f656346b9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19480: [SPARK-22226][SQL] splitExpression can create too...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19480#discussion_r144684397 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -277,13 +292,25 @@ class CodegenContext { funcName: String, funcCode: String, inlineToOuterClass: Boolean = false): String = { +val newFunction = addNewFunctionInternal(funcName, funcCode, inlineToOuterClass) +newFunction match { + case NewFunction(functionName, None, None) => functionName + case NewFunction(functionName, Some(_), Some(subclassInstance)) => +subclassInstance + "." + functionName +} + } + + private[this] def addNewFunctionInternal( + funcName: String, + funcCode: String, + inlineToOuterClass: Boolean): NewFunction = { // The number of named constants that can exist in the class is limited by the Constant Pool // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a -// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// threshold of 1000k bytes to determine when a function should be inlined to a private, nested // sub-class. val (className, classInstance) = if (inlineToOuterClass) { outerClassName -> "" -} else if (currClassSize > 160) { +} else if (currClassSize > 100) { --- End diff -- Yeah, actually, over several tries I found that setting the value lower can somehow reduce the chance of hitting the constant pool limit exception in nested classes.
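A much-simplified model of the placement decision in this hunk can make the threshold trade-off concrete. Everything below is hypothetical (`FunctionPlacer`, the class names, and measuring class "size" as the total length of added function bodies); it is not Spark's `CodegenContext`. With a lower threshold the same stream of functions is spread over more nested classes, so each class accumulates fewer constants, which is one plausible reading of the observation above rather than a verified explanation.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of CodegenContext.addNewFunctionInternal; not Spark code.
final class FunctionPlacer(threshold: Int) {
  private val OuterClass = "OuterClass"
  // Insertion-ordered map of class name -> accumulated size of generated code.
  private val classSizes = mutable.LinkedHashMap[String, Int](OuterClass -> 0)
  private var current = OuterClass

  // Returns how callers would reference the function: the bare name in the outer class,
  // or "<instance>.<name>" when it lives in a nested class.
  def place(funcName: String, funcCode: String, inlineToOuterClass: Boolean = false): String = {
    val target =
      if (inlineToOuterClass) OuterClass
      else {
        // Start a new nested class once the current one has grown past the threshold.
        if (classSizes(current) > threshold) {
          current = s"NestedClass${classSizes.size}"
          classSizes(current) = 0
        }
        current
      }
    classSizes(target) += funcCode.length
    if (target == OuterClass) funcName
    else s"${target.head.toLower}${target.tail}.$funcName"
  }

  def classCount: Int = classSizes.size
}
```

For example, twenty 8-character functions fit in 2 classes with a threshold of 100 but need 10 classes with a threshold of 10.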
[GitHub] spark pull request #19488: [SPARK-22266][SQL] The same aggregate function wa...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19488#discussion_r144684245 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -205,14 +205,17 @@ object PhysicalAggregation { case logical.Aggregate(groupingExpressions, resultExpressions, child) => // A single aggregate expression might appear multiple times in resultExpressions. // In order to avoid evaluating an individual aggregate function multiple times, we'll - // build a set of the distinct aggregate expressions and build a function which can + // build a map of the distinct aggregate expressions and build a function which can // be used to re-write expressions so that they reference the single copy of the - // aggregate function which actually gets computed. - val aggregateExpressions = resultExpressions.flatMap { expr => + // aggregate function which actually gets computed. Note that aggregate expressions + // should always be deterministic, so we can use its canonicalized expression as its --- End diff -- Good point. I think `first` is a nondeterministic function; SQL Server also documents it as such: https://docs.microsoft.com/en-us/sql/relational-databases/user-defined-functions/deterministic-and-nondeterministic-functions (search for FIRST_VALUE). But here, although `first` is nondeterministic, I think `select first(a) - first(a) from ...` should return 0, since its result should be consistent within a query. CC @gatorsmile @hvanhovell
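A toy model can make the deduplication in this hunk concrete: every occurrence of an equal aggregate expression maps to one computed slot, so `first(a) - first(a)` reads the same value twice and yields 0 regardless of which row `first` picked. All types and names here are hypothetical, and plain case-class equality stands in for comparing Catalyst's canonicalized expressions.

```scala
// Hypothetical mini expression tree; not Catalyst's API.
sealed trait Expr
final case class Col(name: String) extends Expr
final case class First(child: Expr) extends Expr
final case class Sub(left: Expr, right: Expr) extends Expr
final case class AggSlot(index: Int) extends Expr

object AggDedup {
  // Collect every aggregate occurrence, duplicates included.
  private def collectAggs(e: Expr): Seq[First] = e match {
    case f: First  => Seq(f)
    case Sub(l, r) => collectAggs(l) ++ collectAggs(r)
    case _         => Nil
  }

  // Replace each aggregate with a reference to its single computed slot.
  private def rewrite(e: Expr, slots: Map[First, Int]): Expr = e match {
    case f: First  => AggSlot(slots(f))
    case Sub(l, r) => Sub(rewrite(l, slots), rewrite(r, slots))
    case other     => other
  }

  // Returns the distinct aggregates to compute once, plus the rewritten result expressions.
  def plan(results: Seq[Expr]): (Seq[First], Seq[Expr]) = {
    val distinctAggs = results.flatMap(collectAggs).distinct
    val slots = distinctAggs.zipWithIndex.toMap
    (distinctAggs, results.map(rewrite(_, slots)))
  }
}
```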
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19467 Merged build finished. Test PASSed.
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19467 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82749/ Test PASSed.
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19467 **[Test build #82749 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82749/testReport)** for PR 19467 at commit [`971f579`](https://github.com/apache/spark/commit/971f57963a3f8f7f5b0481441fef387c80920048). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19464: [SPARK-22233] [core] Allow user to filter out empty spli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19464 **[Test build #82752 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82752/testReport)** for PR 19464 at commit [`a6818b6`](https://github.com/apache/spark/commit/a6818b60adef7bec35b002846a3a504ae53dd9f9).
[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...
Github user liutang123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19464#discussion_r144683771 --- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala --- @@ -510,4 +510,87 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { } } + test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") { +val conf = new SparkConf() +conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true) +sc = new SparkContext(conf) + +def testIgnoreEmptySplits( + data: Array[Tuple2[String, String]], + actualPartitionNum: Int, + expectedPart: String, + expectedPartitionNum: Int): Unit = { + val output = new File(tempDir, "output") + sc.parallelize(data, actualPartitionNum) +.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath) + assert(new File(output, expectedPart).exists() === true) + val hadoopRDD = sc.textFile(new File(output, "part-*").getPath) + assert(hadoopRDD.partitions.length === expectedPartitionNum) + Utils.deleteRecursively(output) --- End diff -- I don't think we need `try ... finally` here, because `Utils.deleteRecursively(output)` only ensures that the next invocation of `testIgnoreEmptySplits` succeeds. When the test finishes, whether it passed or not, `tempDir` will be deleted in `FileSuite.afterEach()`.
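The argument rests on two points: every call reuses the same fixed `output` directory, and the output check run by Hadoop's `FileOutputFormat` (enabled in Spark by default) rejects an output directory that already exists. The stdlib-only sketch below models just that interaction; `OutputDirReuse` is a hypothetical name, `writeOnce` is a stand-in for `saveAsHadoopFile`, and `deleteRecursively` for `Utils.deleteRecursively`.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, stdlib-only model of the pattern in testIgnoreEmptySplits; not Spark code.
object OutputDirReuse {
  // Deletes a directory tree, deepest paths first (stand-in for Utils.deleteRecursively).
  def deleteRecursively(dir: Path): Unit = {
    if (Files.exists(dir)) {
      val stream = Files.walk(dir)
      val paths = try {
        val it = stream.iterator()
        val buf = ArrayBuffer.empty[Path]
        while (it.hasNext) buf += it.next()
        buf.toList
      } finally stream.close()
      paths.sortBy(p => -p.getNameCount).foreach(p => Files.delete(p))
    }
  }

  // Stand-in for saveAsHadoopFile: refuses to write into an existing output directory.
  def writeOnce(output: Path): Unit = {
    if (Files.exists(output)) {
      throw new IllegalStateException(s"Output directory $output already exists")
    }
    Files.createDirectories(output)
    Files.write(output.resolve("part-00000"), "k\tv\n".getBytes("UTF-8"))
  }
}
```

The delete only matters for the next call in the same test; if an assertion fails, the remaining calls never run, and, per the comment, `afterEach()` removes `tempDir` anyway, so a `try ... finally` adds little.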
[GitHub] spark pull request #19480: [SPARK-22226][SQL] splitExpression can create too...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19480#discussion_r144683715 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2103,4 +2103,35 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)), Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2))) } + + test("SPARK-6: splitExpressions should not generate codes beyond 64KB") { +val colNumber = 1 +val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) +val df = sqlContext.createDataFrame(input, StructType( + (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false +val newCols = (1 to colNumber).flatMap { colIndex => + Seq(expr(s"if(1000 < _$colIndex, 1000, _$colIndex)"), +expr(s"sqrt(_$colIndex)")) +} +df.select(newCols: _*).collect() + } + + test("SPARK-6: too many splitted expressions should not exceed constant pool limit") { --- End diff -- The following test, which fails on the current master branch, passes with your fix. I didn't see the OOM issue or the nested-class constant pool issue.

```scala
test("SPARK-6: too many splitted expressions should not exceed constant pool limit") {
  val colNumber = 5000
  val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*))
  val df = sqlContext.createDataFrame(input, StructType(
    (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false))))
  val funcs = (1 to colNumber).map { colIndex =>
    val colName = s"_$colIndex"
    col(colName).cast(LongType)
  }
  df.select(funcs: _*).dropDuplicates((1 to 5).map(colIndex => s"_$colIndex")).collect()
}
```
[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19452 Merged build finished. Test PASSed.
[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19452 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82748/ Test PASSed.
[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19452 **[Test build #82748 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82748/testReport)** for PR 19452 at commit [`0a753ed`](https://github.com/apache/spark/commit/0a753ed36018efe0be5533084fc7b6040586cbb5). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19491: [SPARK-22273][SQL] Fix key/value schema field names in H...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19491 LGTM
[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...
Github user ArtRand commented on the issue: https://github.com/apache/spark/pull/19272 @vanzin Thanks for the review. I'll address the comments ASAP.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19374#discussion_r144682661 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala --- @@ -896,8 +913,8 @@ private[spark] class MesosClusterScheduler( revive() } - private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = { -pendingRetryDriversState.persist(taskId, desc) + private def addDriverToPending(desc: MesosDriverDescription, id: String) = { +pendingRetryDriversState.persist(id, desc) --- End diff -- Maybe keep the name as `subId` because it could be confusing otherwise.
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19374#discussion_r144682434 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -373,10 +374,16 @@ class SparkContext(config: SparkConf) extends Logging { // log out spark.app.name in the Spark driver logs logInfo(s"Submitted application: $appName") -// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster -if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) { - throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " + -"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.") +// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster or +// System property spark.mesos.driver.frameworkId must be set if user code ran by +// Mesos Dispatcher on a MESOS cluster +if (deployMode == "cluster") { --- End diff -- FWIW, I _believe_ that when we submit a job with the dispatcher `deployMode` is actually set to `client`, so this logic may not be invoked as expected.
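If the dispatcher really launches the driver with `deployMode` set to `client`, a check guarded by `deployMode == "cluster"` never runs for Mesos. The stdlib-only sketch below assumes the check has the shape the diff's comment describes (the new body is cut off in the quoted hunk); `ClusterModeCheck` and the Mesos error message are hypothetical, a plain `Map` stands in for `SparkConf`, and `IllegalStateException` stands in for `SparkException`.

```scala
// Hypothetical sketch of the check described in the diff comment; not the patch's code.
object ClusterModeCheck {
  def validate(master: String, deployMode: String, conf: Map[String, String]): Unit = {
    // Nothing below runs unless deployMode is "cluster", which is the concern raised above.
    if (deployMode == "cluster") {
      if (master == "yarn" && !conf.contains("spark.yarn.app.id")) {
        throw new IllegalStateException(
          "Detected yarn cluster mode, but isn't running on a cluster.")
      }
      if (master.startsWith("mesos://") && !conf.contains("spark.mesos.driver.frameworkId")) {
        // Hypothetical wording; the patch's actual message is not shown in the hunk.
        throw new IllegalStateException(
          "Detected Mesos cluster mode, but the driver was not launched by the dispatcher.")
      }
    }
  }
}
```

With `deployMode = "client"`, the Mesos branch is skipped even when `spark.mesos.driver.frameworkId` is missing, so a test of this check would need to confirm what `deployMode` the dispatcher actually sets.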
[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19374#discussion_r144682554 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala --- @@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler( s"${frameworkId}-${desc.submissionId}${retries}" } + private def getDriverTaskId(desc: MesosDriverDescription): String = { +val sId = desc.submissionId +desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId) + } + + private def getSumbmissionIdFromTaskId(taskId: String): String = { +taskId.split("-retry-").head --- End diff -- Maybe make this a constant?
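Pulling the `"-retry-"` separator into one constant keeps the two helpers in this hunk from drifting apart. A minimal sketch, assuming the retry count can be passed as an `Option[Int]` in place of `desc.retryState.map(_.retries)`; `DriverTaskIds`, `RetrySeparator` and the method names are hypothetical.

```scala
// Hypothetical sketch of the suggestion; names are illustrative, not Spark's.
object DriverTaskIds {
  // Single definition of the separator shared by both directions of the mapping.
  val RetrySeparator = "-retry-"

  // `retries` stands in for desc.retryState.map(_.retries).
  def driverTaskId(submissionId: String, retries: Option[Int]): String =
    retries.map(r => s"$submissionId$RetrySeparator$r").getOrElse(submissionId)

  // String.split takes a regex; "-retry-" has no regex metacharacters, so it splits literally.
  def submissionIdFromTaskId(taskId: String): String = taskId.split(RetrySeparator).head
}
```

A plain `-` inside the submission ID does not interfere, since only the full `-retry-` sequence is used as the separator.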
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r144682189 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -28,6 +28,8 @@ import com.google.common.base.Splitter import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver} import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} import org.apache.mesos.Protos.FrameworkInfo.Capability +import org.apache.mesos.Protos.Resource.AllocationInfo --- End diff -- maybe just use `Protos` imported above?
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r144682211 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -175,14 +175,22 @@ trait MesosSchedulerUtils extends Logging { registerLatch.countDown() } - def createResource(name: String, amount: Double, role: Option[String] = None): Resource = { + def createResource( + name: String, + amount: Double, + role: Option[String] = None, + allocationInfo: Option[AllocationInfo] = None, + reservationInfo: Option[ReservationInfo] = None): Resource = { val builder = Resource.newBuilder() .setName(name) .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(amount).build()) role.foreach { r => builder.setRole(r) } - +if (role.forall(r => !r.equals("*"))) { --- End diff -- Make a constant for the star role (`*`) called `ANY_ROLE`
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r144682163 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -380,7 +389,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } else { declineOffer( driver, - offer) + offer, --- End diff -- As an aside, what do you think about adding more detailed logging of the reason why offers are declined?
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r144682379 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -469,6 +474,12 @@ trait MesosSchedulerUtils extends Logging { .setType(Value.Type.RANGES) .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) role.foreach(r => builder.setRole(r)) + + if (role.forall(r => !r.equals("*"))) { --- End diff -- Maybe abstract this logic into a general function?
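The two `MesosSchedulerUtils` hunks above repeat the same `role.forall(r => !r.equals("*"))` condition. A minimal sketch of both review suggestions together, a named constant plus one shared predicate; `MesosRoles` and `isNotAnyRole` are hypothetical names, and `ANY_ROLE` is the name proposed in the review.

```scala
// Hypothetical helper for the repeated role check; not the patch's code.
object MesosRoles {
  // "*" is Mesos's default role for unreserved resources.
  val ANY_ROLE = "*"

  // Option.forall is true for None, so "no role given" takes the same branch
  // as an explicit non-"*" role; worth keeping in mind when reusing this check.
  def isNotAnyRole(role: Option[String]): Boolean = role.forall(_ != ANY_ROLE)
}
```

Both call sites could then read `if (MesosRoles.isNotAnyRole(role)) { ... }`, and the `None` case is documented in one place.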
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144682312 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala --- @@ -17,10 +17,14 @@ package org.apache.spark.scheduler.cluster.mesos --- End diff -- Out of curiosity, why do we have this file _and_ `MesosSchedulerUtils`?
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144681733 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala --- @@ -170,9 +175,119 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { containerInfo.addNetworkInfos(info) } +getSecretVolume(conf, secretConfig).foreach { volume => + if (volume.getSource.getSecret.getReference.isInitialized) { +logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" + + s"on file ${volume.getContainerPath}") + } else { +logInfo(s"Setting secret on file name=${volume.getContainerPath}") + } + containerInfo.addVolumes(volume) +} + containerInfo } + def addSecretEnvVar( --- End diff -- Is it possible to make this return `List[Variable]`, as it used to, instead of mutating the `Environment.Builder`? It would be more consistent (e.g. with `getSecretVolume`).
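The difference between the two styles can be shown without Mesos on the classpath. In this sketch, `SecretEnvVar` and `EnvironmentBuilder` are hypothetical stand-ins for the Mesos protobuf `Variable` and `Environment.Builder`, and the method signatures are simplified; only the shape of the API is the point. The pure version returns the variables, as `getSecretVolume` does for volumes, and the mutating version becomes a thin wrapper around it.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for the Mesos protobuf types.
final case class SecretEnvVar(name: String, secretRef: String)

final class EnvironmentBuilder {
  private val vars = ListBuffer.empty[SecretEnvVar]
  def addVariables(v: SecretEnvVar): EnvironmentBuilder = { vars += v; this }
  def variables: List[SecretEnvVar] = vars.toList
}

object SecretEnvStyles {
  // Style suggested in the review: a pure function returning the variables;
  // callers decide where to add them, and tests need no builder.
  def getSecretEnvVar(envKeys: Seq[String], refs: Seq[String]): List[SecretEnvVar] =
    envKeys.zip(refs).map { case (key, ref) => SecretEnvVar(key, ref) }.toList

  // Style in the patch: mutate the caller's builder and return Unit.
  def addSecretEnvVar(builder: EnvironmentBuilder, envKeys: Seq[String], refs: Seq[String]): Unit =
    getSecretEnvVar(envKeys, refs).foreach(builder.addVariables)
}
```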
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144680213 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala --- @@ -159,7 +160,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) - executorInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf)) +executorInfo.setContainer( --- End diff -- I'd almost prefer that we don't add any features to fine-grained mode right now, as we have virtually no test coverage on whether or not this will work.
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144680478 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala --- @@ -170,9 +175,119 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { containerInfo.addNetworkInfos(info) } +getSecretVolume(conf, secretConfig).foreach { volume => + if (volume.getSource.getSecret.getReference.isInitialized) { +logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" + --- End diff -- Need a space at the end of this log line (my bad!)
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144680608 --- Diff: docs/running-on-mesos.md --- @@ -522,6 +522,43 @@ See the [configuration page](configuration.html) for information on Spark config + spark.mesos.executor.secret.envkeys + (none) + +A comma-separated list that, if set, the contents of the secret referenced --- End diff -- What do you think about putting an example here like we do for `spark.mesos.network.labels` - something general for all secrets?
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144681758 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala --- @@ -170,9 +175,119 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { containerInfo.addNetworkInfos(info) } +getSecretVolume(conf, secretConfig).foreach { volume => + if (volume.getSource.getSecret.getReference.isInitialized) { +logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" + + s"on file ${volume.getContainerPath}") + } else { +logInfo(s"Setting secret on file name=${volume.getContainerPath}") + } + containerInfo.addVolumes(volume) +} + containerInfo } + def addSecretEnvVar( + envBuilder: Environment.Builder, + conf: SparkConf, + secretConfig: MesosSecretConfig): Unit = { +getSecretEnvVar(conf, secretConfig).foreach { variable => + if (variable.getSecret.getReference.isInitialized) { +logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" + + s"on file ${variable.getName}") + } else { +logInfo(s"Setting secret on environment variable name=${variable.getName}") + } + envBuilder.addVariables(variable) +} + } + + private def getSecrets(conf: SparkConf, secretConfig: MesosSecretConfig): + Seq[Secret] = { --- End diff -- Indentation?
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144680448 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala --- @@ -122,7 +126,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { .toList } - def containerInfo(conf: SparkConf): ContainerInfo.Builder = { + def buildContainerInfo(conf: SparkConf, secretConfig: MesosSecretConfig): + ContainerInfo.Builder = { val containerType = if (conf.contains("spark.mesos.executor.docker.image") && --- End diff -- Should probably have a check here for whether secrets are present, because I don't think that secrets will work if you're _not_ also using the Mesos containerizer.
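One way such a fail-fast check could look, as a sketch only. It assumes the containerizer is selected with `spark.mesos.containerizer` (defaulting to `docker`) and that secret settings share the `spark.mesos.executor.secret.` prefix seen in the docs diff; a plain `Map` stands in for `SparkConf`, and `SecretContainerizerCheck` is a hypothetical name.

```scala
object SecretContainerizerCheck {
  // Assumed prefix for secret settings, e.g. spark.mesos.executor.secret.envkeys.
  val SecretPrefix = "spark.mesos.executor.secret."

  // Throws IllegalArgumentException if any secret setting is present
  // but the Mesos containerizer is not selected.
  def validate(conf: Map[String, String]): Unit = {
    val hasSecrets = conf.keys.exists(_.startsWith(SecretPrefix))
    val containerizer = conf.getOrElse("spark.mesos.containerizer", "docker")
    require(!hasSecrets || containerizer == "mesos",
      s"Secrets require spark.mesos.containerizer=mesos, found '$containerizer'")
  }
}
```

Failing at container-info construction time surfaces the misconfiguration before any task is launched, instead of as a runtime error on the agent.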
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144680489 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala --- @@ -170,9 +175,119 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { containerInfo.addNetworkInfos(info) } +getSecretVolume(conf, secretConfig).foreach { volume => + if (volume.getSource.getSecret.getReference.isInitialized) { +logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" + + s"on file ${volume.getContainerPath}") + } else { +logInfo(s"Setting secret on file name=${volume.getContainerPath}") + } + containerInfo.addVolumes(volume) +} + containerInfo } + def addSecretEnvVar( + envBuilder: Environment.Builder, + conf: SparkConf, + secretConfig: MesosSecretConfig): Unit = { +getSecretEnvVar(conf, secretConfig).foreach { variable => + if (variable.getSecret.getReference.isInitialized) { +logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" + --- End diff -- Space at the end of this log line too.
[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets
Github user ArtRand commented on a diff in the pull request: https://github.com/apache/spark/pull/19437#discussion_r144680353 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala --- @@ -122,7 +126,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { .toList } - def containerInfo(conf: SparkConf): ContainerInfo.Builder = { + def buildContainerInfo(conf: SparkConf, secretConfig: MesosSecretConfig): --- End diff -- Maybe change `secretConfig` to `mesosConfig` and pass the whole thing? That way, if we want to add new functionality later, this function is more general. Given that most of what we do is proto-generation, I bet we'll have to do this eventually anyway.
[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19495 **[Test build #82751 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82751/testReport)** for PR 19495 at commit [`67114ab`](https://github.com/apache/spark/commit/67114ab59f5a8d79fbe66b7deb93869f656346b9).
[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/19495 [SPARK-22278][SS] Expose current event time watermark and current processing time in GroupState ## What changes were proposed in this pull request? Complex state-updating and/or timeout-handling logic in mapGroupsWithState functions may require making decisions based on the current event-time watermark and/or processing time. Currently, you can use the SQL function `current_timestamp` to get the current processing time, but it needs to be inserted into every row with a select, and then passed through the encoder, which isn't efficient. Furthermore, there is no way to get the current watermark. This PR exposes both of them through the GroupState API. Additionally, it cleans up some of the GroupState docs. ## How was this patch tested? New unit tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-22278 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19495.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19495 commit c9a042e2f0228584f6a3f643cfac412c73ed98d7 Author: Tathagata Das Date: 2017-10-10T00:01:02Z Expose event time watermark in the GorupState commit 67114ab59f5a8d79fbe66b7deb93869f656346b9 Author: Tathagata Das Date: 2017-10-14T00:16:08Z Exposed processing time
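To illustrate the kind of decision the PR description has in mind, here is a sketch of an expiry check that uses both values. `TimeInfo`, `SessionState`, `SessionLogic` and their method names are hypothetical stand-ins, not the `GroupState` API added by this PR; times are epoch milliseconds.

```scala
// Hypothetical stand-in for the two values this PR exposes on GroupState.
trait TimeInfo {
  def currentWatermarkMs: Long
  def currentProcessingTimeMs: Long
}

// Hypothetical per-group state: last event time seen and when it was last updated.
final case class SessionState(lastEventTimeMs: Long, lastUpdateProcessingMs: Long)

object SessionLogic {
  // Expire a session once the event-time watermark has passed its last event,
  // or once it has been idle (in processing time) for longer than idleTimeoutMs.
  def shouldExpire(state: SessionState, time: TimeInfo, idleTimeoutMs: Long): Boolean =
    state.lastEventTimeMs < time.currentWatermarkMs ||
      time.currentProcessingTimeMs - state.lastUpdateProcessingMs > idleTimeoutMs
}
```

Without access to the watermark, the first condition cannot be expressed inside the update function at all; the second would otherwise need `current_timestamp` materialized in every row.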
[GitHub] spark pull request #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add su...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19385
[GitHub] spark issue #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add support f...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19385 Merging to master.
[GitHub] spark pull request #19480: [SPARK-22226][SQL] splitExpression can create too...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19480#discussion_r144678394 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2103,4 +2103,35 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)), Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2))) } + + test("SPARK-6: splitExpressions should not generate codes beyond 64KB") { +val colNumber = 1 +val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) +val df = sqlContext.createDataFrame(input, StructType( + (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false +val newCols = (1 to colNumber).flatMap { colIndex => + Seq(expr(s"if(1000 < _$colIndex, 1000, _$colIndex)"), +expr(s"sqrt(_$colIndex)")) +} +df.select(newCols: _*).collect() + } + + test("SPARK-6: too many splitted expressions should not exceed constant pool limit") { --- End diff -- Hmm, interesting, because your committed test can be seen as doing `dropDuplicates` on a 6000-column dataframe (`df.select(funcs: _*)`). The `funcs` are actually performed at the lower `Project`.
```scala
val df2 = df.select(funcs: _*).cache()
df2.collect() // Pass
val df3 = df2.dropDuplicates((1 to 5).map(colIndex => s"_$colIndex"))
df3.collect() // Exception
```
So I think the simplified version has the same effect?
[GitHub] spark issue #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add support f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19385 **[Test build #82750 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82750/testReport)** for PR 19385 at commit [`ced3e58`](https://github.com/apache/spark/commit/ced3e58c415fa98546a0dfdb3a0d1703e835f436). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add support f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19385 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82750/ Test PASSed.
[GitHub] spark issue #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add support f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19385 Merged build finished. Test PASSed.
[GitHub] spark issue #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add support f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19385 **[Test build #82750 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82750/testReport)** for PR 19385 at commit [`ced3e58`](https://github.com/apache/spark/commit/ced3e58c415fa98546a0dfdb3a0d1703e835f436).
[GitHub] spark issue #16801: [SPARK-13619] [WEBUI] [CORE] Jobs page UI shows wrong nu...
Github user devaraj-kavali commented on the issue: https://github.com/apache/spark/pull/16801 I will look for a better solution to this issue and open a new PR, so I am closing this one.
[GitHub] spark pull request #16801: [SPARK-13619] [WEBUI] [CORE] Jobs page UI shows w...
Github user devaraj-kavali closed the pull request at: https://github.com/apache/spark/pull/16801
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19467 **[Test build #82749 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82749/testReport)** for PR 19467 at commit [`971f579`](https://github.com/apache/spark/commit/971f57963a3f8f7f5b0481441fef387c80920048).
[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19452 **[Test build #82748 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82748/testReport)** for PR 19452 at commit [`0a753ed`](https://github.com/apache/spark/commit/0a753ed36018efe0be5533084fc7b6040586cbb5).
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19467 Merged build finished. Test FAILed.
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19467 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82747/ Test FAILed.
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19467 **[Test build #82747 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82747/testReport)** for PR 19467 at commit [`84ac2d8`](https://github.com/apache/spark/commit/84ac2d84fff94a54a01d92f0ecf00c1f9ace4203). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19396: [SPARK-22172][CORE] Worker hangs when the external shuff...
Github user devaraj-kavali commented on the issue: https://github.com/apache/spark/pull/19396 @jerryshao Please let me know if you are not convinced by the above comment; I can change the PR to make the Worker go down when the external shuffle service fails to start.
[GitHub] spark pull request #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add su...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19385#discussion_r144671613 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -174,6 +182,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( override def start() { super.start() +if (sc.deployMode == "client") { + launcherBackend.connect --- End diff -- add `()` since this is not a property getter.
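The convention behind this comment: Scala methods with side effects are declared and called with `()`, while pure accessors omit the parentheses so they read like properties. A sketch with a hypothetical `LauncherConnection` class (not Spark's `LauncherBackend`):

```scala
// Hypothetical class illustrating the parentheses convention.
final class LauncherConnection {
  private var connected = false

  // Side-effecting: declared with () and called as connect() at the use site.
  def connect(): Unit = { connected = true }

  // Pure accessor: no parentheses, reads like a property.
  def isConnected: Boolean = connected
}
```

Newer Scala versions also warn about, or reject, calling a `()` method without its parentheses, so following the convention avoids that churn later.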
[GitHub] spark issue #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add support f...
Github user devaraj-kavali commented on the issue: https://github.com/apache/spark/pull/19385 Thanks @vanzin for looking into this. > This only solves half the problem, right? What about cluster mode? Yes, it solves Mesos client mode. For Mesos cluster mode, I think the Standalone cluster mode implementation (SPARK-11033), or something similar, would handle it, since both use the same client-side logic to submit the application.
[GitHub] spark pull request #19435: [MINOR][SS] "keyWithIndexToNumValues" -> "keyWith...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19435
[GitHub] spark pull request #18747: [SPARK-20822][SQL] Generate code to directly get ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18747#discussion_r144668015 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala --- @@ -23,21 +23,37 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} -import org.apache.spark.sql.execution.LeafExecNode -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode} +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.types.UserDefinedType case class InMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], @transient relation: InMemoryRelation) - extends LeafExecNode { + extends LeafExecNode with ColumnarBatchScan { override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren - override lazy val metrics = Map( -"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + override def vectorTypes: Option[Seq[String]] = + Option(Seq.fill(attributes.length)(classOf[OnHeapColumnVector].getName)) + + override val columnIndexes = +attributes.map(a => relation.output.map(o => o.exprId).indexOf(a.exprId)).toArray + + override val supportCodegen: Boolean = relation.useColumnarBatches + + override def inputRDDs(): Seq[RDD[InternalRow]] = { +if (supportCodegen) { + val buffers = relation.cachedColumnBuffers + // HACK ALERT: This is actually an RDD[CachedBatch]. + // We're taking advantage of Scala's type erasure here to pass these batches along. 
+ Seq(buffers.asInstanceOf[RDD[InternalRow]]) --- End diff -- @gatorsmile sure, done
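The "HACK ALERT" comment in this diff relies on JVM type erasure: a cast between two instantiations of the same generic class is unchecked, so it succeeds, and a wrong element type only fails when an element is actually used. A minimal sketch using `List` in place of `RDD`; `ErasureDemo` is a hypothetical name.

```scala
object ErasureDemo {
  // At runtime List[String] and List[Int] are the same class, so this cast is
  // unchecked and always succeeds; element types are only checked on use.
  def relabel(xs: List[Any]): List[Int] = xs.asInstanceOf[List[Int]]
}
```

Here `ErasureDemo.relabel(List("not an int"))` returns without error, but reading its head as an `Int` throws `ClassCastException`. That is why the generated code consuming these buffers has to treat the elements as `CachedBatch`, never as `InternalRow`.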
[GitHub] spark pull request #18747: [SPARK-20822][SQL] Generate code to directly get ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18747#discussion_r144668103 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala --- @@ -84,25 +84,45 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { val columnarBatchClz = classOf[ColumnarBatch].getName val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") +val cachedBatchClz = "org.apache.spark.sql.execution.columnar.CachedBatch" +val cachedBatch = ctx.freshName("cachedBatch") val idx = ctx.freshName("batchIdx") ctx.addMutableState("int", idx, s"$idx = 0;") val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) val columnVectorClzs = vectorTypes.getOrElse( Seq.fill(colVars.size)(classOf[ColumnVector].getName)) +val columnAccessorClz = "org.apache.spark.sql.execution.columnar.ColumnAccessor" --- End diff -- Yeah, done
[GitHub] spark issue #19385: [SPARK-11034] [LAUNCHER] [MESOS] Launcher: add support f...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19385 This only solves half the problem, right? What about cluster mode?
[GitHub] spark issue #19435: [MINOR][SS] "keyWithIndexToNumValues" -> "keyWithIndexTo...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19435 Thanks! Merging to master.
[GitHub] spark issue #18747: [SPARK-20822][SQL] Generate code to directly get value f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18747 Merged build finished. Test PASSed.
[GitHub] spark issue #18747: [SPARK-20822][SQL] Generate code to directly get value f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18747 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82746/ Test PASSed.
[GitHub] spark issue #18747: [SPARK-20822][SQL] Generate code to directly get value f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18747 **[Test build #82746 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82746/testReport)** for PR 18747 at commit [`750b230`](https://github.com/apache/spark/commit/750b2308080c7ad135f231770b2fcab71523cbf4). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19452#discussion_r144664962 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -349,12 +356,28 @@ case class StreamingSymmetricHashJoinExec( /** * Internal helper class to consume input rows, generate join output rows using other sides * buffered state rows, and finally clean up this sides buffered state rows + * + * @param joinSide The JoinSide - either left or right. + * @param inputAttributes The input attributes for this side of the join. + * @param joinKeys The join keys. + * @param inputIter The iterator of input rows on this side to be joined. + * @param preJoinFilter A filter over rows on this side. This filter rejects rows that could + * never pass the overall join condition no matter what other side row + * they're joined with. + * @param postJoinFilter A filter over joined rows. This filter completes the application of the + * overall join condition, assuming that preJoinFilter on both sides of the + * join has already been passed. --- End diff -- ^ awesome docs, very clear.
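The preJoinFilter/postJoinFilter split documented above comes from sorting the join condition's conjuncts by which side's columns they reference. A simplified model of that sorting, as a sketch: `Conjunct`, `SplitPredicates` and `JoinConditionSplit` are hypothetical, and a conjunct is reduced to the set of column names it references (the real code works on Catalyst expressions). Since the empty set is a subset of every set, literal-only conjuncts land in the first bucket checked, `leftOnly`.

```scala
// Simplified model of a conjunct: its text plus the column names it references.
final case class Conjunct(sql: String, refs: Set[String])

final case class SplitPredicates(
    leftOnly: Seq[Conjunct],
    rightOnly: Seq[Conjunct],
    bothSides: Seq[Conjunct])

object JoinConditionSplit {
  // Conjuncts referencing only one side can be applied before the join (preJoinFilter);
  // the rest must be applied to joined rows (postJoinFilter).
  def split(
      conjuncts: Seq[Conjunct],
      leftCols: Set[String],
      rightCols: Set[String]): SplitPredicates = {
    val (left, rest) = conjuncts.partition(_.refs.subsetOf(leftCols))
    val (right, both) = rest.partition(_.refs.subsetOf(rightCols))
    SplitPredicates(left, right, both)
  }
}
```

For example, with left columns `a, b` and right columns `c, d`, `a > 1` becomes a left pre-join filter, `c < 2` a right pre-join filter, and `a = c` stays in the post-join filter.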
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18979 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82745/ Test PASSed.
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18979 Merged build finished. Test PASSed.
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18979 **[Test build #82745 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82745/testReport)** for PR 18979 at commit [`c0e81a1`](https://github.com/apache/spark/commit/c0e81a1c87011efdc010f1c9ba28dde003458667). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/19467 LGTM, assuming tests pass.
[GitHub] spark issue #19467: [SPARK-22238] Fix plan resolution bug caused by EnsureSt...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19467 **[Test build #82747 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82747/testReport)** for PR 19467 at commit [`84ac2d8`](https://github.com/apache/spark/commit/84ac2d84fff94a54a01d92f0ecf00c1f9ace4203).
[GitHub] spark pull request #19488: [SPARK-22266][SQL] The same aggregate function wa...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/19488#discussion_r144656360 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -205,14 +205,17 @@ object PhysicalAggregation { case logical.Aggregate(groupingExpressions, resultExpressions, child) => // A single aggregate expression might appear multiple times in resultExpressions. // In order to avoid evaluating an individual aggregate function multiple times, we'll - // build a set of the distinct aggregate expressions and build a function which can + // build a map of the distinct aggregate expressions and build a function which can // be used to re-write expressions so that they reference the single copy of the - // aggregate function which actually gets computed. - val aggregateExpressions = resultExpressions.flatMap { expr => + // aggregate function which actually gets computed. Note that aggregate expressions + // should always be deterministic, so we can use its canonicalized expression as its --- End diff -- So we are talking about two types of "non-deterministic" here: 1. Across-query non-deterministic but in-query deterministic, which means the same expression can produce different results over the same input between different runs, but should always give the same result within the same run. sum/avg on floating point numbers could be an example. Shall we make sure that "select sum(f) - sum(f) from t" always returns 0? And similarly for "first()", maybe: should "select first_value(c) = first_value(c) over ..." always return true? It is important to define the behavior first, since the answer leads to opposite approaches to handling the "deterministic" field here. 2. Across-query and in-query non-deterministic, which I don't think is allowed anyway.
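Why sum/avg over floating point can be "across-query non-deterministic": IEEE 754 addition is not associative, so merging the same partial sums in a different order, as can happen between runs, may give a different result. A minimal illustration with three doubles; `FloatSumOrder` is a hypothetical name.

```scala
object FloatSumOrder {
  // The same three values added in two different groupings, standing in for
  // partial aggregates merged in a different order on different runs.
  val leftToRight: Double = (0.1 + 0.2) + 0.3
  val rightToLeft: Double = 0.1 + (0.2 + 0.3)
}
```

Here `leftToRight` evaluates to `0.6000000000000001` and `rightToLeft` to `0.6`. Within a single evaluation, computing `sum(f)` once and reusing the result would still make `sum(f) - sum(f)` exactly 0, which is the in-query guarantee being asked about.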
[GitHub] spark pull request #19458: [SPARK-22227][CORE] DiskBlockManager.getAllBlocks...
Github user superbobry commented on a diff in the pull request: https://github.com/apache/spark/pull/19458#discussion_r144655974 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala --- @@ -100,7 +100,16 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea /** List all the blocks currently stored on disk by the disk manager. */ def getAllBlocks(): Seq[BlockId] = { -getAllFiles().map(f => BlockId(f.getName)) +getAllFiles().flatMap { f => + val blockId = BlockId.guess(f.getName) --- End diff -- Will do. Should I log the exception even if the file has been produced by `Utils.tempFileWith`?
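The `flatMap` in this diff skips files whose names do not parse as block ids instead of failing the whole listing. A sketch of that skip-and-report pattern under simplifying assumptions: `RddBlockId`, `BlockListing` and `parse` are hypothetical stand-ins (not Spark's `BlockId` API), and only the `rdd_<rddId>_<splitIndex>` naming scheme is modelled. Returning the skipped names lets the caller decide which of them deserve a log line.

```scala
import scala.util.{Failure, Success, Try}

// Simplified stand-in for a block id; only RDD block names are modelled.
final case class RddBlockId(rddId: Int, splitIndex: Int)

object BlockListing {
  private val RddBlock = """rdd_(\d+)_(\d+)""".r

  // Hypothetical parser: throws on names it does not recognise.
  def parse(name: String): RddBlockId = name match {
    case RddBlock(rdd, split) => RddBlockId(rdd.toInt, split.toInt)
    case other => throw new IllegalArgumentException(s"Unrecognized block id: $other")
  }

  // Returns the parsed ids plus the names that were skipped, so the caller can log them.
  def allBlocks(fileNames: Seq[String]): (Seq[RddBlockId], Seq[String]) = {
    val parsed = fileNames.map(n => n -> Try(parse(n)))
    val ids = parsed.collect { case (_, Success(id)) => id }
    val skipped = parsed.collect { case (n, Failure(_)) => n }
    (ids, skipped)
  }
}
```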
[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19452 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82743/ Test PASSed.
[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19452 Merged build finished. Test PASSed.
[GitHub] spark issue #19452: [SPARK-22136][SS] Evaluate one-sided conditions early in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19452 **[Test build #82743 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82743/testReport)** for PR 19452 at commit [`94dfa85`](https://github.com/apache/spark/commit/94dfa85242b350df2f630707fd27181e91fdf7ce). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19488: [SPARK-22266][SQL] The same aggregate function wa...
Github user rednaxelafx commented on a diff in the pull request: https://github.com/apache/spark/pull/19488#discussion_r144651235 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -205,14 +205,17 @@ object PhysicalAggregation { case logical.Aggregate(groupingExpressions, resultExpressions, child) => // A single aggregate expression might appear multiple times in resultExpressions. // In order to avoid evaluating an individual aggregate function multiple times, we'll - // build a set of the distinct aggregate expressions and build a function which can + // build a map of the distinct aggregate expressions and build a function which can // be used to re-write expressions so that they reference the single copy of the - // aggregate function which actually gets computed. - val aggregateExpressions = resultExpressions.flatMap { expr => + // aggregate function which actually gets computed. Note that aggregate expressions + // should always be deterministic, so we can use its canonicalized expression as its --- End diff -- @cloud-fan Agreed. For example, `first()` in Spark SQL is marked as nondeterministic right now (although for `first()` I actually believe we should make it deterministic instead, but that's another story).
[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19452#discussion_r144651101 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan} +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates +import org.apache.spark.sql.types.DataTypes + +class StreamingSymmetricHashJoinHelperSuite extends StreamTest { + import org.apache.spark.sql.functions._ + + val attributeA = AttributeReference("a", DataTypes.IntegerType)() + val attributeB = AttributeReference("b", DataTypes.IntegerType)() + val attributeC = AttributeReference("c", DataTypes.IntegerType)() + val attributeD = AttributeReference("d", DataTypes.IntegerType)() + val colA = new Column(attributeA) + val colB = new Column(attributeB) + val colC = new Column(attributeC) + val colD = new Column(attributeD) + + val left = new LocalTableScanExec(Seq(attributeA, attributeB), Seq()) + val right = new LocalTableScanExec(Seq(attributeC, attributeD), Seq()) + + test("empty") { +val split = JoinConditionSplitPredicates(None, left, right) +assert(split.leftSideOnly.isEmpty) +assert(split.rightSideOnly.isEmpty) +assert(split.bothSides.isEmpty) +assert(split.full.isEmpty) + } + + test("only literals") { +// Literal-only conjuncts end up on the left side because that's the first bucket they fit in. +// There's no semantic reason they couldn't be in any bucket. 
+val predicate = (lit(1) < lit(5) && lit(6) < lit(7) && lit(0) === lit(-1)).expr +val split = JoinConditionSplitPredicates(Some(predicate), left, right) + +assert(split.leftSideOnly.contains(predicate)) +assert(split.rightSideOnly.isEmpty) +assert(split.bothSides.isEmpty) +assert(split.full.contains(predicate)) + } + + test("only left") { +val predicate = (colA > lit(1) && colB > lit(5) && colA < colB).expr +val split = JoinConditionSplitPredicates(Some(predicate), left, right) + +assert(split.leftSideOnly.contains(predicate)) +assert(split.rightSideOnly.isEmpty) +assert(split.bothSides.isEmpty) +assert(split.full.contains(predicate)) + } + + test("only right") { +val predicate = (colC > lit(1) && colD > lit(5) && colD < colC).expr +val split = JoinConditionSplitPredicates(Some(predicate), left, right) + +assert(split.leftSideOnly.isEmpty) +assert(split.rightSideOnly.contains(predicate)) +assert(split.bothSides.isEmpty) +assert(split.full.contains(predicate)) + } + + test("mixed conjuncts") { +val predicate = (colA > colB && colC > colD && colA === colC && lit(1) === lit(1)).expr +val split = JoinConditionSplitPredicates(Some(predicate), left, right) + +assert(split.leftSideOnly.contains((colA > colB && lit(1) === lit(1)).expr)) +assert(split.rightSideOnly.contains((colC > colD).expr)) +assert(split.bothSides.contains((colA === colC).expr)) +assert(split.full.contains(predicate)) + } --- End diff -- Shouldn't we also test right-only conditions for left outer joins, and vice versa?
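The test diff quoted above pins down how a join condition's conjuncts are bucketed by the columns they reference. A toy Scala sketch of that bucketing follows, assuming plain column-name sets in place of Catalyst attributes; `Conjunct`, `Split` and `split` are hypothetical names, not the `JoinConditionSplitPredicates` implementation. Literal-only conjuncts reference no columns, and the empty set is a subset of the left columns, so they land in the first bucket, matching the "only literals" test.

```scala
object SplitSketch {
  // Hypothetical toy conjunct: its text plus the column names it references.
  final case class Conjunct(sql: String, refs: Set[String])

  final case class Split(leftOnly: Seq[Conjunct], rightOnly: Seq[Conjunct], bothSides: Seq[Conjunct])

  def split(conjuncts: Seq[Conjunct], leftCols: Set[String], rightCols: Set[String]): Split = {
    // An empty reference set is a subset of leftCols, so literal-only
    // conjuncts are claimed by the left bucket first.
    val (leftOnly, remaining) = conjuncts.partition(_.refs.subsetOf(leftCols))
    val (rightOnly, bothSides) = remaining.partition(_.refs.subsetOf(rightCols))
    // bothSides: conjuncts needing columns from both inputs
    // (in this toy, also any conjunct referencing unknown columns).
    Split(leftOnly, rightOnly, bothSides)
  }
}
```

The sketch does not model join types, so it says nothing about the outer-join cases raised in the review comment.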
[GitHub] spark pull request #19235: [SPARK-14387][SPARK-19459][SQL] Enable Hive-1.x O...
Github user dongjoon-hyun closed the pull request at: https://github.com/apache/spark/pull/19235
[GitHub] spark issue #19419: [SPARK-22188] [CORE] Adding security headers for prevent...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19419 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82742/ Test FAILed.
[GitHub] spark issue #19419: [SPARK-22188] [CORE] Adding security headers for prevent...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19419 Merged build finished. Test FAILed.
[GitHub] spark pull request #7842: [SPARK-8542][MLlib]PMML export for Decision Trees
Github user jomach commented on a diff in the pull request: https://github.com/apache/spark/pull/7842#discussion_r144641913 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLTreeModelUtils.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.pmml.export + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +import org.dmg.pmml.{Node => PMMLNode, Value => PMMLValue, _} + --- End diff -- Remove this blank line.
[GitHub] spark pull request #7842: [SPARK-8542][MLlib]PMML export for Decision Trees
Github user jomach commented on a diff in the pull request: https://github.com/apache/spark/pull/7842#discussion_r144642103 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLTreeModelUtils.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.pmml.export + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +import org.dmg.pmml.{Node => PMMLNode, Value => PMMLValue, _} + +import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType} +import org.apache.spark.mllib.tree.configuration.Algo._ +import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node} + +private[mllib] object PMMLTreeModelUtils { + + val FieldNamePrefix = "field_" + + def toPMMLTree(dtModel: DecisionTreeModel, modelName: String): (TreeModel, List[DataField]) = { + +val miningFunctionType = dtModel.algo match { + case Algo.Classification => MiningFunctionType.CLASSIFICATION + case Algo.Regression => MiningFunctionType.REGRESSION +} + +val treeModel = new TreeModel() + .setModelName(modelName) + .setFunctionName(miningFunctionType) + .setSplitCharacteristic(TreeModel.SplitCharacteristic.BINARY_SPLIT) + +var (rootNode, miningFields, dataFields, classes) = buildStub(dtModel.topNode, dtModel.algo) + +// adding predicted classes for classification and target field for regression for completeness +dtModel.algo match { + + case Algo.Classification => +miningFields = miningFields :+ new MiningField() + .setName(FieldName.create("class")) + .setUsageType(FieldUsageType.PREDICTED) + +val dataField = new DataField() + .setName(FieldName.create("class")) + .setOpType(OpType.CATEGORICAL) + .addValues(classes: _*) + .setDataType(DataType.DOUBLE) + +dataFields = dataFields :+ dataField + + case Algo.Regression => +val targetField = FieldName.create("target") +val dataField = new DataField(targetField, OpType.CONTINUOUS, DataType.DOUBLE) +dataFields = dataFields :+ dataField + +miningFields = miningFields :+ new MiningField() + .setName(targetField) + .setUsageType(FieldUsageType.TARGET) + +} + +val miningSchema = new MiningSchema().addMiningFields(miningFields: _*) + +treeModel.setNode(rootNode).setMiningSchema(miningSchema) + +(treeModel, dataFields) + } + + /** Build a pmml 
tree stub given the root mllib node. */ + private def buildStub(rootDTNode: Node, algo: Algo): +(PMMLNode, List[MiningField], List[DataField], List[PMMLValue]) = { + +val miningFields = mutable.MutableList[MiningField]() +val dataFields = mutable.HashMap[String, DataField]() +val classes = mutable.MutableList[Double]() + +def buildStubInternal(rootNode: Node, predicate: Predicate): PMMLNode = { + + // get rootPMML node for the MLLib node + val rootPMMLNode = new PMMLNode() +.setId(rootNode.id.toString) +.setScore(rootNode.predict.predict.toString) +.setPredicate(predicate) + + var leftPredicate: Predicate = new True() + var rightPredicate: Predicate = new True() + + if (rootNode.split.isDefined) { +val fieldName = FieldName.create(FieldNamePrefix + rootNode.split.get.feature) +val dataField = getDataField(rootNode, fieldName).get + +if (dataFields.get(dataField.getName.getValue).isEmpty) { + dataFields.put(dataField.getName.getValue, dataField) + miningFields += new MiningField() +.setName(dataField.getName) +.setUsageType(FieldUsageType.ACTIVE) + +} else if (dataField.getOpType != OpType.CONTINUOUS) { +
[GitHub] spark pull request #7842: [SPARK-8542][MLlib]PMML export for Decision Trees
Github user jomach commented on a diff in the pull request: https://github.com/apache/spark/pull/7842#discussion_r144642031 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLTreeModelUtils.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.pmml.export + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +import org.dmg.pmml.{Node => PMMLNode, Value => PMMLValue, _} + +import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType} +import org.apache.spark.mllib.tree.configuration.Algo._ +import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node} + +private[mllib] object PMMLTreeModelUtils { + + val FieldNamePrefix = "field_" + + def toPMMLTree(dtModel: DecisionTreeModel, modelName: String): (TreeModel, List[DataField]) = { + +val miningFunctionType = dtModel.algo match { + case Algo.Classification => MiningFunctionType.CLASSIFICATION + case Algo.Regression => MiningFunctionType.REGRESSION +} + +val treeModel = new TreeModel() + .setModelName(modelName) + .setFunctionName(miningFunctionType) + .setSplitCharacteristic(TreeModel.SplitCharacteristic.BINARY_SPLIT) + +var (rootNode, miningFields, dataFields, classes) = buildStub(dtModel.topNode, dtModel.algo) + +// adding predicted classes for classification and target field for regression for completeness +dtModel.algo match { + + case Algo.Classification => +miningFields = miningFields :+ new MiningField() + .setName(FieldName.create("class")) + .setUsageType(FieldUsageType.PREDICTED) + +val dataField = new DataField() + .setName(FieldName.create("class")) + .setOpType(OpType.CATEGORICAL) + .addValues(classes: _*) + .setDataType(DataType.DOUBLE) + +dataFields = dataFields :+ dataField + + case Algo.Regression => +val targetField = FieldName.create("target") +val dataField = new DataField(targetField, OpType.CONTINUOUS, DataType.DOUBLE) +dataFields = dataFields :+ dataField + +miningFields = miningFields :+ new MiningField() + .setName(targetField) + .setUsageType(FieldUsageType.TARGET) + +} + +val miningSchema = new MiningSchema().addMiningFields(miningFields: _*) + +treeModel.setNode(rootNode).setMiningSchema(miningSchema) + +(treeModel, dataFields) + } + + /** Build a pmml 
tree stub given the root mllib node. */ + private def buildStub(rootDTNode: Node, algo: Algo): +(PMMLNode, List[MiningField], List[DataField], List[PMMLValue]) = { + +val miningFields = mutable.MutableList[MiningField]() +val dataFields = mutable.HashMap[String, DataField]() +val classes = mutable.MutableList[Double]() + +def buildStubInternal(rootNode: Node, predicate: Predicate): PMMLNode = { + + // get rootPMML node for the MLLib node + val rootPMMLNode = new PMMLNode() +.setId(rootNode.id.toString) +.setScore(rootNode.predict.predict.toString) +.setPredicate(predicate) + + var leftPredicate: Predicate = new True() + var rightPredicate: Predicate = new True() + + if (rootNode.split.isDefined) { +val fieldName = FieldName.create(FieldNamePrefix + rootNode.split.get.feature) +val dataField = getDataField(rootNode, fieldName).get + +if (dataFields.get(dataField.getName.getValue).isEmpty) { + dataFields.put(dataField.getName.getValue, dataField) + miningFields += new MiningField() +.setName(dataField.getName) +.setUsageType(FieldUsageType.ACTIVE) + +} else if (dataField.getOpType != OpType.CONTINUOUS) { +
[GitHub] spark pull request #7842: [SPARK-8542][MLlib]PMML export for Decision Trees
Github user jomach commented on a diff in the pull request: https://github.com/apache/spark/pull/7842#discussion_r144642055 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLTreeModelUtils.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.pmml.export + +import scala.collection.mutable +import scala.collection.JavaConverters._ + +import org.dmg.pmml.{Node => PMMLNode, Value => PMMLValue, _} + +import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType} +import org.apache.spark.mllib.tree.configuration.Algo._ +import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node} + +private[mllib] object PMMLTreeModelUtils { + + val FieldNamePrefix = "field_" + + def toPMMLTree(dtModel: DecisionTreeModel, modelName: String): (TreeModel, List[DataField]) = { + +val miningFunctionType = dtModel.algo match { + case Algo.Classification => MiningFunctionType.CLASSIFICATION + case Algo.Regression => MiningFunctionType.REGRESSION +} + +val treeModel = new TreeModel() + .setModelName(modelName) + .setFunctionName(miningFunctionType) + .setSplitCharacteristic(TreeModel.SplitCharacteristic.BINARY_SPLIT) + +var (rootNode, miningFields, dataFields, classes) = buildStub(dtModel.topNode, dtModel.algo) + +// adding predicted classes for classification and target field for regression for completeness +dtModel.algo match { + + case Algo.Classification => +miningFields = miningFields :+ new MiningField() + .setName(FieldName.create("class")) + .setUsageType(FieldUsageType.PREDICTED) + +val dataField = new DataField() + .setName(FieldName.create("class")) + .setOpType(OpType.CATEGORICAL) + .addValues(classes: _*) + .setDataType(DataType.DOUBLE) + +dataFields = dataFields :+ dataField + + case Algo.Regression => +val targetField = FieldName.create("target") +val dataField = new DataField(targetField, OpType.CONTINUOUS, DataType.DOUBLE) +dataFields = dataFields :+ dataField + +miningFields = miningFields :+ new MiningField() + .setName(targetField) + .setUsageType(FieldUsageType.TARGET) + +} + +val miningSchema = new MiningSchema().addMiningFields(miningFields: _*) + +treeModel.setNode(rootNode).setMiningSchema(miningSchema) + +(treeModel, dataFields) + } + + /** Build a pmml 
tree stub given the root mllib node. */ + private def buildStub(rootDTNode: Node, algo: Algo): +(PMMLNode, List[MiningField], List[DataField], List[PMMLValue]) = { + +val miningFields = mutable.MutableList[MiningField]() +val dataFields = mutable.HashMap[String, DataField]() +val classes = mutable.MutableList[Double]() + +def buildStubInternal(rootNode: Node, predicate: Predicate): PMMLNode = { + + // get rootPMML node for the MLLib node + val rootPMMLNode = new PMMLNode() +.setId(rootNode.id.toString) +.setScore(rootNode.predict.predict.toString) +.setPredicate(predicate) + + var leftPredicate: Predicate = new True() + var rightPredicate: Predicate = new True() + + if (rootNode.split.isDefined) { +val fieldName = FieldName.create(FieldNamePrefix + rootNode.split.get.feature) +val dataField = getDataField(rootNode, fieldName).get + +if (dataFields.get(dataField.getName.getValue).isEmpty) { + dataFields.put(dataField.getName.getValue, dataField) + miningFields += new MiningField() +.setName(dataField.getName) +.setUsageType(FieldUsageType.ACTIVE) + +} else if (dataField.getOpType != OpType.CONTINUOUS) { +
[GitHub] spark issue #19419: [SPARK-22188] [CORE] Adding security headers for prevent...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19419 **[Test build #82742 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82742/testReport)** for PR 19419 at commit [`5c76b91`](https://github.com/apache/spark/commit/5c76b914ecbd7fd82276496151f7ed89fe519025). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19451: SPARK-22181 Adds ReplaceExceptWithNotFilter rule
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19451 Merged build finished. Test PASSed.
[GitHub] spark issue #19451: SPARK-22181 Adds ReplaceExceptWithNotFilter rule
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19451 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82740/ Test PASSed.
[GitHub] spark issue #19451: SPARK-22181 Adds ReplaceExceptWithNotFilter rule
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19451 **[Test build #82740 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82740/testReport)** for PR 19451 at commit [`5facb93`](https://github.com/apache/spark/commit/5facb93d197cd3aeddb76813381a4499c8545a99). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18747: [SPARK-20822][SQL] Generate code to directly get value f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18747 **[Test build #82746 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82746/testReport)** for PR 18747 at commit [`750b230`](https://github.com/apache/spark/commit/750b2308080c7ad135f231770b2fcab71523cbf4).
[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/19487 > If it does use it, it'll handle an invalid entry in setupJob/setupTask by throwing an exception there. This should already happen today, and `hasValidPath` does not prevent it. That is, if the committer is unable to handle the specified output directory, it can throw an exception in `committer.setupJob`, based on what is specified in the config passed in the `TaskAttemptContext`. Note that `hasValidPath` and `path` handle the explicit case of absolute-path-based committers, where `HadoopMapReduceCommitProtocol` moves the results to the final destination (and removes them in case of failure): see the use of `commitJob#taskCommits`. `commitJob` does invoke `committer.commitJob`, so the committer-specific commit still happens. None of this is relevant for committers that are not path-based. What I would like clarification on is what should be done when `path` is invalid. My understanding was that this is up to the committer implementation to handle, since it could be a valid use; and if the value is invalid, the committer would throw an exception in `setupJob` or `commitJob`. If this assumption is incorrect, I will change it back to explicitly support only `null` or `""` for `path`, instead of also accepting unparseable paths.
[GitHub] spark issue #19487: [SPARK-21549][CORE] Respect OutputFormats with no/invali...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/19487 The more I see of the committer internals, the less confident I am that I understand any of it. If your committer isn't writing anything out, it doesn't need any value of mapred.output.dir at all, does it? If it does use it, it'll handle an invalid entry in setupJob/setupTask by throwing an exception there. So the goal of the layers above it should be to make sure the committer gets to validate its own inputs. Hadoop trunk adds a new [PathOutputCommitter](https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java) class for committers: it's the useful getters of `FileOutputCommitter` pulled up, so that other committers can give things like Spark the info they need without looking into properties like mapred.output.dir. Have a look at that class, and if there is something extra you want pulled up, let me know before Hadoop 3.0 ships and I'll see what I can do.
[GitHub] spark pull request #19487: [SPARK-21549][CORE] Respect OutputFormats with no...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19487#discussion_r144633605 --- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala --- @@ -48,6 +49,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) @transient private var committer: OutputCommitter = _ /** + * Checks whether there are files to be committed to a valid output location. + * + * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null, + * it is necessary to check whether a valid output path is specified. + * [[HadoopMapReduceCommitProtocol#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for + * committers not writing to distributed file systems. + */ + private val hasValidPath = Try { new Path(path) }.isSuccess --- End diff -- That would depend on whether we want to support invalid paths or not (please see my comment below). If we do not support invalid paths, I will change this to an explicit `null != path && "" != path` check and have the driver throw `IllegalArgumentException` in `commitJob` or `abortJob`, as before. If we do want to support invalid paths, then the exception is not relevant here, since it indicates an explicitly invalid path passed to the output committer (and the output committer will log suitably if the parameter is invalid; that is not `HadoopMapReduceCommitProtocol`'s responsibility).
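To make the two policies under discussion concrete, here is a small Scala sketch that does not depend on Hadoop. `OutputPathChecks`, `parsePath`, `hasValidPath` and `hasExplicitPath` are hypothetical names: `parsePath` stands in for `new Path(path)` by rejecting `null`, `""`, and strings that `java.net.URI` cannot parse. URI's parsing rules are not identical to Hadoop's `Path`, so this only models the shape of the decision, not which strings Hadoop actually accepts.

```scala
import java.net.URI
import scala.util.Try

object OutputPathChecks {
  // Hypothetical stand-in for `new org.apache.hadoop.fs.Path(path)`.
  // java.net.URI is used only as an illustrative parser; its rules differ from Hadoop's.
  def parsePath(s: String): URI = {
    if (s == null) throw new IllegalArgumentException("path is null")
    if (s.isEmpty) throw new IllegalArgumentException("path is empty")
    new URI(s) // throws java.net.URISyntaxException on malformed input
  }

  // Defensive policy: Spark-side promotion only when the value parses at all.
  def hasValidPath(path: String): Boolean = Try(parsePath(path)).isSuccess

  // Narrower policy: only null and "" mean "no output path".
  def hasExplicitPath(path: String): Boolean = path != null && path.nonEmpty
}
```

With this stand-in, the two checks disagree only on a non-empty but unparseable value such as `"a b"`: the `Try`-based check quietly skips Spark's own promotion and leaves validation to the committer, while the narrower check treats it as a real path, so constructing the path later in `commitJob`/`abortJob` would throw.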
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18979 **[Test build #82745 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82745/testReport)** for PR 18979 at commit [`c0e81a1`](https://github.com/apache/spark/commit/c0e81a1c87011efdc010f1c9ba28dde003458667).
[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...
Github user steveloughran commented on the issue: https://github.com/apache/spark/pull/18979 Done. Not writing 0-byte files will offer a significant speedup against object stores, where a single getFileStatus() call can take hundreds of milliseconds. I look forward to it.
[GitHub] spark issue #19485: [SPARK-20055] [Docs] Added documentation for loading csv...
Github user jomach commented on the issue: https://github.com/apache/spark/pull/19485 @HyukjinKwon I came up with this. What do you think? What I don't like about it is that I could not find any way to pull the Javadocs into the markdown, so we end up with duplicates. Any ideas, or should we leave it as in this PR?