[GitHub] spark issue #17359: [SPARK-20028][SQL] Add aggreagate expression nGrams
Github user viirya commented on the issue: https://github.com/apache/spark/pull/17359 @gatorsmile I will try to take a look again.
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19404 Merged build finished. Test FAILed.
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19404 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82433/ Test FAILed.
[GitHub] spark pull request #18861: [SPARK-19426][SQL] Custom coalescer for Dataset
Github user maropu closed the pull request at: https://github.com/apache/spark/pull/18861
[GitHub] spark issue #18861: [SPARK-19426][SQL] Custom coalescer for Dataset
Github user maropu commented on the issue: https://github.com/apache/spark/pull/18861 ok, thanks!
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19404 **[Test build #82433 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82433/testReport)** for PR 19404 at commit [`9cd3ee6`](https://github.com/apache/spark/commit/9cd3ee69b9ddd678b77d8b78feec51ea8c55e377).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19083 **[Test build #82435 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82435/testReport)** for PR 19083 at commit [`dfde49b`](https://github.com/apache/spark/commit/dfde49bcc487ecbc0135cd301e8d9c3ad17921be).
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142556519

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }
     }

+    // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
+    if (isShell(args.primaryResource)) {
+      if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
--- End diff --

Add a config constant now you're using this in more places?
[GitHub] spark pull request #19061: [SPARK-21568][CORE] ConsoleProgressBar should onl...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19061#discussion_r142556541

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -598,6 +598,15 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }
     }

+    // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
+    if (isShell(args.primaryResource)) {
+      if (!sparkConf.contains("spark.ui.showConsoleProgress")) {
+        sysProps("spark.ui.showConsoleProgress") = "true"
+      }
+    } else {
+      sysProps("spark.ui.showConsoleProgress") = "false"
--- End diff --

This is not needed right?
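For context, the kind of constant being requested usually lives in Spark's internal config registry (`org.apache.spark.internal.config`). The sketch below is illustrative only: the constant name, doc string, and default are assumptions, not the values that were eventually merged, and `ConfigBuilder` is a Spark-internal API.

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical constant; the point is to reference it from both SparkSubmit and
// ConsoleProgressBar instead of repeating the raw "spark.ui.showConsoleProgress" string.
val UI_SHOW_CONSOLE_PROGRESS = ConfigBuilder("spark.ui.showConsoleProgress")
  .doc("Whether to show the progress bar in the console.")
  .booleanConf
  .createWithDefault(false)
```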
[GitHub] spark issue #18861: [SPARK-19426][SQL] Custom coalescer for Dataset
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18861 Yeah, maybe close it first. Thanks!
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142554065

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
--- End diff --

Could you split the whole test case to multiple independent smaller unit test cases?
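To make the suggestion concrete, one hedged sketch of how the single large test could be broken up, assuming the helpers defined inside it (such as `checkCompressionCodecForTable` and `getSparkCompressionConfName`) are hoisted to the suite level; the test titles, the use of `withSQLConf`, and the assertion are illustrative only, not the split that was ultimately committed.

```scala
// Generate one independent test case per format/partitioning combination instead of
// a single monolithic test body. Helpers are assumed to be suite-level methods here.
Seq("parquet", "orc").foreach { format =>
  Seq(true, false).foreach { isPartitioned =>
    test(s"SPARK-21786: compression codec for $format, partitioned = $isPartitioned") {
      withSQLConf(getSparkCompressionConfName(format) -> "SNAPPY") {
        checkCompressionCodecForTable(format, isPartitioned, compressionConf = None) { codec =>
          // With no table-level property, the session-level conf should take effect.
          assert(codec == "SNAPPY")
        }
      }
    }
  }
}
```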
[GitHub] spark pull request #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfigurati...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19413
[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19413 (FYI, didn't merge to 2.0.)
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553845 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala --- @@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter assert(e.contains("mismatched input 'ROW'")) } } + + test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " + +"and 'spark.sql.orc.compression.codec' taking effect on hive table writing") { + +val hadoopConf = spark.sessionState.newHadoopConf() + +val partitionStr = "p=1" + +case class TableCompressionConf(name: String, codeC: String) + +case class TableDefine(tableName: String, isPartitioned: Boolean, format: String, + compressionConf: Option[TableCompressionConf]) { + def createTable(rootDir: File): Unit = { +val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'") +sql( + s""" + |CREATE TABLE $tableName(a int) + |${if (isPartitioned) "PARTITIONED BY (p int)" else ""} + |STORED AS $format + |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName' + |${if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else ""} +""".stripMargin) + } + + def insertOverwriteTable(): Unit = { +sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |${if (isPartitioned) s"partition ($partitionStr)" else ""} + |SELECT * from table_source +""".stripMargin) + } +} + +def getTableCompressionCodec(path: String, format: String): String = { + val codecs = format match { +case "parquet" => for { + footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf) + block <- footer.getParquetMetadata.getBlocks.asScala + column <- block.getColumns.asScala +} yield column.getCodec.name() +case "orc" => new File(path).listFiles().filter{ file => + file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS" +}.map { orcFile => + OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString +}.toSeq + } + + assert(codecs.distinct.length == 1) + codecs.head +} + +def checkCompressionCodecForTable(format: String, isPartitioned: Boolean, + compressionConf: Option[TableCompressionConf])(assertion: String => Unit): Unit = { + val table = TableDefine(s"tbl_$format${isPartitioned}", +isPartitioned, format, compressionConf) + withTempDir { tmpDir => +withTable(table.tableName) { + table.createTable(tmpDir) + table.insertOverwriteTable() + val partition = if (table.isPartitioned) partitionStr else "" + val path = s"${tmpDir.getPath.stripSuffix("/")}/${table.tableName}/$partition" + assertion(getTableCompressionCodec(path, table.format)) +} + } +} + +def getConvertMetastoreConfName(format: String): String = format match { + case "parquet" => "spark.sql.hive.convertMetastoreParquet" + case "orc" => "spark.sql.hive.convertMetastoreOrc" +} + +def getSparkCompressionConfName(format: String): String = format match { + case "parquet" => "spark.sql.parquet.compression.codec" + case "orc" => "spark.sql.orc.compression.codec" +} + +def checkTableCompressionCodecForCodecs(format: String, isPartitioned: Boolean, + convertMetastore: Boolean, compressionCodecs: List[String], + tableCompressionConf: List[TableCompressionConf]) --- End diff -- Could you update the indents for all of them in this PR? See the link: https://github.com/databricks/scala-style-guide#indent --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19413 LGTM. Merging to master and back to 2.0 unless I hit a conflict.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553648

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.orc.compression.codec' taking effect on hive table writing") {
+
+    val hadoopConf = spark.sessionState.newHadoopConf()
+
+    val partitionStr = "p=1"
+
+    case class TableCompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
--- End diff --

Use a function?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553517

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.orc.compression.codec' taking effect on hive table writing") {
+
+    val hadoopConf = spark.sessionState.newHadoopConf()
+
+    val partitionStr = "p=1"
+
+    case class TableCompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
+      compressionConf: Option[TableCompressionConf]) {
+      def createTable(rootDir: File): Unit = {
+        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
+        sql(
+          s"""
+            |CREATE TABLE $tableName(a int)
+            |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
--- End diff --

Please do not embed it. Just create a parameter above this?
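A sketch of the change being requested here, i.e. hoisting the embedded conditionals out of the interpolated SQL string into named values first; the names below are illustrative and the committed version may differ.

```scala
def createTable(rootDir: File): Unit = {
  // Build the optional clauses up front instead of embedding if-expressions in the SQL text.
  val partitionedByClause = if (isPartitioned) "PARTITIONED BY (p int)" else ""
  val tblPropertiesClause =
    compressionConf.map(cf => s"TBLPROPERTIES('${cf.name}'='${cf.codeC}')").getOrElse("")
  sql(
    s"""
       |CREATE TABLE $tableName(a int)
       |$partitionedByClause
       |STORED AS $format
       |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
       |$tblPropertiesClause
     """.stripMargin)
}
```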
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553387

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }

+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
--- End diff --

Why ORC and Parquet are different?
[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19422 Merged build finished. Test PASSed.
[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19422 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82432/ Test PASSed.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553277

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }

+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
+          case _@x => x
--- End diff --

`case x => x`?
[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19422 **[Test build #82432 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82432/testReport)** for PR 19422 at commit [`b66f5bb`](https://github.com/apache/spark/commit/b66f5bb43622ee85bc0b534e0cfaa6d52179d0e7).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553192

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }

+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
--- End diff --

```Scala
val compressionCodec = getCompressionByPriority(
  fileSinkConf,
  compressionConf = "parquet.compression",
  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
```
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142552754

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }

+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
--- End diff --

Is it case sensitive? Should we convert it to lower case and upper case for string comparison?
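If case ever does vary, the usual idiom is a locale-independent lowercase comparison; a minimal sketch follows (whether Hive actually reports the output format class name in differing case is exactly the open question above).

```scala
import java.util.Locale

// Compare the class name case-insensitively, using the root locale to avoid
// surprises under locales such as Turkish.
val formatName =
  fileSinkConf.tableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT)
if (formatName.endsWith("parquetoutputformat")) {
  // Parquet-specific codec handling
} else if (formatName.endsWith("orcoutputformat")) {
  // ORC-specific codec handling
}
```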
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142552658

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -86,6 +106,14 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       options = Map.empty)
   }

+  private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
+    compressionConf: String, default: String): String = {
--- End diff --

Could you add the description to explain the priority sequences?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142552413

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -86,6 +106,14 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       options = Map.empty)
   }

+  private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
+    compressionConf: String, default: String): String = {
--- End diff --

```Scala
  private def getCompressionByPriority(
      fileSinkConf: FileSinkDesc,
      compressionConf: String,
      default: String): String = {
```
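Putting the two review asks together, the helper might end up looking roughly like the sketch below, with the priority order spelled out in the doc comment. The lookup through the table's properties is an assumption based on the surrounding discussion, not the code that was merged.

```scala
/**
 * Picks the compression codec to use when writing a Hive table.
 * Priority: (1) a codec set explicitly on the table (e.g. via TBLPROPERTIES),
 * (2) otherwise the session-level SQL conf default passed in by the caller.
 */
private def getCompressionByPriority(
    fileSinkConf: FileSinkDesc,
    compressionConf: String,
    default: String): String = {
  Option(fileSinkConf.tableInfo.getProperties.getProperty(compressionConf))
    .getOrElse(default)
}
```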
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142547222 --- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js --- @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ajaxStop($.unblockUI); +$(document).ajaxStart(function () { +$.blockUI({message: 'Loading Tasks Page...'}); +}); + +$.extend( $.fn.dataTable.ext.type.order, { +"file-size-pre": ConvertDurationString, + +"file-size-asc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? -1 : ((a > b) ? 1 : 0)); +}, + +"file-size-desc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? 1 : ((a > b) ? -1 : 0)); +} +} ); + +function createTemplateURI(appId) { +var words = document.baseURI.split('/'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + '/static/stagespage-template.html'; +return baseURI; +} +ind = words.indexOf("history"); +if(ind > 0) { +var baseURI = words.slice(0, ind).join('/') + '/static/stagespage-template.html'; +return baseURI; +} +return location.origin + "/static/stagespage-template.html"; +} + +// This function will only parse the URL under certain formate +// e.g. https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0 +function StageEndPoint(appId) { +var words = document.baseURI.split('/'); +var words2 = document.baseURI.split('?'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var appId = words[ind + 1]; +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +var newBaseURI = words.slice(0, ind + 2).join('/'); +return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + stageId; +} +ind = words.indexOf("history"); +if (ind > 0) { +var appId = words[ind + 1]; +var attemptId = words[ind + 2]; +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +var newBaseURI = words.slice(0, ind).join('/'); +if (isNaN(attemptId) || attemptId == "0") { +return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + stageId; +} else { +return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/stages/" + stageId; +} +} +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +return location.origin + "/api/v1/applications/" + appId + "/stages/" + stageId; +} + +function sortNumber(a,b) { +return a - b; +} + +function quantile(array, percentile) { +index = percentile/100. 
* (array.length-1); +if (Math.floor(index) == index) { + result = array[index]; +} else { +var i = Math.floor(index); +fraction = index - i; +result = array[i]; +} +return result; +} + +$(document).ready(function () { +$.extend($.fn.dataTable.defaults, { --- End diff -- this might also be worth moving to `utils.js` since we set the same defaults on all our pages --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142546728 --- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js --- @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ajaxStop($.unblockUI); +$(document).ajaxStart(function () { +$.blockUI({message: 'Loading Tasks Page...'}); +}); + +$.extend( $.fn.dataTable.ext.type.order, { +"file-size-pre": ConvertDurationString, + +"file-size-asc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? -1 : ((a > b) ? 1 : 0)); +}, + +"file-size-desc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? 1 : ((a > b) ? -1 : 0)); +} +} ); + +function createTemplateURI(appId) { +var words = document.baseURI.split('/'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + '/static/stagespage-template.html'; +return baseURI; +} +ind = words.indexOf("history"); +if(ind > 0) { +var baseURI = words.slice(0, ind).join('/') + '/static/stagespage-template.html'; +return baseURI; +} +return location.origin + "/static/stagespage-template.html"; +} + +// This function will only parse the URL under certain formate +// e.g. https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0 +function StageEndPoint(appId) { +var words = document.baseURI.split('/'); +var words2 = document.baseURI.split('?'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var appId = words[ind + 1]; +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +var newBaseURI = words.slice(0, ind + 2).join('/'); +return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + stageId; +} +ind = words.indexOf("history"); +if (ind > 0) { +var appId = words[ind + 1]; +var attemptId = words[ind + 2]; +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +var newBaseURI = words.slice(0, ind).join('/'); +if (isNaN(attemptId) || attemptId == "0") { +return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + stageId; +} else { +return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/stages/" + stageId; +} +} +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); --- End diff -- iiuc this doesn't account for anchors in the url (`#tasks-section`), I'd look into some url parsing js functions libs, I've seen some good ones before. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142539880

--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala ---
@@ -138,21 +155,61 @@ private[v1] object AllStagesResource {
     }
   }

-  def convertTaskData(uiData: TaskUIData, lastUpdateTime: Option[Long]): TaskData = {
+  private def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
--- End diff --

`currentTime` is never used
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142546939 --- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js --- @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ajaxStop($.unblockUI); +$(document).ajaxStart(function () { +$.blockUI({message: 'Loading Tasks Page...'}); +}); + +$.extend( $.fn.dataTable.ext.type.order, { +"file-size-pre": ConvertDurationString, + +"file-size-asc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? -1 : ((a > b) ? 1 : 0)); +}, + +"file-size-desc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? 1 : ((a > b) ? -1 : 0)); +} +} ); + +function createTemplateURI(appId) { +var words = document.baseURI.split('/'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + '/static/stagespage-template.html'; +return baseURI; +} +ind = words.indexOf("history"); +if(ind > 0) { +var baseURI = words.slice(0, ind).join('/') + '/static/stagespage-template.html'; +return baseURI; +} +return location.origin + "/static/stagespage-template.html"; +} + +// This function will only parse the URL under certain formate +// e.g. https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0 +function StageEndPoint(appId) { +var words = document.baseURI.split('/'); +var words2 = document.baseURI.split('?'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var appId = words[ind + 1]; +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +var newBaseURI = words.slice(0, ind + 2).join('/'); +return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + stageId; +} +ind = words.indexOf("history"); +if (ind > 0) { +var appId = words[ind + 1]; +var attemptId = words[ind + 2]; +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +var newBaseURI = words.slice(0, ind).join('/'); +if (isNaN(attemptId) || attemptId == "0") { +return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + stageId; +} else { +return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/stages/" + stageId; +} +} +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); --- End diff -- Also I'm not quite sure what the `3`s are doing here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142545431 --- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js --- @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ajaxStop($.unblockUI); +$(document).ajaxStart(function () { +$.blockUI({message: 'Loading Tasks Page...'}); +}); + +$.extend( $.fn.dataTable.ext.type.order, { +"file-size-pre": ConvertDurationString, + +"file-size-asc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? -1 : ((a > b) ? 1 : 0)); +}, + +"file-size-desc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? 1 : ((a > b) ? -1 : 0)); +} +} ); + +function createTemplateURI(appId) { +var words = document.baseURI.split('/'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + '/static/stagespage-template.html'; +return baseURI; +} +ind = words.indexOf("history"); +if(ind > 0) { +var baseURI = words.slice(0, ind).join('/') + '/static/stagespage-template.html'; +return baseURI; +} +return location.origin + "/static/stagespage-template.html"; +} + +// This function will only parse the URL under certain formate +// e.g. https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0 +function StageEndPoint(appId) { --- End diff -- style: `stageEndPoint` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142547431 --- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js --- @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ajaxStop($.unblockUI); +$(document).ajaxStart(function () { +$.blockUI({message: 'Loading Tasks Page...'}); +}); + +$.extend( $.fn.dataTable.ext.type.order, { +"file-size-pre": ConvertDurationString, + +"file-size-asc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? -1 : ((a > b) ? 1 : 0)); +}, + +"file-size-desc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? 1 : ((a > b) ? -1 : 0)); +} +} ); + +function createTemplateURI(appId) { +var words = document.baseURI.split('/'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var baseURI = words.slice(0, ind + 1).join('/') + '/' + appId + '/static/stagespage-template.html'; +return baseURI; +} +ind = words.indexOf("history"); +if(ind > 0) { +var baseURI = words.slice(0, ind).join('/') + '/static/stagespage-template.html'; +return baseURI; +} +return location.origin + "/static/stagespage-template.html"; +} + +// This function will only parse the URL under certain formate +// e.g. https://axonitered-jt1.red.ygrid.yahoo.com:50509/history/application_1502220952225_59143/stages/stage/?id=0=0 +function StageEndPoint(appId) { +var words = document.baseURI.split('/'); +var words2 = document.baseURI.split('?'); +var ind = words.indexOf("proxy"); +if (ind > 0) { +var appId = words[ind + 1]; +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +var newBaseURI = words.slice(0, ind + 2).join('/'); +return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + stageId; +} +ind = words.indexOf("history"); +if (ind > 0) { +var appId = words[ind + 1]; +var attemptId = words[ind + 2]; +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +var newBaseURI = words.slice(0, ind).join('/'); +if (isNaN(attemptId) || attemptId == "0") { +return newBaseURI + "/api/v1/applications/" + appId + "/stages/" + stageId; +} else { +return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/stages/" + stageId; +} +} +var stageIdLen = words2[1].indexOf('&'); +var stageId = words2[1].substr(3, stageIdLen - 3); +return location.origin + "/api/v1/applications/" + appId + "/stages/" + stageId; +} + +function sortNumber(a,b) { +return a - b; +} + +function quantile(array, percentile) { +index = percentile/100. 
* (array.length-1); +if (Math.floor(index) == index) { + result = array[index]; +} else { +var i = Math.floor(index); +fraction = index - i; +result = array[i]; +} +return result; +} + +$(document).ready(function () { +$.extend($.fn.dataTable.defaults, { +stateSave: true, +lengthMenu: [[20, 40, 60, 100, -1], [20, 40, 60, 100, "All"]], +pageLength: 20 +}); + +$("#showAdditionalMetrics").append( --- End diff -- If this is just adding static html we can probably leave it in `StagePage.scala` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142543852

--- Diff: core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala ---
@@ -346,7 +346,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       for {
         stageId <- 0 to 1
-        attemptId <- 0 to 1
+        attemptId <- 1 to 0
--- End diff --

Why is this changed?
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142540750

--- Diff: core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala ---
@@ -67,14 +68,13 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
     extends SparkListener {
   val executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
   var executorEvents = new ListBuffer[SparkListenerEvent]()
+  val executorIdToAddress = mutable.HashMap[String, String]()
   private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000)
   private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)

   def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList

-  def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
-
--- End diff --

Why remove these lines? They don't seem to be an issue
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142544701 --- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js --- @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ajaxStop($.unblockUI); +$(document).ajaxStart(function () { +$.blockUI({message: 'Loading Tasks Page...'}); --- End diff -- This is more commonly referred to as the Stage Page than the Tasks Page --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142543153

--- Diff: core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala ---
@@ -170,6 +170,17 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
     execTaskSummary.isBlacklisted = isBlacklisted
   }

+  def getExecutorHost(eid: String): String = {
--- End diff --

Is this how `getExecutorHost` works in other classes? How did you decide to implement it this way?
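For context, one straightforward way such a lookup can be backed is to record the host when the executor registers, which is presumably what the `executorIdToAddress` map added earlier in this diff is for. The sketch below is an assumption about the approach, not a quote of the PR; the fallback string is illustrative.

```scala
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
  // Remember where each executor lives so the task table can show a host
  // without consulting the storage status listener.
  executorIdToAddress(executorAdded.executorId) = executorAdded.executorInfo.executorHost
  // ... existing bookkeeping for executorToTaskSummary etc. ...
}

def getExecutorHost(eid: String): String =
  executorIdToAddress.getOrElse(eid, "Unknown")
```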
[GitHub] spark pull request #19270: [SPARK-21809] : Change Stage Page to use datatabl...
Github user ajbozarth commented on a diff in the pull request: https://github.com/apache/spark/pull/19270#discussion_r142545284 --- Diff: core/src/main/resources/org/apache/spark/ui/static/taskspages.js --- @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +$(document).ajaxStop($.unblockUI); +$(document).ajaxStart(function () { +$.blockUI({message: 'Loading Tasks Page...'}); +}); + +$.extend( $.fn.dataTable.ext.type.order, { +"file-size-pre": ConvertDurationString, + +"file-size-asc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? -1 : ((a > b) ? 1 : 0)); +}, + +"file-size-desc": function ( a, b ) { +a = ConvertDurationString( a ); +b = ConvertDurationString( b ); +return ((a < b) ? 1 : ((a > b) ? -1 : 0)); +} +} ); + +function createTemplateURI(appId) { --- End diff -- This think this function can be abstracted out (with the template file name added as a param) and moved to utils.js --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18732 Merged build finished. Test FAILed.
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18732 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82431/ Test FAILed.
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18732 **[Test build #82431 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82431/testReport)** for PR 18732 at commit [`1ea2b71`](https://github.com/apache/spark/commit/1ea2b71801784842d1797863456a07c7fcfc2531).

* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19404 **[Test build #82434 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82434/testReport)** for PR 19404 at commit [`f945f39`](https://github.com/apache/spark/commit/f945f39438c6a1cabff3f18489dd2082c4cba24c).
[GitHub] spark issue #18098: [SPARK-16944][Mesos] Improve data locality when launchin...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/18098 @ArtRand +1 for the pluggable interface... in general we are planning to optimize resource allocation and locality in another PR, (fenzo from Netflix is something we would like to test which is already used by others like Flink).
[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19413 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82429/ Test PASSed.
[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19413 Merged build finished. Test PASSed.
[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19413 **[Test build #82429 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82429/testReport)** for PR 19413 at commit [`079a4e2`](https://github.com/apache/spark/commit/079a4e26ec900745411fdd60e4cd242b9a43320b).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix
Github user rekhajoshm commented on the issue: https://github.com/apache/spark/pull/19422 @srowen got it, just the pull you mean, didn't think of it then. thanks
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19404 **[Test build #82433 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82433/testReport)** for PR 19404 at commit [`9cd3ee6`](https://github.com/apache/spark/commit/9cd3ee69b9ddd678b77d8b78feec51ea8c55e377).
[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r142536858

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

-  override def output: Seq[Attribute] = left.output ++ right.output
+  override def output: Seq[Attribute] = joinType match {
+    case _: InnerLike => left.output ++ right.output
+    case LeftOuter => left.output ++ right.output.map(_.withNullability(true))
+    case RightOuter => left.output.map(_.withNullability(true)) ++ right.output
+    case _ =>
+      throwBadJoinTypeException()
+      Seq()
+  }

   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+    case LeftOuter => PartitioningCollection(Seq(right.outputPartitioning))
--- End diff --

Resolved offline. LeftOuterJoin should produce left.outputPartitioning
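A sketch of what the agreed resolution implies for these match arms; the merged code may differ cosmetically, for example by reusing the shared `throwBadJoinTypeException()` helper shown above instead of throwing directly.

```scala
override def outputPartitioning: Partitioning = joinType match {
  case _: InnerLike =>
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  // An outer join can only preserve the partitioning of the side whose rows
  // are always emitted unchanged.
  case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
  case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
  case x => throw new IllegalArgumentException(
    s"${getClass.getSimpleName} should not take $x as the JoinType")
}
```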
[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19327 Merged build finished. Test PASSed.
[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19327 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82428/ Test PASSed.
[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r142533513

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala ---
@@ -425,6 +426,10 @@ class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with Befo
     // Test static comparisons
     assert(watermarkFrom("cast(leftTime AS LONG) > 10") === Some(1))
+
+    // Test non-positive results
+    assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 10") === Some(0))
+    assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 100") === Some(-9))
--- End diff --

It does not build a physical plan, it simply uses the SparkSQL QueryExecution to pass the expression through the analyzer and optimizer, to get the conditions in a similar form as a real query would. Instead we can simply use SimpleAnalyzer and SimpleTestOptimizer:

```
val analyzedPlan = SimpleAnalyzer.execute(plan)
val optimizedPlan = SimpleTestOptimizer.execute(analyzedPlan)
```
[GitHub] spark issue #19327: [SPARK-22136][SS] Implement stream-stream outer joins.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19327 **[Test build #82428 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82428/testReport)** for PR 19327 at commit [`b8ba1e2`](https://github.com/apache/spark/commit/b8ba1e2c34748c47d843c147c595bf92df4c41c7).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142533141

--- Diff: python/pyspark/sql/functions.py ---
@@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, vectorized=False):
         self._name = name or (
             func.__name__ if hasattr(func, '__name__') else func.__class__.__name__)
-        self._vectorized = vectorized
+        self.vectorized = vectorized
--- End diff --

I kind of dislike the inconsistency between `UserDefinedFunction` and its wrapped function. I think they are just the same thing except for the wrapped function has doc string. For ease of mind, I think we should make them either both private or public.
[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19422 OK, don't bother with a JIRA for items like this
[GitHub] spark issue #18098: [SPARK-16944][Mesos] Improve data locality when launchin...
Github user ArtRand commented on the issue: https://github.com/apache/spark/pull/18098 Hello @gpang, after thinking about this a lot I'm glad that you ended up merging this. However, I think it’s worth considering the implications of changing the offer evaluation logic in the driver. My main concern with the method you’ve proposed is in a cluster with many frameworks and concurrent Spark jobs (potentially with heterogeneous locality wait times) this solution may not be effective. Further I believe that your algorithm doesn’t account for adding an executor on an agent that already contains an executor (https://github.com/apache/spark/pull/18098/files#diff-387c5d0c916278495fc28420571adf9eR534), which may be what you want in some situations (because there is no other way to increase the cores for an executor already placed on an agent). I realize this is an edge case, and that that the old behavior (essentially random placement) wouldn’t afford better performance. With this in mind I’d like to propose that we make offer evaluation in Spark on Mesos a pluggable interface. For example, right now there is no way to easily spread executors over agents, pack executors on agents, or other context-specific behaviors that a user may want. I think one of Spark’s strengths is its tunability, and we should expose this to users who wish to use it.
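To make the proposal concrete, a purely hypothetical sketch of what such a pluggable offer-evaluation hook could look like; no such trait exists in Spark's Mesos scheduler at this point, and every name below is invented for illustration.

```scala
import org.apache.mesos.Protos.Offer

trait OfferEvaluator {
  /**
   * Given the outstanding resource offers and the number of cores still wanted,
   * decide how many executor cores to launch per offer ID. Implementations could
   * spread executors across agents, pack them onto few agents, or apply
   * locality-aware delay scheduling.
   */
  def evaluateOffers(offers: Seq[Offer], coresNeeded: Int): Map[String, Int]
}
```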
[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r142530623 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -470,3 +475,222 @@ class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with Befo } } } + +class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter { + + import testImplicits._ + import org.apache.spark.sql.functions._ + + before { +SparkSession.setActiveSession(spark) // set this before force initializing 'joinExec' +spark.streams.stateStoreCoordinator // initialize the lazy coordinator + } + + after { +StateStore.stop() + } + + private def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = { +val input = MemoryStream[Int] +val df = input.toDF + .select( +'value as "key", +'value.cast("timestamp") as s"${prefix}Time", +('value * multiplier) as s"${prefix}Value") + .withWatermark(s"${prefix}Time", "10 seconds") + +return (input, df) + } + + private def setupWindowedJoin(joinType: String): + (MemoryStream[Int], MemoryStream[Int], DataFrame) = { +val (input1, df1) = setupStream("left", 2) +val (input2, df2) = setupStream("right", 3) +val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue) +val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue) +val joined = windowed1.join(windowed2, Seq("key", "window"), joinType) + .select('key, $"window.end".cast("long"), 'leftValue, 'rightValue) + +(input1, input2, joined) + } + + test("windowed left outer join") { +val (leftInput, rightInput, joined) = setupWindowedJoin("left_outer") + +testStream(joined)( + // Test inner part of the join. + AddData(leftInput, 1, 2, 3, 4, 5), + AddData(rightInput, 3, 4, 5, 6, 7), + CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)), + // Old state doesn't get dropped until the batch *after* it gets introduced, so the + // nulls won't show up until the next batch after the watermark advances. + AddData(leftInput, 21), + AddData(rightInput, 22), + CheckLastBatch(), + assertNumStateRows(total = 12, updated = 2), + AddData(leftInput, 22), + CheckLastBatch(Row(22, 30, 44, 66), Row(1, 10, 2, null), Row(2, 10, 4, null)), + assertNumStateRows(total = 3, updated = 1) +) + } + + test("windowed right outer join") { +val (leftInput, rightInput, joined) = setupWindowedJoin("right_outer") + +testStream(joined)( + // Test inner part of the join. + AddData(leftInput, 1, 2, 3, 4, 5), + AddData(rightInput, 3, 4, 5, 6, 7), + CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)), + // Old state doesn't get dropped until the batch *after* it gets introduced, so the + // nulls won't show up until the next batch after the watermark advances. 
+ AddData(leftInput, 21), + AddData(rightInput, 22), + CheckLastBatch(), + assertNumStateRows(total = 12, updated = 2), + AddData(leftInput, 22), + CheckLastBatch(Row(22, 30, 44, 66), Row(6, 10, null, 18), Row(7, 10, null, 21)), + assertNumStateRows(total = 3, updated = 1) +) + } + + Seq( +("left_outer", Row(3, null, 5, null)), +("right_outer", Row(null, 2, null, 5)) + ).foreach { case (joinType: String, outerResult) => +test(s"${joinType.replaceAllLiterally("_", " ")} with watermark range condition") { + import org.apache.spark.sql.functions._ + + val leftInput = MemoryStream[(Int, Int)] + val rightInput = MemoryStream[(Int, Int)] + + val df1 = leftInput.toDF.toDF("leftKey", "time") +.select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey * 2) as "leftValue") +.withWatermark("leftTime", "10 seconds") + + val df2 = rightInput.toDF.toDF("rightKey", "time") +.select('rightKey, 'time.cast("timestamp") as "rightTime", ('rightKey * 3) as "rightValue") +.withWatermark("rightTime", "10 seconds") + + val joined = +df1.join( + df2, + expr("leftKey = rightKey AND " + +"leftTime BETWEEN rightTime - interval 5 seconds AND rightTime + interval 5 seconds"), + joinType) + .select('leftKey, 'rightKey, 'leftTime.cast("int"), 'rightTime.cast("int")) + testStream(joined)( +AddData(leftInput, (1, 5),
[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r142530435 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -470,3 +475,222 @@ (quotes the same StreamingOuterJoinSuite hunk shown in full in the first of these three comments; the quoted diff and the review text are truncated in this archive) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r142530086 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -470,3 +475,222 @@ (quotes the same StreamingOuterJoinSuite hunk shown in full in the first of these three comments; the quoted diff and the review text are truncated in this archive) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142529852 --- Diff: python/pyspark/sql/functions.py --- @@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, vectorized=False): self._name = name or ( func.__name__ if hasattr(func, '__name__') else func.__class__.__name__) -self._vectorized = vectorized +self.vectorized = vectorized --- End diff -- Oh sorry, I should have been more clear. This should stay `self._vectorized` since it is a private variable of the class; it's only `wrapped.vectorized` (which you already changed below) that isn't used as private, so it shouldn't have an underscore. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
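In other words, something like the following simplified sketch of the class in question (abbreviated, not the exact pyspark source):

```python
class UserDefinedFunction(object):
    def __init__(self, func, returnType, name=None, vectorized=False):
        self.func = func
        self.returnType = returnType
        # Keep the leading underscore: this attribute is private to the class.
        self._vectorized = vectorized

    def _wrapped(self):
        def wrapper(*args):
            return self.func(*args)
        wrapper.func = self.func
        wrapper.returnType = self.returnType
        # No underscore here: groupby().apply() reads this attribute from the wrapped
        # function, so it is part of the wrapper's public surface.
        wrapper.vectorized = self._vectorized
        return wrapper
```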
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142527524 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1020,10 +1006,14 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin } object CodeGenerator extends Logging { + + // This is the value of HugeMethodLimit in the OpenJDK JVM settings + val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 + /** * Compile the Java source code into a Java class, using Janino. */ - def compile(code: CodeAndComment): GeneratedClass = try { + def compile(code: CodeAndComment): (GeneratedClass, Int) = try { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19422: [SPARK-22193][SQL] Minor typo fix
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19422 **[Test build #82432 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82432/testReport)** for PR 19422 at commit [`b66f5bb`](https://github.com/apache/spark/commit/b66f5bb43622ee85bc0b534e0cfaa6d52179d0e7). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19422: [SPARK-22193][SQL] Minor typo fix
GitHub user rekhajoshm opened a pull request: https://github.com/apache/spark/pull/19422 [SPARK-22193][SQL] Minor typo fix ## What changes were proposed in this pull request? [SPARK-22193][SQL] Minor typo fix ## How was this patch tested? existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/spark SPARK-22193 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19422.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19422 commit e3677c9fa9697e0d34f9df52442085a6a481c9e9 Author: Rekha Joshi Date: 2015-05-05T23:10:08Z Merge pull request #1 from apache/master Pulling functionality from apache spark commit 106fd8eee8f6a6f7c67cfc64f57c1161f76d8f75 Author: Rekha Joshi Date: 2015-05-08T21:49:09Z Merge pull request #2 from apache/master pull latest from apache spark commit 0be142d6becba7c09c6eba0b8ea1efe83d649e8c Author: Rekha Joshi Date: 2015-06-22T00:08:08Z Merge pull request #3 from apache/master Pulling functionality from apache spark commit 6c6ee12fd733e3f9902e10faf92ccb78211245e3 Author: Rekha Joshi Date: 2015-09-17T01:03:09Z Merge pull request #4 from apache/master Pulling functionality from apache spark commit b123c601e459d1ad17511fd91dd304032154882a Author: Rekha Joshi Date: 2015-11-25T18:50:32Z Merge pull request #5 from apache/master pull request from apache/master commit c73c32aadd6066e631956923725a48d98a18777e Author: Rekha Joshi Date: 2016-03-18T19:13:51Z Merge pull request #6 from apache/master pull latest from apache spark commit 7dbf7320057978526635bed09dabc8cf8657a28a Author: Rekha Joshi Date: 2016-04-05T20:26:40Z Merge pull request #8 from apache/master pull latest from apache spark commit 5e9d71827f8e2e4d07027281b80e4e073e7fecd1 Author: Rekha Joshi Date: 2017-05-01T23:00:30Z Merge pull request #9 from apache/master Pull apache spark commit 63d99b3ce5f222d7126133170a373591f0ac67dd Author: Rekha Joshi Date: 2017-09-30T22:26:44Z Merge pull request #10 from apache/master pull latest apache spark commit b66f5bb43622ee85bc0b534e0cfaa6d52179d0e7 Author: rjoshi2 Date: 2017-10-03T21:14:29Z [SPARK-22193][SQL] Minor typo fix --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142524349 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -44,14 +63,22 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex .map { case (attr, i) => attr.withName(s"_$i") }) +val batchSize = conf.arrowMaxRecordsPerBatch + +val batchIter = if (batchSize > 0) { + new BatchIterator(iter, batchSize) +} else { + Iterator(iter) +} --- End diff -- This could be done on one line, not sure if others agree with that style though --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
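i.e., something along these lines (an untested one-liner; whether it is more readable is the style question raised above):

```scala
val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
```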
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142523671 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType */ @InterfaceStability.Stable class RelationalGroupedDataset protected[sql]( -df: DataFrame, +val df: DataFrame, --- End diff -- I don't think this needs to be a `val` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142523354 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- I think we need to think a little more about how we handle different formats of arrow data. Currently, the input of the arrow serializer is a list of (pd.Series, DataType); I feel it's cleaner for this class not to deal with type coercion, just serialization. It could take a `pyarrow.Table` for instance and let the caller construct the `pyarrow.Table`. Another thing to think about is what happens if the data we are passing is not purely `pd.Series` or `pd.DataFrame`. What if, for instance, we want to serialize a (pd.Series, pd.DataFrame) tuple or a tuple of (scalar value, pd.DataFrame)? Maybe making the serializer composable is more flexible, i.e. one class knows how to serialize `pd.Series`, another knows how to serialize `pd.DataFrame`, and if we want to serialize a (pd.Series, pd.DataFrame) tuple we can compose them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
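A rough sketch of what the composable approach could look like on the Python side. The class names below are made up purely for illustration; nothing like this exists in pyspark's ArrowStreamPandasSerializer today.

```python
import pandas as pd
import pyarrow as pa

class SeriesSerializer(object):
    """Knows how to turn a single pandas.Series into an Arrow array and back."""
    def dump(self, series):
        return pa.Array.from_pandas(series)

    def load(self, arrow_array):
        return arrow_array.to_pandas()

class DataFrameSerializer(object):
    """Knows how to turn a pandas.DataFrame into an Arrow table and back."""
    def dump(self, pdf):
        return pa.Table.from_pandas(pdf)

    def load(self, table):
        return table.to_pandas()

class TupleSerializer(object):
    """Composes per-element serializers, e.g. for a (pd.Series, pd.DataFrame) tuple."""
    def __init__(self, *serializers):
        self.serializers = serializers

    def dump(self, values):
        return tuple(s.dump(v) for s, v in zip(self.serializers, values))

    def load(self, payloads):
        return tuple(s.load(p) for s, p in zip(self.serializers, payloads))
```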
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142522405 --- Diff: python/pyspark/worker.py --- @@ -74,17 +75,37 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +import pyarrow as pa + +arrow_return_types = list(to_arrow_type(field.dataType) for field in return_type) --- End diff -- I prefer list comprehensions, but it's up to you --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
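i.e., the list-comprehension form of the same line (the behaviour is identical; this is purely a style preference):

```python
arrow_return_types = [to_arrow_type(field.dataType) for field in return_type]
```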
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142521502 --- Diff: python/pyspark/worker.py --- @@ -74,17 +75,37 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +import pyarrow as pa --- End diff -- import not used --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r14252 --- Diff: python/pyspark/worker.py --- @@ -32,8 +32,9 @@ from pyspark.serializers import write_with_length, write_int, read_long, \ write_long, read_int, SpecialLengths, PythonEvalType, UTF8Deserializer, PickleSerializer, \ BatchedSerializer, ArrowStreamPandasSerializer -from pyspark.sql.types import toArrowType +from pyspark.sql.types import to_arrow_type from pyspark import shuffle +from pyspark.sql.types import StructType --- End diff -- combine this with `to_arrow_type` import --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
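i.e., something like the following single import line (assuming no other names are needed from that module):

```python
from pyspark.sql.types import to_arrow_type, StructType
```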
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142520328 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +foo_udf = pandas_udf( +foo, +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v=df.v + 1) +return ret + +@pandas_udf(StructType([StructField('id', LongType()), StructField('v', DoubleType())])) +def foo(df): --- End diff -- isn't this a redeclaration of `foo`? maybe some of these could be lamdas? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
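On the redeclaration point, one way to avoid shadowing is to keep a single named helper for the expected pandas result and build the udf from a lambda. A rough sketch against the test in the diff (names other than the pyspark/pandas APIs are hypothetical, and the usual tests.py imports are assumed):

```python
def plus_one(pdf):
    # Expected result computed directly with pandas; note the dtype of `v` may
    # differ from the udf output, which is the coercion this test is about.
    return pdf.assign(v=pdf.v + 1)

coerce_udf = pandas_udf(
    lambda pdf: pdf.assign(v=pdf.v + 1),
    StructType([StructField('id', LongType()), StructField('v', DoubleType())]))

result = df.groupby('id').apply(coerce_udf).sort('id').toPandas()
expected = df.toPandas().groupby('id').apply(plus_one).reset_index(drop=True)
```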
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142519825 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +foo_udf = pandas_udf( +foo, +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret --- End diff -- same simplification as above --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142519186 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,133 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret --- End diff -- could you simplify to ``` return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142518730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,28 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private object BatchIterator { + class InnerIterator[T](iter: Iterator[T], batchSize: Int) extends Iterator[T] { --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18732 **[Test build #82431 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82431/testReport)** for PR 18732 at commit [`1ea2b71`](https://github.com/apache/spark/commit/1ea2b71801784842d1797863456a07c7fcfc2531). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142517310 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- What do you all think about making the `ArrowPandasSerializer` also able to serialize pandas.DataFrames? Then it wouldn't require this extra wrapping and I think it could be useful for other things in the future as well. @HyukjinKwon @ueshin @viirya ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
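A minimal sketch of the idea, assuming pyarrow is available on the worker; the helper name `_to_arrow_batch` is invented here and is not part of the actual serializer API:

```python
import pandas as pd
import pyarrow as pa

def _to_arrow_batch(obj, names=None):
    # Accept either a pandas.DataFrame or a list of pandas.Series and produce one
    # Arrow record batch, so groupby().apply() would not need the pd.concat wrapper
    # on the Python side.
    if isinstance(obj, pd.DataFrame):
        return pa.RecordBatch.from_pandas(obj)
    arrays = [pa.Array.from_pandas(s) for s in obj]
    return pa.RecordBatch.from_arrays(arrays, names or [s.name for s in obj])
```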
[GitHub] spark issue #18460: [SPARK-21247][SQL] Type comparision should respect case-...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/18460 Hi, @gatorsmile. Could you review this `case-(in)sensitive type comparison` PR, too? Unfortunately, this PR hasn't received any feedback for the last three months. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19041 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19041 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82427/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19041 **[Test build #82427 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82427/testReport)** for PR 19041 at commit [`95d5eb5`](https://github.com/apache/spark/commit/95d5eb5c792f51903db139972129d24710e2738c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142514594 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,28 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private object BatchIterator { + class InnerIterator[T](iter: Iterator[T], batchSize: Int) extends Iterator[T] { --- End diff -- could you just make `InnerIterator` a nested class in `BatchIterator` instead of here in the companion object? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
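For example, the batching logic could live directly in `BatchIterator`, with the per-batch iterator as an anonymous inner class. This is only a sketch of the suggested restructuring, not necessarily the code that was eventually merged:

```scala
/**
 * Wraps an iterator so it is consumed in batches of at most `batchSize` elements;
 * each call to next() returns an iterator over one batch.
 */
private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int)
  extends Iterator[Iterator[T]] {

  override def hasNext: Boolean = iter.hasNext

  override def next(): Iterator[T] = new Iterator[T] {
    private var count = 0
    override def hasNext: Boolean = iter.hasNext && count < batchSize
    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("End of current batch")
      count += 1
      iter.next()
    }
  }
}
```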
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18732 **[Test build #82430 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82430/testReport)** for PR 18732 at commit [`d37a9e6`](https://github.com/apache/spark/commit/d37a9e6a19e3f2b5bef796ba20cdb5bc46817f62). * This patch **fails Python style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18732 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18732 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82430/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18732 **[Test build #82430 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82430/testReport)** for PR 18732 at commit [`d37a9e6`](https://github.com/apache/spark/commit/d37a9e6a19e3f2b5bef796ba20cdb5bc46817f62). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18704 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18704 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82426/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18704 **[Test build #82426 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82426/testReport)** for PR 18704 at commit [`c16230d`](https://github.com/apache/spark/commit/c16230d34472e0337b87ce858289fec9a1d88ab4). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class PassThroughSuite extends SparkFunSuite ` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19083: [SPARK-21871][SQL] Check actual bytecode size when compi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19083 @maropu Thanks for working on it. LGTM except two minor comments. cc @rednaxelafx @kiszk @viirya @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142503474 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -32,7 +32,7 @@ import org.codehaus.commons.compiler.CompileException import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler} import org.codehaus.janino.util.ClassFile -import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException} --- End diff -- revert it back? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19083: [SPARK-21871][SQL] Check actual bytecode size whe...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19083#discussion_r142503203 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1020,10 +1006,14 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin } object CodeGenerator extends Logging { + + // This is the value of HugeMethodLimit in the OpenJDK JVM settings + val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 + /** * Compile the Java source code into a Java class, using Janino. */ - def compile(code: CodeAndComment): GeneratedClass = try { + def compile(code: CodeAndComment): (GeneratedClass, Int) = try { --- End diff -- Please add `@return` to explain what are returned. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
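For instance, the scaladoc could read something like this. The wording is only a suggestion, and the meaning of the second tuple element (the max bytecode size of the generated methods) is inferred from the surrounding diff:

```scala
/**
 * Compile the Java source code into a Java class, using Janino.
 *
 * @return a pair of the compiled GeneratedClass and the maximum bytecode size among the
 *         generated methods, which callers can compare against the JVM huge-method limit.
 */
```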
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142501134 --- Diff: python/pyspark/sql/functions.py --- @@ -2181,31 +2186,69 @@ def udf(f=None, returnType=StringType()): @since(2.3) def pandas_udf(f=None, returnType=StringType()): """ -Creates a :class:`Column` expression representing a user defined function (UDF) that accepts -`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length. +Creates a :class:`Column` expression representing a vectorized user defined function (UDF). + +The user-defined function can define one of the following transformations: +1. One or more `pandas.Series` -> A `pandas.Series` + + This udf is used with `DataFrame.withColumn` and `DataFrame.select`. + The returnType should be a primitive data type, e.g., DoubleType() + + Example: + + >>> from pyspark.sql.types import IntegerType, StringType + >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) + >>> @pandas_udf(returnType=StringType()) + ... def to_upper(s): + ... return s.str.upper() + ... + >>> @pandas_udf(returnType="integer") + ... def add_one(x): + ... return x + 1 + ... + >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) + >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\ + ... .show() # doctest: +SKIP + +--+--++ + |slen(name)|to_upper(name)|add_one(age)| + +--+--++ + | 8| JOHN DOE| 22| + +--+--++ + +2. A `pandas.DataFrame` -> A `pandas.DataFrame` + + This udf is used with `GroupedData.apply` + The returnType should be a StructType describing the schema of the returned + `pandas.DataFrame`. + + Example: + + >>> df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 4.0)], ("id", "v")) + >>> @pandas_udf(returnType=df.schema) + ... def normalize(df): + ... v = df.v + ... ret = df.assign(v=(v - v.mean()) / v.std()) --- End diff -- Yup, it was an overlook. You are correct. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19405: [SPARK-22178] [SQL] Refresh Persistent Views by R...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19405 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19395: [SPARK-22171] [SQL] Describe Table Extended Failed when ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19395 cc @cloud-fan @jiangxb1987 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18851: [SPARK-21644][SQL] LocalLimit.maxRows is defined ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18851 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19393: [SPARK-21644][SQL] LocalLimit.maxRows is defined ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19393 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19405: [SPARK-22178] [SQL] Refresh Persistent Views by REFRESH ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19405 Thanks! Merged to master/2.2 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19393: [SPARK-21644][SQL] LocalLimit.maxRows is defined incorre...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19393 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142500980 --- Diff: python/pyspark/sql/functions.py --- @@ -2129,8 +2130,12 @@ def _create_udf(f, returnType, vectorized): def _udf(f, returnType=StringType(), vectorized=vectorized): if vectorized: import inspect -if len(inspect.getargspec(f).args) == 0: -raise NotImplementedError("0-parameter pandas_udfs are not currently supported") +argspec = inspect.getargspec(f) +if len(argspec.args) == 0 and argspec.varargs is None: --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142499712 --- Diff: python/pyspark/sql/functions.py --- @@ -2129,8 +2130,12 @@ def _create_udf(f, returnType, vectorized): def _udf(f, returnType=StringType(), vectorized=vectorized): if vectorized: import inspect -if len(inspect.getargspec(f).args) == 0: -raise NotImplementedError("0-parameter pandas_udfs are not currently supported") +argspec = inspect.getargspec(f) +if len(argspec.args) == 0 and argspec.varargs is None: --- End diff -- I think `varargs` are fine. I will add the test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
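For reference, the varargs case being discussed is a pandas_udf whose Python function takes `*args`; a minimal usage sketch mirroring the test in the diff (assuming an existing DataFrame `df` with an `id` column):

```python
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import LongType

# The udf receives each input column as a pandas.Series in *v; this one returns the first.
first_col = pandas_udf(lambda *v: v[0], LongType())
result = df.select(first_col(col('id')))
```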
[GitHub] spark issue #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfiguration thro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19413 **[Test build #82429 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82429/testReport)** for PR 19413 at commit [`079a4e2`](https://github.com/apache/spark/commit/079a4e26ec900745411fdd60e4cd242b9a43320b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19413: [SPARK-20466][CORE] HadoopRDD#addLocalConfigurati...
Github user sahilTakiar commented on a diff in the pull request: https://github.com/apache/spark/pull/19413#discussion_r142499531 --- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala --- @@ -157,20 +157,23 @@ class HadoopRDD[K, V]( if (conf.isInstanceOf[JobConf]) { logDebug("Re-using user-broadcasted JobConf") conf.asInstanceOf[JobConf] - } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { -logDebug("Re-using cached JobConf") -HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { -// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the -// local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). -// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. -// Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456). -HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { - logDebug("Creating new JobConf and caching it for later re-use") - val newJobConf = new JobConf(conf) - initLocalJobConfFuncOpt.foreach(f => f(newJobConf)) - HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - newJobConf +val newJobConf = HadoopRDD.getCachedMetadata(jobConfCacheKey) --- End diff -- Thanks for the tip. Updated the patch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
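For readers following the thread, the shape of the change under discussion is to replace the contains-then-get pair with a single lookup, so a concurrent cache eviction can no longer slip in between the two calls. Roughly, and simplified from the quoted diff rather than the exact merged code:

```scala
// Single lookup instead of containsCachedMetadata() followed by getCachedMetadata(),
// avoiding the race that could produce the NPE reported in SPARK-20466.
val cachedJobConf = HadoopRDD.getCachedMetadata(jobConfCacheKey)
if (cachedJobConf != null) {
  logDebug("Re-using cached JobConf")
  cachedJobConf.asInstanceOf[JobConf]
} else {
  // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
  HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
    logDebug("Creating new JobConf and caching it for later re-use")
    val newJobConf = new JobConf(conf)
    initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
    HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
    newJobConf
  }
}
```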