Re: problem with using mapPartitions
Thank you for your reply, but the typo is not the reason for the problem.
Re: Dataframe's .drop in PySpark doesn't accept Column
Name resolution is not as easy as that, I think. Wenchen can maybe give you some advice on resolution for this one.

On Sat, May 30, 2015 at 9:37 AM, Yijie Shen henry.yijies...@gmail.com wrote: I think just matching the Column's expr as an UnresolvedAttribute and using the UnresolvedAttribute's name to match the schema's field name is enough. There seems to be no need to regard the expr as a more general one. :)

On May 30, 2015 at 11:14:05 PM, Girardot Olivier (o.girar...@lateral-thoughts.com) wrote: JIRA done: https://issues.apache.org/jira/browse/SPARK-7969 I've already started working on it, but it's less trivial than it seems because I don't exactly know the inner workings of the catalog, or how to get the qualified name of a column to match it against the schema/catalog. Regards, Olivier.

On Sat, May 30, 2015 at 09:54, Reynold Xin r...@databricks.com wrote: Yeah, it would be great to support a Column. Can you create a JIRA, and possibly a pull request?

On Fri, May 29, 2015 at 2:45 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Actually, the Scala API too is only based on the column name.

On Fri, May 29, 2015 at 11:23, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, testing 1.4 a bit more, it seems that the .drop() method in PySpark doesn't accept a Column as input:

  .join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)

  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
    jdf = self._jdf.drop(colName)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
    (new_args, temp_args) = self._get_args(args)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
    temp_arg = converter.convert(arg, self.gateway_client)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
    for key in object.keys():
  TypeError: 'Column' object is not callable

It doesn't seem very consistent with the rest of the APIs, and it is especially annoying when executing joins, because drop("my_key") is not a qualified reference to the column. What do you think about changing that? Or what is the best practice as a workaround? Regards, Olivier.
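As one possible workaround for the question above (purely illustrative; the DataFrame and column names are placeholders, not from the thread), the ambiguous drop can be avoided entirely by selecting the columns to keep with qualified references, e.g. in the Scala API:

  // df and onlyTheBest are hypothetical DataFrames; both have a pol_no column.
  val joined = df.join(onlyTheBest, df("pol_no") === onlyTheBest("pol_no"), "inner")
  // joined.drop("pol_no") cannot say which pol_no to remove, so instead keep
  // only the columns we actually want, using qualified df(...) references.
  val result = joined.select(df("pol_no"), onlyTheBest("score"))

This sidesteps name resolution altogether until drop(Column) is supported.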
Re: Using UDFs in Java without registration
We added all the TypeTags for arguments but haven't gotten around to using them yet. I think it'd make sense to have them and do the auto cast, but we can have rules in analysis to forbid certain casts (e.g. don't auto cast double to int).

On Sat, May 30, 2015 at 7:12 AM, Justin Uang justin.u...@gmail.com wrote: The idea of asking for both the argument and return class is interesting. I don't think we do that for the Scala APIs currently, right? In functions.scala, we only use the TypeTag for RT:

  def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
  }

There would only be a small subset of conversions that would make sense implicitly (e.g. int to double, the typical conversions in programming languages), but something like (double => int) might be dangerous and (timestamp => double) wouldn't really make sense. Perhaps it's better to be explicit about casts? If we don't care about declaring the types of the arguments, perhaps we can have all of the Java UDF interfaces (UDF1, UDF2, etc.) extend a generic interface called UDF, then have def define(f: UDF, returnType: Class[_]) to simplify the APIs.

On Sat, May 30, 2015 at 3:43 AM Reynold Xin r...@databricks.com wrote: I think you are right that there is no way to call a Java UDF without registration right now. Adding another 20 methods to functions would be scary. Maybe the best way is to have a companion object for UserDefinedFunction, and define UDFs there? e.g.

  object UserDefinedFunction {
    def define(f: org.apache.spark.api.java.function.Function0, returnType: Class[_]): UserDefinedFunction
    // ... define a few more - maybe up to 5 arguments?
  }

Ideally, we should ask for both the argument class and the return class, so we can do the proper type conversion (e.g. if the UDF expects a string, but the input expression is an int, Catalyst can automatically add a cast). However, we haven't implemented those in UserDefinedFunction yet.

On Fri, May 29, 2015 at 12:54 PM, Justin Uang justin.u...@gmail.com wrote: I would like to define a UDF in Java via a closure and then use it without registration. In Scala, I believe there are two ways to do this:

  myUdf = functions.udf({ _ + 5 })
  myDf.select(myUdf(myDf("age")))

or

  myDf.select(functions.callUDF({ _ + 5 }, DataTypes.IntegerType, myDf("age")))

However, neither of these works for a Java UDF. The first one requires TypeTags. For the second one, I was able to hack it by creating a Scala AbstractFunction1 and using callUDF, which requires declaring the Catalyst DataType instead of using TypeTags. However, it was still nasty because I had to return a Scala map instead of a Java map. Is there first-class support for creating an org.apache.spark.sql.UserDefinedFunction that works with the org.apache.spark.sql.api.java.UDF1<T1, R>? I'm fine with having to declare the Catalyst type when creating it. If it doesn't exist, I would be happy to work on it =) Justin
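For completeness, a minimal sketch of the workaround Justin describes (adapting the Java UDF to a Scala Function1 and declaring the Catalyst return type by hand), assuming the callUDF(f, returnType, column) overloads available in 1.3/1.4; the DataFrame df and the "age" column are placeholders, and this is an illustration of the current state, not a proposed API:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.api.java.UDF1
  import org.apache.spark.sql.functions.callUDF
  import org.apache.spark.sql.types.DataTypes

  // A Java-style UDF written against the java API interface.
  val plusFive = new UDF1[Int, Int] {
    override def call(x: Int): Int = x + 5
  }

  // Wrap it in a Scala function that callUDF accepts, and declare the
  // Catalyst return type explicitly since there are no TypeTags here.
  def withPlusFive(df: DataFrame): DataFrame =
    df.select(callUDF((x: Int) => plusFive.call(x), DataTypes.IntegerType, df("age")))

A dedicated define(...) entry point, as proposed above, would remove the need for this manual wrapping.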
Re: problem with using mapPartitions
bq. val result = fDB.mappartitions(testMP).collect

Not sure if you pasted the above code as-is, but there is a typo: the method name should be mapPartitions. Cheers

On Sat, May 30, 2015 at 9:44 AM, unioah uni...@gmail.com wrote: Hi, I am trying to aggregate the values in each partition internally. For example:

  Before:
    worker 1: 1, 2, 1
    worker 2: 2, 1, 2
  After:
    worker 1: (1 -> 2), (2 -> 1)
    worker 2: (1 -> 1), (2 -> 2)

I tried to use mapPartitions:

  object MyTest {
    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("This is a test")
      val sc = new SparkContext(conf)
      val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3)
      val result = fDB.mappartitions(testMP).collect
      println(result.mkString)
      sc.stop
    }

    def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = {
      var result = new LongMap[Int]()
      var cur = 0l
      while (iter.hasNext) {
        cur = iter.next.toLong
        if (result.contains(cur)) {
          result(cur) += 1
        } else {
          result += (cur, 1)
        }
      }
      result.toList.iterator
    }
  }

But I got the following error message no matter what I tried:

  Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  15/05/30 10:41:21 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 1

Can anybody help me? Thx
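For readers hitting the same wall, here is a cleaned-up sketch of the per-partition count the example is aiming for, with mapPartitions spelled correctly, string literals restored, and a plain mutable.HashMap so it compiles on both Scala 2.10 and 2.11 (submit with spark-submit; the master comes from the submit configuration). As it turns out later in this thread, the runtime failure itself came from a Scala-version mismatch in the spark-core dependency, not from this logic.

  import org.apache.spark.{SparkConf, SparkContext}
  import scala.collection.mutable

  object MyTest {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("This is a test")
      val sc = new SparkContext(conf)
      val fDB = sc.parallelize(List(1, 2, 1, 2, 1, 2, 5, 5, 2), 3)
      // Count values inside each partition without shuffling.
      val result = fDB.mapPartitions(testMP).collect()
      println(result.mkString(" "))
      sc.stop()
    }

    // Returns (value, count) pairs for a single partition's iterator.
    def testMP(iter: Iterator[Int]): Iterator[(Long, Int)] = {
      val counts = mutable.HashMap.empty[Long, Int]
      for (v <- iter) {
        val key = v.toLong
        counts(key) = counts.getOrElse(key, 0) + 1
      }
      counts.iterator
    }
  }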
Re: Catalyst: Reusing already computed expressions within a projection
I think you are looking for http://en.wikipedia.org/wiki/Common_subexpression_elimination in the optimizer. One thing to note is that as we do more and more optimizations like this, the optimization time might increase. Do you see a case where this can bring you substantial performance gains?

On Sat, May 30, 2015 at 9:02 AM, Justin Uang justin.u...@gmail.com wrote: On second thought, perhaps this can be done by writing a rule that builds the dag of dependencies between expressions, then converts it into several layers of projections, where each new layer is allowed to depend on expression results from previous projections? Are there any pitfalls to this approach?

On Sat, May 30, 2015 at 11:30 AM Justin Uang justin.u...@gmail.com wrote: If I do the following

  df2 = df.withColumn('y', df['x'] * 7)
  df3 = df2.withColumn('z', df2.y * 3)
  df3.explain()

then the result is

  Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS y#64,((x#59 * 7.0) AS y#64 * 3.0) AS z#65]
   PhysicalRDD [date#56,id#57,timestamp#58,x#59], MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163

Effectively I want to compute

  y = f(x)
  z = g(y)

The Catalyst optimizer realizes that y#64 is the same as the one previously computed; however, when building the projection, it ignores the fact that it has already computed y, so it calculates `x * 7` twice:

  y = x * 7
  z = x * 7 * 3

If I wanted to make this fix, would it be possible to do the logic in the optimizer phase? I imagine that it's difficult because the expressions in InterpretedMutableProjection don't have access to the previous expression results, only the input row, and the design doesn't seem to be catered for this.
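To make the layered-projection idea concrete, here is a small, self-contained toy (deliberately not Catalyst code; all class names are invented for illustration, REPL-style) that finds subexpressions used more than once in a project list and splits them into an inner layer computed once plus an outer layer that refers to them by name:

  // Toy expression tree -- NOT Catalyst's; just enough to show the idea.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(v: Double) extends Expr
  case class Mul(l: Expr, r: Expr) extends Expr
  case class Alias(child: Expr, name: String) extends Expr

  // All compound subtrees of an expression, including the expression itself.
  def subExprs(e: Expr): Seq[Expr] = e match {
    case Mul(l, r)   => e +: (subExprs(l) ++ subExprs(r))
    case Alias(c, _) => subExprs(c)
    case _           => Seq.empty
  }

  // Split a project list into (inner layer, outer layer): subtrees used more
  // than once are computed once below and referenced by name above.
  def splitLayers(projectList: Seq[Alias]): (Seq[Alias], Seq[Alias]) = {
    val shared = projectList.flatMap(a => subExprs(a.child))
      .groupBy(identity).collect { case (e, occ) if occ.size > 1 => e }.toSeq
    val inner = shared.zipWithIndex.map { case (e, i) => Alias(e, s"_shared$i") }
    def rewrite(e: Expr): Expr =
      inner.find(_.child == e).map(a => Attr(a.name): Expr).getOrElse(e match {
        case Mul(l, r) => Mul(rewrite(l), rewrite(r))
        case other     => other
      })
    (inner, projectList.map(a => Alias(rewrite(a.child), a.name)))
  }

  // y = x * 7, z = (x * 7) * 3  ==>  the inner layer computes _shared0 = x * 7 once.
  val (sharedLayer, outerLayer) = splitLayers(Seq(
    Alias(Mul(Attr("x"), Lit(7.0)), "y"),
    Alias(Mul(Mul(Attr("x"), Lit(7.0)), Lit(3.0)), "z")))

A real rule would also have to carry the original input attributes through the inner layer and cope with things like non-deterministic expressions, which may be part of why this fits better in the code-generation phase, as noted later in this thread.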
Re: problem with using mapPartitions
I solved the problem. It was caused by using the spark-core_2.11 Maven artifact. When I compiled against spark-core_2.10, the problem no longer showed up.
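For anyone else hitting this, the underlying issue is a Scala binary-version mismatch between the application and the spark-core artifact. A minimal build.sbt illustration (versions are examples only) that keeps the two in sync:

  // Let %% pick the artifact matching scalaVersion, instead of hard-coding
  // spark-core_2.10 or spark-core_2.11 by hand.
  scalaVersion := "2.10.4"
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"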
Re: [VOTE] Release Apache Spark 1.4.0 (RC3)
+1 (non-binding, of course)

1. Compiled on OS X 10.10 (Yosemite) OK. Total time: 17:07 min
   mvn clean package -Pyarn -Dyarn.version=2.6.0 -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests
2. Tested pyspark, MLlib - running as well as comparing results with 1.3.1
   2.1. statistics (min, max, mean, Pearson, Spearman) OK
   2.2. Linear/Ridge/Lasso Regression OK
   2.3. Decision Tree, Naive Bayes OK
   2.4. KMeans OK; Center And Scale OK
   2.5. RDD operations OK (State of the Union texts - MapReduce, Filter, sortByKey (word count))
   2.6. Recommendation (MovieLens medium dataset, ~1M ratings) OK; model evaluation/optimization (rank, numIter, lambda) with itertools OK
3. Scala - MLlib
   3.1. statistics (min, max, mean, Pearson, Spearman) OK
   3.2. LinearRegressionWithSGD OK
   3.3. Decision Tree OK
   3.4. KMeans OK
   3.5. Recommendation (MovieLens medium dataset, ~1M ratings) OK
   3.6. saveAsParquetFile OK
   3.7. Read and verify the 3.6 save (above) - sqlContext.parquetFile, registerTempTable, sql OK
   3.8. result = sqlContext.sql("SELECT OrderDetails.OrderID, ShipCountry, UnitPrice, Qty, Discount FROM Orders INNER JOIN OrderDetails ON Orders.OrderID = OrderDetails.OrderID") OK
4. Spark SQL from Python OK
   4.1. result = sqlContext.sql("SELECT * from people WHERE State = 'WA'") OK

Cheers
k/

On Fri, May 29, 2015 at 4:40 PM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.4.0!

The tag to be voted on is v1.4.0-rc3 (commit dd109a8):
https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=dd109a8746ec07c7c83995890fc2c0cd7a693730

The release files, including signatures, digests, etc. can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-bin/

Release artifacts are signed with the following key:
https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at:
[published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-1109/
[published as version: 1.4.0-rc3] https://repository.apache.org/content/repositories/orgapachespark-1110/

The documentation corresponding to this release can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-docs/

Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Tuesday, June 02, at 00:32 UTC and passes if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.4.0
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see http://spark.apache.org/

== What has changed since RC1 ==
Below is a list of bug fixes that went into this RC: http://s.apache.org/vN

== How can I help test this release? ==
If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload, running it on this release candidate, and reporting any regressions.

== What justifies a -1 vote for this release? ==
This vote is happening towards the end of the 1.4 QA period, so -1 votes should only occur for significant regressions from 1.3.1. Bugs already present in 1.3.x, minor regressions, or bugs related to new features will not block this release.
Re: Catalyst: Reusing already computed expressions within a projection
I think this is likely something that we'll want to do during the code generation phase, though it's probably not the lowest-hanging fruit at this point.

On Sun, May 31, 2015 at 5:02 AM, Reynold Xin r...@databricks.com wrote: I think you are looking for http://en.wikipedia.org/wiki/Common_subexpression_elimination in the optimizer. One thing to note is that as we do more and more optimizations like this, the optimization time might increase. Do you see a case where this can bring you substantial performance gains?

On Sat, May 30, 2015 at 9:02 AM, Justin Uang justin.u...@gmail.com wrote: On second thought, perhaps this can be done by writing a rule that builds the dag of dependencies between expressions, then converts it into several layers of projections, where each new layer is allowed to depend on expression results from previous projections? Are there any pitfalls to this approach?

On Sat, May 30, 2015 at 11:30 AM Justin Uang justin.u...@gmail.com wrote: If I do the following

  df2 = df.withColumn('y', df['x'] * 7)
  df3 = df2.withColumn('z', df2.y * 3)
  df3.explain()

then the result is

  Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS y#64,((x#59 * 7.0) AS y#64 * 3.0) AS z#65]
   PhysicalRDD [date#56,id#57,timestamp#58,x#59], MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163

Effectively I want to compute

  y = f(x)
  z = g(y)

The Catalyst optimizer realizes that y#64 is the same as the one previously computed; however, when building the projection, it ignores the fact that it has already computed y, so it calculates `x * 7` twice:

  y = x * 7
  z = x * 7 * 3

If I wanted to make this fix, would it be possible to do the logic in the optimizer phase? I imagine that it's difficult because the expressions in InterpretedMutableProjection don't have access to the previous expression results, only the input row, and the design doesn't seem to be catered for this.
Re: Using UDFs in Java without registration
I think you are right that there is no way to call a Java UDF without registration right now. Adding another 20 methods to functions would be scary. Maybe the best way is to have a companion object for UserDefinedFunction, and define UDFs there? e.g.

  object UserDefinedFunction {
    def define(f: org.apache.spark.api.java.function.Function0, returnType: Class[_]): UserDefinedFunction
    // ... define a few more - maybe up to 5 arguments?
  }

Ideally, we should ask for both the argument class and the return class, so we can do the proper type conversion (e.g. if the UDF expects a string, but the input expression is an int, Catalyst can automatically add a cast). However, we haven't implemented those in UserDefinedFunction yet.

On Fri, May 29, 2015 at 12:54 PM, Justin Uang justin.u...@gmail.com wrote: I would like to define a UDF in Java via a closure and then use it without registration. In Scala, I believe there are two ways to do this:

  myUdf = functions.udf({ _ + 5 })
  myDf.select(myUdf(myDf("age")))

or

  myDf.select(functions.callUDF({ _ + 5 }, DataTypes.IntegerType, myDf("age")))

However, neither of these works for a Java UDF. The first one requires TypeTags. For the second one, I was able to hack it by creating a Scala AbstractFunction1 and using callUDF, which requires declaring the Catalyst DataType instead of using TypeTags. However, it was still nasty because I had to return a Scala map instead of a Java map. Is there first-class support for creating an org.apache.spark.sql.UserDefinedFunction that works with the org.apache.spark.sql.api.java.UDF1<T1, R>? I'm fine with having to declare the Catalyst type when creating it. If it doesn't exist, I would be happy to work on it =) Justin
Re: StreamingContextSuite fails with NoSuchMethodError
Was it a clean compilation? TD

On Fri, May 29, 2015 at 10:48 PM, Ted Yu yuzhih...@gmail.com wrote: Hi, I ran the following command on 1.4.0 RC3:

  mvn -Phadoop-2.4 -Dhadoop.version=2.7.0 -Pyarn -Phive package

I saw the following failure:

  StreamingContextSuite:
  - from no conf constructor
  - from no conf + spark home
  - from no conf + spark home + env
  - from conf with settings
  - from existing SparkContext
  - from existing SparkContext with settings
  *** RUN ABORTED ***
  java.lang.NoSuchMethodError: org.apache.spark.ui.JettyUtils$.createStaticHandler(Ljava/lang/String;Ljava/lang/String;)Lorg/eclipse/jetty/servlet/ServletContextHandler;
  at org.apache.spark.streaming.ui.StreamingTab.attach(StreamingTab.scala:49)
  at org.apache.spark.streaming.StreamingContext$$anonfun$start$2.apply(StreamingContext.scala:585)
  at org.apache.spark.streaming.StreamingContext$$anonfun$start$2.apply(StreamingContext.scala:585)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:585)
  at org.apache.spark.streaming.StreamingContextSuite$$anonfun$8.apply$mcV$sp(StreamingContextSuite.scala:101)
  at org.apache.spark.streaming.StreamingContextSuite$$anonfun$8.apply(StreamingContextSuite.scala:96)
  at org.apache.spark.streaming.StreamingContextSuite$$anonfun$8.apply(StreamingContextSuite.scala:96)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)

Did anyone else encounter a similar error? Cheers
Re: Dataframe's .drop in PySpark doesn't accept Column
Yeah, it would be great to support a Column. Can you create a JIRA, and possibly a pull request?

On Fri, May 29, 2015 at 2:45 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Actually, the Scala API too is only based on the column name.

On Fri, May 29, 2015 at 11:23, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, testing 1.4 a bit more, it seems that the .drop() method in PySpark doesn't accept a Column as input:

  .join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)

  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
    jdf = self._jdf.drop(colName)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
    (new_args, temp_args) = self._get_args(args)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
    temp_arg = converter.convert(arg, self.gateway_client)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
    for key in object.keys():
  TypeError: 'Column' object is not callable

It doesn't seem very consistent with the rest of the APIs, and it is especially annoying when executing joins, because drop("my_key") is not a qualified reference to the column. What do you think about changing that? Or what is the best practice as a workaround? Regards, Olivier.
Re: StreamingContextSuite fails with NoSuchMethodError
I downloaded the source tarball and first ran a command similar to the one below with "clean package -DskipTests". Then I ran the command below. FYI

On May 30, 2015, at 12:42 AM, Tathagata Das t...@databricks.com wrote: Was it a clean compilation? TD

On Fri, May 29, 2015 at 10:48 PM, Ted Yu yuzhih...@gmail.com wrote: Hi, I ran the following command on 1.4.0 RC3:

  mvn -Phadoop-2.4 -Dhadoop.version=2.7.0 -Pyarn -Phive package

I saw the following failure:

  StreamingContextSuite:
  - from no conf constructor
  - from no conf + spark home
  - from no conf + spark home + env
  - from conf with settings
  - from existing SparkContext
  - from existing SparkContext with settings
  *** RUN ABORTED ***
  java.lang.NoSuchMethodError: org.apache.spark.ui.JettyUtils$.createStaticHandler(Ljava/lang/String;Ljava/lang/String;)Lorg/eclipse/jetty/servlet/ServletContextHandler;
  at org.apache.spark.streaming.ui.StreamingTab.attach(StreamingTab.scala:49)
  at org.apache.spark.streaming.StreamingContext$$anonfun$start$2.apply(StreamingContext.scala:585)
  at org.apache.spark.streaming.StreamingContext$$anonfun$start$2.apply(StreamingContext.scala:585)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:585)
  at org.apache.spark.streaming.StreamingContextSuite$$anonfun$8.apply$mcV$sp(StreamingContextSuite.scala:101)
  at org.apache.spark.streaming.StreamingContextSuite$$anonfun$8.apply(StreamingContextSuite.scala:96)
  at org.apache.spark.streaming.StreamingContextSuite$$anonfun$8.apply(StreamingContextSuite.scala:96)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)

Did anyone else encounter a similar error? Cheers
Sidebar: issues targeted for 1.4.0
No 1.4.0 Blockers at this point, which is great. Forking this thread to discuss something else.

There are 92 issues targeted for 1.4.0, 28 of which are marked Critical. Many are procedural issues like "update docs for 1.4" or "check X for 1.4". Are these resolved? They sound like things that were definitely supposed to have finished by now. Certainly, almost all of these are not going to be resolved for 1.4.0. Is this something we should be concerned about? I ask because they are predominantly filed by or assigned to committers, not inexperienced contributors. I'm concerned that Target Version loses meaning if this happens frequently, and this number is roughly 10% of all JIRAs for 1.4. It's tempting to say "X is important and someone will do X before 1.4", and then forget about it since it has been safely noted for later. Meanwhile, other issues grab more immediate attention and get worked on. This constitutes a form of project management, but de facto it's ad hoc and reactive. Look at how many new issues and changes have still been coming in since the first release candidate of 1.4.0, compared to those targeted for the release. In an ideal world, Target Version really is what's going to go in as far as anyone knows, and when new stuff comes up, we all have to figure out what gets dropped to fit by the release date. Boring, standard software project management practice. I don't know how realistic that is, but I'm wondering how the people who have filed these JIRAs feel about this. Concretely, should non-Critical issues for 1.4.0 be un-Targeted? Should they all be un-Targeted after the release?

On Fri, May 29, 2015 at 7:40 PM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.4.0!

The tag to be voted on is v1.4.0-rc3 (commit dd109a8):
https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=dd109a8746ec07c7c83995890fc2c0cd7a693730

The release files, including signatures, digests, etc. can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-bin/

Release artifacts are signed with the following key:
https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at:
[published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-1109/
[published as version: 1.4.0-rc3] https://repository.apache.org/content/repositories/orgapachespark-1110/

The documentation corresponding to this release can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-docs/

Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Tuesday, June 02, at 00:32 UTC and passes if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 1.4.0
[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see http://spark.apache.org/

== What has changed since RC1 ==
Below is a list of bug fixes that went into this RC: http://s.apache.org/vN

== How can I help test this release? ==
If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload, running it on this release candidate, and reporting any regressions.

== What justifies a -1 vote for this release? ==
This vote is happening towards the end of the 1.4 QA period, so -1 votes should only occur for significant regressions from 1.3.1. Bugs already present in 1.3.x, minor regressions, or bugs related to new features will not block this release.
Re: Using UDFs in Java without registration
The idea of asking for both the argument and return class is interesting. I don't think we do that for the Scala APIs currently, right? In functions.scala, we only use the TypeTag for RT:

  def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType)
  }

There would only be a small subset of conversions that would make sense implicitly (e.g. int to double, the typical conversions in programming languages), but something like (double => int) might be dangerous and (timestamp => double) wouldn't really make sense. Perhaps it's better to be explicit about casts? If we don't care about declaring the types of the arguments, perhaps we can have all of the Java UDF interfaces (UDF1, UDF2, etc.) extend a generic interface called UDF, then have def define(f: UDF, returnType: Class[_]) to simplify the APIs.

On Sat, May 30, 2015 at 3:43 AM Reynold Xin r...@databricks.com wrote: I think you are right that there is no way to call a Java UDF without registration right now. Adding another 20 methods to functions would be scary. Maybe the best way is to have a companion object for UserDefinedFunction, and define UDFs there? e.g.

  object UserDefinedFunction {
    def define(f: org.apache.spark.api.java.function.Function0, returnType: Class[_]): UserDefinedFunction
    // ... define a few more - maybe up to 5 arguments?
  }

Ideally, we should ask for both the argument class and the return class, so we can do the proper type conversion (e.g. if the UDF expects a string, but the input expression is an int, Catalyst can automatically add a cast). However, we haven't implemented those in UserDefinedFunction yet.

On Fri, May 29, 2015 at 12:54 PM, Justin Uang justin.u...@gmail.com wrote: I would like to define a UDF in Java via a closure and then use it without registration. In Scala, I believe there are two ways to do this:

  myUdf = functions.udf({ _ + 5 })
  myDf.select(myUdf(myDf("age")))

or

  myDf.select(functions.callUDF({ _ + 5 }, DataTypes.IntegerType, myDf("age")))

However, neither of these works for a Java UDF. The first one requires TypeTags. For the second one, I was able to hack it by creating a Scala AbstractFunction1 and using callUDF, which requires declaring the Catalyst DataType instead of using TypeTags. However, it was still nasty because I had to return a Scala map instead of a Java map. Is there first-class support for creating an org.apache.spark.sql.UserDefinedFunction that works with the org.apache.spark.sql.api.java.UDF1<T1, R>? I'm fine with having to declare the Catalyst type when creating it. If it doesn't exist, I would be happy to work on it =) Justin
Re: Dataframe's .drop in PySpark doesn't accept Column
JIRA done: https://issues.apache.org/jira/browse/SPARK-7969 I've already started working on it, but it's less trivial than it seems because I don't exactly know the inner workings of the catalog, or how to get the qualified name of a column to match it against the schema/catalog. Regards, Olivier.

On Sat, May 30, 2015 at 09:54, Reynold Xin r...@databricks.com wrote: Yeah, it would be great to support a Column. Can you create a JIRA, and possibly a pull request?

On Fri, May 29, 2015 at 2:45 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Actually, the Scala API too is only based on the column name.

On Fri, May 29, 2015 at 11:23, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, testing 1.4 a bit more, it seems that the .drop() method in PySpark doesn't accept a Column as input:

  .join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)

  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
    jdf = self._jdf.drop(colName)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
    (new_args, temp_args) = self._get_args(args)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
    temp_arg = converter.convert(arg, self.gateway_client)
  File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
    for key in object.keys():
  TypeError: 'Column' object is not callable

It doesn't seem very consistent with the rest of the APIs, and it is especially annoying when executing joins, because drop("my_key") is not a qualified reference to the column. What do you think about changing that? Or what is the best practice as a workaround? Regards, Olivier.
Catalyst: Reusing already computed expressions within a projection
If I do the following

  df2 = df.withColumn('y', df['x'] * 7)
  df3 = df2.withColumn('z', df2.y * 3)
  df3.explain()

then the result is

  Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS y#64,((x#59 * 7.0) AS y#64 * 3.0) AS z#65]
   PhysicalRDD [date#56,id#57,timestamp#58,x#59], MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163

Effectively I want to compute

  y = f(x)
  z = g(y)

The Catalyst optimizer realizes that y#64 is the same as the one previously computed; however, when building the projection, it ignores the fact that it has already computed y, so it calculates `x * 7` twice:

  y = x * 7
  z = x * 7 * 3

If I wanted to make this fix, would it be possible to do the logic in the optimizer phase? I imagine that it's difficult because the expressions in InterpretedMutableProjection don't have access to the previous expression results, only the input row, and the design doesn't seem to be catered for this.
Re: Catalyst: Reusing already computed expressions within a projection
On second thought, perhaps this can be done by writing a rule that builds the dag of dependencies between expressions, then converts it into several layers of projections, where each new layer is allowed to depend on expression results from previous projections? Are there any pitfalls to this approach?

On Sat, May 30, 2015 at 11:30 AM Justin Uang justin.u...@gmail.com wrote: If I do the following

  df2 = df.withColumn('y', df['x'] * 7)
  df3 = df2.withColumn('z', df2.y * 3)
  df3.explain()

then the result is

  Project [date#56,id#57,timestamp#58,x#59,(x#59 * 7.0) AS y#64,((x#59 * 7.0) AS y#64 * 3.0) AS z#65]
   PhysicalRDD [date#56,id#57,timestamp#58,x#59], MapPartitionsRDD[125] at mapPartitions at SQLContext.scala:1163

Effectively I want to compute

  y = f(x)
  z = g(y)

The Catalyst optimizer realizes that y#64 is the same as the one previously computed; however, when building the projection, it ignores the fact that it has already computed y, so it calculates `x * 7` twice:

  y = x * 7
  z = x * 7 * 3

If I wanted to make this fix, would it be possible to do the logic in the optimizer phase? I imagine that it's difficult because the expressions in InterpretedMutableProjection don't have access to the previous expression results, only the input row, and the design doesn't seem to be catered for this.