Re: Spark 2.4.3 source download is a dead link
Well it used to work, but I can't say for sure when it failed (I don't use this link that often :) ). It works now, thanks! On Tue, Jun 18, 2019 at 3:29 PM, Sean Owen wrote: > Huh, I don't know how long that's been a bug, but the JS that creates > the filename with .replace doesn't seem to have ever worked? > https://github.com/apache/spark-website/pull/207 > > On Tue, Jun 18, 2019 at 4:07 AM Olivier Girardot > wrote: > > > > Hi everyone, > > FYI the Spark source download link on spark.apache.org is dead: > > > https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-sources.tgz > > > > Regards, > > > > -- > > Olivier Girardot > -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Spark 2.4.3 source download is a dead link
Hi everyone, FYI the Spark source download link on spark.apache.org is dead: https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-sources.tgz Regards, -- *Olivier Girardot*
Re: [External Sender] Re: Spark 2.4.1 on Kubernetes - DNS resolution of driver fails
Hi Prudhvi, not really, but we took a drastic approach to mitigate this, modifying the bundled launch script to be more resilient. In kubernetes/dockerfiles/spark/entrypoint.sh, in the executor case, we added something like this:

    executor)
      DRIVER_HOST=$(echo $SPARK_DRIVER_URL | cut -d "@" -f 2 | cut -d ":" -f 1)
      DRIVER_PORT=$(echo $SPARK_DRIVER_URL | cut -d "@" -f 2 | cut -d ":" -f 2)

      for i in $(seq 1 20); do
        nc -zvw1 $DRIVER_HOST $DRIVER_PORT
        status=$?
        if [ $status -eq 0 ]
        then
          echo "Driver is accessible, let's rock'n'roll."
          break
        else
          echo "Driver not accessible :-| napping for a while..."
          sleep 3
        fi
      done

      CMD=(
        ${JAVA_HOME}/bin/java

That way the executor will not start before the driver is really connectable. That's kind of a hack, but we did not experience the issue anymore, so I guess I'll keep it for now. Regards, Olivier. On Tue, Jun 11, 2019 at 6:23 PM, Prudhvi Chennuru (CONT) < prudhvi.chenn...@capitalone.com> wrote: > Hey Oliver, > > I am also facing the same issue on my kubernetes > cluster (v1.11.5) on AWS with Spark version 2.3.3, any luck in figuring out > the root cause? > > On Fri, May 3, 2019 at 5:37 AM Olivier Girardot < > o.girar...@lateral-thoughts.com> wrote: > >> Hi, >> I did not try on another vendor, so I can't say if it's only related to >> GKE, and no, I did not notice anything on the kubelet or kube-dns >> processes... >> >> Regards >> >> On Fri, May 3, 2019 at 3:05 AM, Li Gao wrote: >> >>> hi Olivier, >>> >>> This seems a GKE-specific issue? Have you tried on other vendors? Also >>> on the kubelet nodes did you notice any pressure on the DNS side? >>> >>> Li >>> >>> >>> On Mon, Apr 29, 2019, 5:43 AM Olivier Girardot < >>> o.girar...@lateral-thoughts.com> wrote: >>> >>>> Hi everyone, >>>> I have ~300 Spark jobs on Kubernetes (GKE) using the cluster >>>> auto-scaler, and sometimes while running these jobs a pretty bad thing >>>> happens: the driver (in cluster mode) gets scheduled on Kubernetes and >>>> launches many executor pods. >>>> So far so good, but the k8s "Service" associated with the driver does not >>>> seem to be propagated in terms of DNS resolution, so all the executors fail >>>> with a "spark-application-..cluster.svc.local" does not exist error. >>>> >>>> With all executors failing, the driver should be failing too, but it >>>> considers that it's a "pending" initial allocation and stays stuck forever >>>> in a loop of "Initial job has not accepted any resources, please check >>>> Cluster UI". >>>> >>>> Has anyone else observed this kind of behaviour? >>>> We had it on 2.3.1 and I upgraded to 2.4.1, but this issue still seems >>>> to exist even after the "big refactoring" in the Kubernetes cluster >>>> scheduler backend. >>>> >>>> I can work on a fix / workaround, but I'd like to check with you the >>>> proper way forward: >>>> >>>>- Some processes (like the Airflow helm recipe) rely on a "sleep >>>>30s" before launching the dependent pods (that could be added to >>>>/opt/entrypoint.sh used in the Kubernetes packaging) >>>>- We can add a simple step to the init container trying to do the >>>>DNS resolution and failing after 60s if it did not work >>>> >>>> But these steps won't change the fact that the driver will stay stuck >>>> thinking we're still in the case of the initial allocation delay. >>>> >>>> Thoughts ? 
>>>> >>>> -- >>>> *Olivier Girardot* >>>> o.girar...@lateral-thoughts.com >>>> >>> > > -- > *Thanks,* > *Prudhvi Chennuru.* > -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Spark 2.4.1 on Kubernetes - DNS resolution of driver fails
Hi, I did not try on another vendor, so I can't say if it's only related to GKE, and no, I did not notice anything on the kubelet or kube-dns processes... Regards On Fri, May 3, 2019 at 3:05 AM, Li Gao wrote: > hi Olivier, > > This seems a GKE-specific issue? Have you tried on other vendors? Also on > the kubelet nodes did you notice any pressure on the DNS side? > > Li > > > On Mon, Apr 29, 2019, 5:43 AM Olivier Girardot < > o.girar...@lateral-thoughts.com> wrote: > >> Hi everyone, >> I have ~300 Spark jobs on Kubernetes (GKE) using the cluster auto-scaler, >> and sometimes while running these jobs a pretty bad thing happens: the >> driver (in cluster mode) gets scheduled on Kubernetes and launches many >> executor pods. >> So far so good, but the k8s "Service" associated with the driver does not >> seem to be propagated in terms of DNS resolution, so all the executors fail >> with a "spark-application-..cluster.svc.local" does not exist error. >> >> With all executors failing, the driver should be failing too, but it considers >> that it's a "pending" initial allocation and stays stuck forever in a loop >> of "Initial job has not accepted any resources, please check Cluster UI". >> >> Has anyone else observed this kind of behaviour? >> We had it on 2.3.1 and I upgraded to 2.4.1, but this issue still seems to >> exist even after the "big refactoring" in the Kubernetes cluster scheduler >> backend. >> >> I can work on a fix / workaround, but I'd like to check with you the >> proper way forward: >> >>- Some processes (like the Airflow helm recipe) rely on a "sleep 30s" >>before launching the dependent pods (that could be added to >>/opt/entrypoint.sh used in the Kubernetes packaging) >>- We can add a simple step to the init container trying to do the DNS >>resolution and failing after 60s if it did not work >> >> But these steps won't change the fact that the driver will stay stuck >> thinking we're still in the case of the initial allocation delay. >> >> Thoughts ? >> >> -- >> *Olivier Girardot* >> o.girar...@lateral-thoughts.com >>
Spark 2.4.1 on Kubernetes - DNS resolution of driver fails
Hi everyone, I have ~300 Spark jobs on Kubernetes (GKE) using the cluster auto-scaler, and sometimes while running these jobs a pretty bad thing happens: the driver (in cluster mode) gets scheduled on Kubernetes and launches many executor pods. So far so good, but the k8s "Service" associated with the driver does not seem to be propagated in terms of DNS resolution, so all the executors fail with a "spark-application-..cluster.svc.local" does not exist error. With all executors failing, the driver should be failing too, but it considers that it's a "pending" initial allocation and stays stuck forever in a loop of "Initial job has not accepted any resources, please check Cluster UI". Has anyone else observed this kind of behaviour? We had it on 2.3.1 and I upgraded to 2.4.1, but this issue still seems to exist even after the "big refactoring" in the Kubernetes cluster scheduler backend. I can work on a fix / workaround, but I'd like to check with you the proper way forward: - Some processes (like the Airflow helm recipe) rely on a "sleep 30s" before launching the dependent pods (that could be added to /opt/entrypoint.sh used in the Kubernetes packaging) - We can add a simple step to the init container trying to do the DNS resolution and failing after 60s if it did not work (see the sketch below) But these steps won't change the fact that the driver will stay stuck thinking we're still in the case of the initial allocation delay. Thoughts ? -- *Olivier Girardot* o.girar...@lateral-thoughts.com
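A minimal sketch of the DNS-probe idea mentioned above, assuming a JVM-side check rather than an actual init container; the host would come from parsing SPARK_DRIVER_URL, and the names here (DriverDnsProbe, waitForDns) are hypothetical:

    import java.net.InetAddress
    import scala.util.Try

    object DriverDnsProbe {
      // Retry DNS resolution of the driver service, sleeping between attempts,
      // and report whether the name ever resolved within the allotted attempts.
      def waitForDns(host: String, maxAttempts: Int = 20, sleepMs: Long = 3000L): Boolean =
        (1 to maxAttempts).exists { attempt =>
          val resolved = Try(InetAddress.getByName(host)).isSuccess
          if (!resolved) Thread.sleep(sleepMs)
          resolved
        }
    }

Failing fast when waitForDns returns false would surface the misconfiguration instead of leaving the driver stuck in the initial-allocation loop.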
Back to SQL
Hi everyone, Is there any known way to go from a Spark SQL logical plan (optimised or not) back to a SQL query? Regards, Olivier.
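A minimal sketch of where those plans live, assuming Spark 2.x; the logical and optimized plans are reachable from any Dataset's queryExecution, though as far as I know there is no supported public API to render them back to SQL text (Spark 2.x carried an internal SQLBuilder for view canonicalization, but it is not public):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.range(10).filter("id > 5")

    // Inspect the plans the question refers to:
    println(df.queryExecution.logical)        // logical plan, pre-analysis
    println(df.queryExecution.optimizedPlan)  // optimized logical plan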
Nested "struct" fonction call creates a compilation error in Spark SQL
Hi everyone, when we create recursive calls to "struct" (up to 5 levels) to extend a complex data structure, we end up with the following compilation error:

    org.codehaus.janino.JaninoRuntimeException: Code of method "(I[Lscala/collection/Iterator;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB

The CreateStruct code itself is properly using the ctx.splitExpression command, but the "end result" of the df.select(struct(struct(struct(...)))) ends up being too much. Should I open a JIRA or is there a workaround? Regards, -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com
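A minimal repro sketch of the shape described above, assuming a wide-enough schema; whether it actually trips the 64 KB limit depends on the Spark version and the number of nested fields:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.range(100).toDF("id")

    // Five levels of nested struct() calls, as in the report above.
    val deep = df.select(
      struct(struct(struct(struct(struct(col("id")))))).alias("deep"))
    deep.show() // may fail with JaninoRuntimeException: ... grows beyond 64 KB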
Re: Will higher order functions in spark SQL be pushed upstream?
+1 for the question 2017-06-07 19:50 GMT+02:00 Antoine HOM <antoine@gmail.com>: > Hey guys, > > Databricks released higher order functions as part of their runtime > 3.0 beta (https://databricks.com/blog/2017/05/24/working-with-nested-data-using-higher-order-functions-in-sql-on-databricks.html), > which help with working with arrays within SQL statements. > > * As a heavy user of complex data types, I was wondering if there was > any plan to push those changes upstream? > * In addition, I was wondering if, as part of this change, it also tries > to solve the column pruning / filter pushdown issues with complex > datatypes? > > Thanks! > -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
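A sketch of the feature being discussed, using the lambda syntax from the Databricks blog post; at the time of this thread it runs on the Databricks runtime only, not upstream Spark, and the nested_data table is hypothetical:

    // Add 1 to every element of an array column without exploding it first.
    spark.sql("""
      SELECT key,
             TRANSFORM(values, v -> v + 1) AS values_plus_one
      FROM nested_data
    """)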
Re: welcoming Burak and Holden as committers
Congratulations ! On Thu, Jan 26, 2017 1:36 AM, trs...@gmail.com wrote: Congratulations! On Thu, 26 Jan 2017, 02:27 Bryan Cutler, <cutl...@gmail.com> wrote: Congratulations Holden and Burak, well deserved!!! On Tue, Jan 24, 2017 at 10:13 AM, Reynold Xin <r...@databricks.com> wrote: Hi all, Burak and Holden have recently been elected as Apache Spark committers. Burak has been very active in a large number of areas in Spark, including linear algebra, stats/maths functions in DataFrames, Python/R APIs for DataFrames, dstream, and most recently Structured Streaming. Holden has been a long time Spark contributor and evangelist. She has written a few books on Spark, as well as frequent contributions to the Python API to improve its usability and performance. Please join me in welcoming the two! -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Spark SQL - Applying transformation on a struct inside an array
So, it seems the only way I found for now is a recursive handling of the Row instances directly, but to do that I have to go back to RDDs. I've put together a simple test case demonstrating the problem:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.scalatest.{FlatSpec, Matchers}

    class DFInPlaceTransform extends FlatSpec with Matchers {

      val spark = SparkSession.builder()
        .appName("local")
        .master("local[*]")
        .getOrCreate()

      it should "access and mutate deeply nested arrays/structs" in {
        val df = spark.read.json(spark.sparkContext.parallelize(List(
          """{"a":[{"b" : "toto" }]}""".stripMargin)))
        df.show()
        df.printSchema()
        val result = transformInPlace("a.b", df)
        result.printSchema()
        result.show()
        result.schema should be (df.schema)
        val res = result.toJSON.take(1)
        res.head should be("""{"a":[{"b" : "TOTO" }]}""")
      }

      def transformInPlace(path: String, df: DataFrame): DataFrame = {
        val udf = spark.udf.register("transform", (s: String) => s.toUpperCase)
        val paths = path.split('.')
        val root = paths.head
        import org.apache.spark.sql.functions._
        df.withColumn(root, udf(df(path))) // does not work of course
      }
    }

The three other solutions I see are: * to create a dedicated Expression for in-place modifications of nested arrays and structs, * to use heavy explode / lateral views / group by computations, but that's bound to be inefficient, * or to generate bytecode using the schema to do the nested "getRow, getSeq…" and re-create the rows once the transformation is applied. I'd like to open an issue regarding that use case because it's not the first or last time it comes up and I still don't see any generic solution using Dataframes. Thanks for your time, Regards, Olivier On Fri, Sep 16, 2016 10:19 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Michael, Well for nested structs, I saw in the tests the behaviour defined by SPARK-12512 for the "a.b.c" handling in withColumn, and even if it's not ideal for me, I managed to make it work anyway like that: > df.withColumn("a", struct(struct(myUDF(df("a.b.c." // I didn't put back the aliases but you see what I mean. What I'd like to make work in essence is something like that: > val someFunc : String => String = ??? > val myUDF = udf(someFunc) > df.withColumn("a.b[*].c", myUDF(df("a.b[*].c"))) // the fact is that, in order to be consistent with the previous API, maybe I'd have to put something like a struct(array(struct(… which would be troublesome because I'd have to parse the arbitrary input string and create something like "a.b[*].c" => struct(array(struct( I realise the ambiguity implied in this kind of column expression, but it doesn't seem possible for now to cleanly update data in place at an arbitrary depth. I'll try to work on a PR that would make this possible, but any pointers would be appreciated. Regards, Olivier. On Fri, Sep 16, 2016 12:42 AM, Michael Armbrust mich...@databricks.com wrote: Is what you are looking for a withColumn that supports in-place modification of nested columns? Or is it some other problem? 
On Wed, Sep 14, 2016 at 11:07 PM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: I tried to use the RowEncoder but got stuck along the way. The main issue really is that even if it's possible (however tedious) to pattern match generically Row(s) and target the nested field that you need to modify, Rows being an immutable data structure without a method like a case class's copy or any kind of lens to create a brand new object, I ended up stuck at the step "target and extract the field to update" without any way to update the original Row with the new value. To sum up, I tried: * using only the Dataframe API itself + my udf - which works for nested structs as long as no arrays are along the way, * trying to create a udf that can apply on Row and pattern match recursively the path I needed to explore/modify, * trying to create a UDT - but we seem to be stuck in a strange middle-ground with 2.0 because some parts of the API ended up private while some stayed public, making it impossible to use for now (I'd be glad if I'm mistaken). All of these failed for me and I ended up converting the rows to JSON and updating using JSONPath, which is…. something I'd like to avoid 'pretty please'. On Thu, Sep 15, 2016 5:20 AM, Michael Allman mich...@videoamp.com wrote: Hi Guys, Have you tried org.apache.spark.sql.catalyst.encoders.RowEncoder? It's not a public API, but it is publicly accessible. I used it recently to correct some bad data in a few nested columns in a dataframe. It wasn't an easy job, but it made it possible. In my particular case I was not working with arrays. Olivier, I'm interested in seeing what you come up with. Thanks
Re: Using Spark as a Maven dependency but with Hadoop 2.6
I know that the code itself would not be the same, but it would be useful to at least have the pom/build.sbt transitive dependencies differ when fetching the artifact with a specific classifier, don't you think? For now I've overridden them myself using the dependency versions defined in the pom.xml of Spark. So it's not a blocker issue; it may be useful to document it, but a blog post would be sufficient I think. On Wed, Sep 28, 2016 7:21 PM, Sean Owen so...@cloudera.com wrote: I guess I'm claiming the artifacts wouldn't even be different in the first place, because the Hadoop APIs that are used are all the same across these versions. That would be the thing that makes you need multiple versions of the artifact under multiple classifiers. On Wed, Sep 28, 2016 at 1:16 PM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: ok, don't you think it could be published with just different classifiers: hadoop-2.6, hadoop-2.4, hadoop-2.2 being the current default. So for now, I should just override Spark 2.0.0's dependencies with the ones defined in the pom profile. On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com wrote: There can be just one published version of the Spark artifacts and they have to depend on something, though in truth they'd be binary-compatible with anything 2.2+. So you merely manage the dependency versions up to the desired version in your <dependencyManagement>. On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: Hi, when we fetch Spark 2.0.0 as a Maven dependency we automatically end up with Hadoop 2.2 as a transitive dependency. I know multiple profiles are used to generate the different tar.gz bundles that we can download; is there by any chance a publication of Spark 2.0.0 with different classifiers according to the different versions of Hadoop available? Thanks for your time! Olivier Girardot -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
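A minimal build.sbt sketch of the override described above, assuming Spark 2.0.0 and a Hadoop 2.6 target; the exact exclusions needed may vary with your dependency tree:

    // Pull Spark but exclude its transitive Hadoop 2.2 client,
    // then pin the Hadoop client to the version of the target cluster.
    libraryDependencies ++= Seq(
      ("org.apache.spark" %% "spark-core" % "2.0.0")
        .exclude("org.apache.hadoop", "hadoop-client"),
      "org.apache.hadoop" % "hadoop-client" % "2.6.0"
    )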
Re: Using Spark as a Maven dependency but with Hadoop 2.6
ok, don't you think it could be published with just different classifiers: hadoop-2.6, hadoop-2.4, hadoop-2.2 being the current default. So for now, I should just override Spark 2.0.0's dependencies with the ones defined in the pom profile. On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com wrote: There can be just one published version of the Spark artifacts and they have to depend on something, though in truth they'd be binary-compatible with anything 2.2+. So you merely manage the dependency versions up to the desired version in your <dependencyManagement>. On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: Hi, when we fetch Spark 2.0.0 as a Maven dependency we automatically end up with Hadoop 2.2 as a transitive dependency. I know multiple profiles are used to generate the different tar.gz bundles that we can download; is there by any chance a publication of Spark 2.0.0 with different classifiers according to the different versions of Hadoop available? Thanks for your time! Olivier Girardot -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Spark SQL - Applying transformation on a struct inside an array
Hi everyone, I'm currently trying to create a generic transformation mechanism on a Dataframe to modify an arbitrary column regardless of the underlying schema. It's "relatively" straightforward for complex types like struct<struct<…>>: apply an arbitrary UDF on the column and replace the data "inside" the struct. However, I'm struggling to make it work for complex types containing arrays along the way, like struct<array<struct<…>>>. Michael Armbrust seemed to allude on the mailing list/forum to a way of using Encoders to do that; I'd be interested in any pointers, especially considering that it's not possible to output any Row or GenericRowWithSchema from a UDF (thanks to https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657 it seems). To sum up, I'd like to find a way to apply a transformation on complex nested datatypes (arrays and structs) on a Dataframe, updating the value itself. Regards, Olivier Girardot
Re: Aggregations with scala pairs
CC'ing the dev list; you should open a JIRA and a PR related to it to discuss it, c.f. https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingCodeChanges On Wed, Aug 17, 2016 4:01 PM, Andrés Ivaldi iaiva...@gmail.com wrote: Hello, I'd like to report a wrong behavior of the DataSet API; I don't know how I can do that, as my JIRA account doesn't allow me to add an issue. I'm using Apache Spark 2.0.0, but the problem has existed since at least version 1.4 (given the docs, since 1.3). The problem is simple to reproduce, as is the workaround: if we apply agg over a DataSet with Scala pairs over the same column, only one agg over that column is actually used. This is because of the toMap, which reduces the pair values with the same key to one, overwriting the value: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala

    def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
      agg((aggExpr +: aggExprs).toMap)
    }

Rewritten as something like this, it should work:

    def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
      toDF((aggExpr +: aggExprs).map { pairExpr =>
        strToExpr(pairExpr._2)(df(pairExpr._1).expr)
      }.toSeq)
    }

regards -- Ing. Ivaldi Andres -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
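A minimal sketch demonstrating the reported symptom; with the Map-based overload, two aggregations on the same column collapse into one:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10), (1, 20), (2, 30)).toDF("key", "value")

    // Both aggregations target "value": on affected versions only the
    // last pair ("value" -> "min") survives the toMap call.
    df.groupBy("key").agg("value" -> "max", "value" -> "min").show()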
Re: Spark SQL and Kryo registration
Hi everyone, it seems that it works now out of the box, so never mind: registration is compatible with Spark 2.0 when using Dataframes. Regards, Olivier. On Fri, Aug 5, 2016 10:07 AM, Maciej Bryński mac...@brynski.pl wrote: Hi Olivier, Did you check the performance of Kryo? I have observed that Kryo is slightly slower than the Java serializer. Regards, Maciek 2016-08-04 17:41 GMT+02:00 Amit Sela < amitsel...@gmail.com >: It should. Codegen uses the SparkConf in SparkEnv when instantiating a new Serializer. On Thu, Aug 4, 2016 at 6:14 PM Jacek Laskowski < ja...@japila.pl > wrote: Hi Olivier, I don't know either, but am curious what you've tried already. Jacek On 3 Aug 2016 10:50 a.m., "Olivier Girardot" < o.girar...@lateral-thoughts.com > wrote: Hi everyone, I'm currently trying to use Spark 2.0.0 and make Dataframes work with kryo.registrationRequired=true. Is it even possible at all considering the codegen? Regards, Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- Maciek Bryński -- Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Spark SQL and Kryo registration
Hi everyone, I'm currently trying to use Spark 2.0.0 and make Dataframes work with kryo.registrationRequired=true. Is it even possible at all considering the codegen? Regards, Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
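A minimal sketch of the configuration under discussion; MyCaseClass stands in for whatever application classes actually cross the serializer, and which classes must be registered depends on the job:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    case class MyCaseClass(id: Long, name: String) // hypothetical app class

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(Array(classOf[MyCaseClass]))

    val spark = SparkSession.builder().config(conf).master("local[*]").getOrCreate()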
Re: tpcds for spark2.0
I have the same kind of issue (not using spark-sql-perf), just trying to deploy 2.0.0 on Mesos. I'll keep you posted as I investigate. On Wed, Jul 27, 2016 1:06 PM, kevin kiss.kevin...@gmail.com wrote: hi, all: I want to run a test of the TPC-DS 99 SQL queries on Spark 2.0. I use https://github.com/databricks/spark-sql-perf (the master version). When I run: val tpcds = new TPCDS (sqlContext = sqlContext) I got this error:

    scala> val tpcds = new TPCDS (sqlContext = sqlContext)
    error: missing or invalid dependency detected while loading class file 'Benchmarkable.class'. Could not access term typesafe in package com, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'Benchmarkable.class' was compiled against an incompatible version of com.
    error: missing or invalid dependency detected while loading class file 'Benchmarkable.class'. Could not access term scalalogging in value com.typesafe, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'Benchmarkable.class' was compiled against an incompatible version of com.typesafe.

With spark-sql-perf-0.4.3, when I run: tables.genData("hdfs://master1:9000/tpctest", "parquet", true, false, false, false, false) I got this error:

    Generating table catalog_sales in database to hdfs://master1:9000/tpctest/catalog_sales with save mode Overwrite.
    16/07/27 18:59:59 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, slave1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

-- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: ClassCastException using DataFrame only when num-executors > 2 ...
sorry for the delay, yes still. I'm still trying to figure out if it comes from bad data and trying to isolate the bug itself... 2015-09-11 0:28 GMT+02:00 Reynold Xin <r...@databricks.com>: > Does this still happen on the 1.5.0 release? > > On Mon, Aug 31, 2015 at 9:31 AM, Olivier Girardot <ssab...@gmail.com> wrote: >> tested now against Spark 1.5.0 rc2, and the same exceptions happen when num-executors > 2:

    15/08/25 10:31:10 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 5.0 (TID 501, xxx): java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Long
      at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
      at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41)
      at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:220)
      at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
      at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:325)
      at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

>> 2015-08-26 11:47 GMT+02:00 Olivier Girardot <ssab...@gmail.com>: >>> Hi everyone, >>> I know this "post title" doesn't seem very logical and I agree: >>> we have a very complex computation using "only" pyspark dataframes, and we launch >>> the computation on a CDH 5.3 cluster using Spark 1.5.0 rc1 >>> (the problem is reproduced with 1.4.x). >>> If the number of executors is the default 2, the computation is very >>> long but doesn't fail. >>> If the number of executors is 3 or more (tested up to 20), then the >>> computation fails very quickly with the following error: >>> >>> *Caused by: java.lang.ClassCastException: java.lang.Double cannot be >>> cast to java.lang.Long* >>> >>> The complete stack trace being:

    Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1267)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1255)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1254)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1254)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
      at scala.Option.foreach(Option.scala:236)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:684)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1480)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1442)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1431)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:554)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1805)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1818)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1902)
      at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
      at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
      at org.apa
Re: ClassCastException using DataFrame only when num-executors > 2 ...
tested now against Spark 1.5.0 rc2, and the same exceptions happen when num-executors > 2:

    15/08/25 10:31:10 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 5.0 (TID 501, xxx): java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Long
      at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
      at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41)
      at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:220)
      at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
      at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:325)
      at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

2015-08-26 11:47 GMT+02:00 Olivier Girardot <ssab...@gmail.com>: > Hi everyone, > I know this "post title" doesn't seem very logical and I agree: > we have a very complex computation using "only" pyspark dataframes, and we launch > the computation on a CDH 5.3 cluster using Spark 1.5.0 rc1 > (the problem is reproduced with 1.4.x). > If the number of executors is the default 2, the computation is very long > but doesn't fail. > If the number of executors is 3 or more (tested up to 20), then the > computation fails very quickly with the following error: > > *Caused by: java.lang.ClassCastException: java.lang.Double cannot be cast > to java.lang.Long* > > The complete stack trace being:

    Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1267)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1255)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1254)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1254)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
      at scala.Option.foreach(Option.scala:236)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:684)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1480)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1442)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1431)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:554)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1805)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1818)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1902)
      at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
      at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
      at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:264)
      at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:126)
      at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:156)
      at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
      at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
      ... 138 more
    Caused by: java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Long
      at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
      at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41)
      at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:220)
      at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
      at org.apache.spark.sql.execution.Window$
Re: [ANNOUNCE] Nightly maven and package builds for Spark
Hi Patrick, is there any way for the nightly build to include common distributions like: with/without hive/yarn support, hadoop 2.4, 2.6, etc.? For now it seems that the nightly binary package builds actually ship only the source? I can help on that too if you want. Regards, Olivier. 2015-08-02 5:19 GMT+02:00 Bharath Ravi Kumar reachb...@gmail.com: Thanks for fixing it. On Sun, Aug 2, 2015 at 3:17 AM, Patrick Wendell pwend...@gmail.com wrote: Hey All, I got it up and running - it was a newly surfaced bug in the build scripts. - Patrick On Wed, Jul 29, 2015 at 6:05 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hey Patrick, Any update on this front please? Thanks, Bharath On Fri, Jul 24, 2015 at 8:38 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Bharath, There was actually an incompatible change to the build process that broke several of the Jenkins builds. This should be patched up in the next day or two and nightly builds will resume. - Patrick On Fri, Jul 24, 2015 at 12:51 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: I noticed the last (1.5) build has a timestamp of 16th July. Have nightly builds been discontinued since then? Thanks, Bharath On Sun, May 24, 2015 at 1:11 PM, Patrick Wendell pwend...@gmail.com wrote: Hi All, This week I got around to setting up nightly builds for Spark on Jenkins. I'd like feedback on these, and if it's going well I can merge the relevant automation scripts into Spark mainline and document it on the website. Right now I'm doing: 1. SNAPSHOTs of Spark master and release branches published to the ASF Maven snapshot repo: https://repository.apache.org/content/repositories/snapshots/org/apache/spark/ These are usable by adding this repository in your build and using a snapshot version (e.g. 1.3.2-SNAPSHOT). 2. Nightly binary package builds and doc builds of master and release versions. http://people.apache.org/~pwendell/spark-nightly/ These build 4 times per day and are tagged based on commits. If anyone has feedback on these please let me know. Thanks! - Patrick -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
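A minimal build.sbt sketch of consuming the SNAPSHOT artifacts Patrick describes, using the repository URL and example version from his message:

    resolvers += "apache-snapshots" at
      "https://repository.apache.org/content/repositories/snapshots/"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.2-SNAPSHOT"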
Re: Spark CBO
Hi, there is one cost-based optimization implemented in Spark SQL, if I'm not mistaken, regarding join operations: if a join involves a small enough dataset, Spark SQL's strategy will be to automatically broadcast the small dataset instead of shuffling. I guess you have something else on your mind? Regards, Olivier. 2015-07-31 8:38 GMT+02:00 burakkk burak.isi...@gmail.com: Hi everyone, I'm wondering whether there is any plan to implement a cost-based optimizer for Spark SQL? Best regards... -- *BURAK ISIKLI* | *http://burakisikli.wordpress.com* -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
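A minimal sketch of the two knobs involved in that broadcast behaviour, assuming a sqlContext in scope (as in the spark-shell of that era) and hypothetical largeDf/smallDf DataFrames:

    import org.apache.spark.sql.functions.broadcast

    // Size threshold (in bytes) below which the smaller join side is broadcast.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

    // Or hint the broadcast explicitly instead of relying on size estimation
    // (the broadcast function is available in Spark 1.5+).
    val joined = largeDf.join(broadcast(smallDf), "key")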
Re: countByValue on dataframe with multiple columns
yes, and freqItems does not give you an ordered count (right?) + the threshold makes it difficult to calibrate + we noticed some strange behaviour when testing it on small datasets. 2015-07-21 20:30 GMT+02:00 Ted Malaska ted.mala...@cloudera.com: Look at the implementation for frequent items. It is different from a true count. On Jul 21, 2015 1:19 PM, Reynold Xin r...@databricks.com wrote: Is this just frequent items? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala#L97 On Tue, Jul 21, 2015 at 7:39 AM, Ted Malaska ted.mala...@cloudera.com wrote: 100% I would love to do it. Who is a good person to review the design with? All I need is a quick chat about the design and approach and I'll create the JIRA and push a patch. Ted Malaska On Tue, Jul 21, 2015 at 10:19 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Ted, The TopNList would be great to see directly in the Dataframe API, and my wish would be to be able to apply it on multiple columns at the same time and get all these statistics. The .describe() function is close to what we want to achieve; maybe we could try to enrich its output. Anyway, even as a spark-package, if you could package your code for Dataframes, that would be great. Regards, Olivier. 2015-07-21 15:08 GMT+02:00 Jonathan Winandy jonathan.wina...@gmail.com: Ha ok! Then the generic part would have that signature: def countColsByValue(df: Dataframe): Map[String /* colname */, Dataframe] +1 for more work (blog / api) for data quality checks. Cheers, Jonathan TopCMSParams and some other monoids from Algebird are really cool for that: https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala#L590 On 21 July 2015 at 13:40, Ted Malaska ted.mala...@cloudera.com wrote: I'm guessing you want something like what I put in this blog post. http://blog.cloudera.com/blog/2015/07/how-to-do-data-quality-checks-using-apache-spark-dataframes/ This is a very common use case. If there is a +1 I would love to add it to dataframes. Let me know. Ted Malaska On Tue, Jul 21, 2015 at 7:24 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Yop, actually the generic part does not work: the countByValue on one column gives you the count for each value seen in the column. I would like a generic (multi-column) countByValue to give me the same kind of output for each column, not considering the n-tuples of column values as the key (which is what the groupBy is doing by default). Regards, Olivier 2015-07-20 14:18 GMT+02:00 Jonathan Winandy jonathan.wina...@gmail.com: Ahoy! Maybe you can get countByValue by using sql.GroupedData:

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // some DF
    val df: DataFrame = sqlContext.createDataFrame(
      sc.parallelize(List("A", "B", "B", "A")).map(Row.apply(_)),
      StructType(List(StructField("n", StringType))))

    df.groupBy("n").count().show()

    // generic
    def countByValueDf(df: DataFrame) = {
      val (h :: r) = df.columns.toList
      df.groupBy(h, r: _*).count()
    }

    countByValueDf(df).show()

Cheers, Jon On 20 July 2015 at 11:28, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the countByValue function to Spark SQL Dataframe? Even https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala#L78 is using the RDD part right now, but for ML purposes, being able to get the most frequent categorical value on multiple columns would be very useful. 
Regards, -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: countByValue on dataframe with multiple columns
Yop, actually the generic part does not work: the countByValue on one column gives you the count for each value seen in the column. I would like a generic (multi-column) countByValue to give me the same kind of output for each column, not considering the n-tuples of column values as the key (which is what the groupBy is doing by default). Regards, Olivier 2015-07-20 14:18 GMT+02:00 Jonathan Winandy jonathan.wina...@gmail.com: Ahoy! Maybe you can get countByValue by using sql.GroupedData:

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // some DF
    val df: DataFrame = sqlContext.createDataFrame(
      sc.parallelize(List("A", "B", "B", "A")).map(Row.apply(_)),
      StructType(List(StructField("n", StringType))))

    df.groupBy("n").count().show()

    // generic
    def countByValueDf(df: DataFrame) = {
      val (h :: r) = df.columns.toList
      df.groupBy(h, r: _*).count()
    }

    countByValueDf(df).show()

Cheers, Jon On 20 July 2015 at 11:28, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the countByValue function to Spark SQL Dataframe? Even https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala#L78 is using the RDD part right now, but for ML purposes, being able to get the most frequent categorical value on multiple columns would be very useful. Regards, -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
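A minimal sketch of the per-column behaviour requested above, following the countColsByValue signature suggested elsewhere in the thread: one value-count DataFrame per column, ordered by descending frequency:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, count}

    def countColsByValue(df: DataFrame): Map[String, DataFrame] =
      df.columns.map { c =>
        // For each column independently: value -> number of occurrences.
        c -> df.groupBy(col(c))
               .agg(count("*").alias("count"))
               .orderBy(col("count").desc)
      }.toMap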
Re: countByValue on dataframe with multiple columns
Hi Ted, The TopNList would be great to see directly in the Dataframe API, and my wish would be to be able to apply it on multiple columns at the same time and get all these statistics. The .describe() function is close to what we want to achieve; maybe we could try to enrich its output. Anyway, even as a spark-package, if you could package your code for Dataframes, that would be great. Regards, Olivier. 2015-07-21 15:08 GMT+02:00 Jonathan Winandy jonathan.wina...@gmail.com: Ha ok! Then the generic part would have that signature: def countColsByValue(df: Dataframe): Map[String /* colname */, Dataframe] +1 for more work (blog / api) for data quality checks. Cheers, Jonathan TopCMSParams and some other monoids from Algebird are really cool for that: https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala#L590 On 21 July 2015 at 13:40, Ted Malaska ted.mala...@cloudera.com wrote: I'm guessing you want something like what I put in this blog post. http://blog.cloudera.com/blog/2015/07/how-to-do-data-quality-checks-using-apache-spark-dataframes/ This is a very common use case. If there is a +1 I would love to add it to dataframes. Let me know. Ted Malaska On Tue, Jul 21, 2015 at 7:24 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Yop, actually the generic part does not work: the countByValue on one column gives you the count for each value seen in the column. I would like a generic (multi-column) countByValue to give me the same kind of output for each column, not considering the n-tuples of column values as the key (which is what the groupBy is doing by default). Regards, Olivier 2015-07-20 14:18 GMT+02:00 Jonathan Winandy jonathan.wina...@gmail.com: Ahoy! Maybe you can get countByValue by using sql.GroupedData:

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // some DF
    val df: DataFrame = sqlContext.createDataFrame(
      sc.parallelize(List("A", "B", "B", "A")).map(Row.apply(_)),
      StructType(List(StructField("n", StringType))))

    df.groupBy("n").count().show()

    // generic
    def countByValueDf(df: DataFrame) = {
      val (h :: r) = df.columns.toList
      df.groupBy(h, r: _*).count()
    }

    countByValueDf(df).show()

Cheers, Jon On 20 July 2015 at 11:28, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the countByValue function to Spark SQL Dataframe? Even https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala#L78 is using the RDD part right now, but for ML purposes, being able to get the most frequent categorical value on multiple columns would be very useful. Regards, -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
countByValue on dataframe with multiple columns
Hi, Is there any plan to add the countByValue function to Spark SQL Dataframe? Even https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala#L78 is using the RDD part right now, but for ML purposes, being able to get the most frequent categorical value on multiple columns would be very useful. Regards, -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
RandomForest evaluator for grid search
Hi everyone, Using spark-ml there seem to be only BinaryClassificationEvaluator and RegressionEvaluator; is there any way or plan to provide a ROC-based, PR-based or F-Measure-based evaluator for multi-class classification? I would be interested especially in evaluating and doing a grid search for a RandomForest model. Regards, Olivier.
Re: RandomForest evaluator for grid search
thx for the info. I'd be interested in getting the full predict_proba like in scikit-learn (http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html#sklearn.ensemble.RandomForestClassifier.predict_proba) for the random forest model. There doesn't seem to be a way to get the details; is there any reason for that? Regards, Olivier. On Mon, Jul 13, 2015 at 9:12 PM, Feynman Liang fli...@databricks.com wrote: There is MulticlassMetrics in MLlib; unfortunately a pipelined version hasn't yet been made for spark-ml. SPARK-7690 https://issues.apache.org/jira/browse/SPARK-7690 is tracking work on this if you are interested in following the development. On Mon, Jul 13, 2015 at 2:16 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Using spark-ml there seem to be only BinaryClassificationEvaluator and RegressionEvaluator; is there any way or plan to provide a ROC-based, PR-based or F-Measure-based evaluator for multi-class classification? I would be interested especially in evaluating and doing a grid search for a RandomForest model. Regards, Olivier.
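A minimal sketch of the MLlib route mentioned above, assuming the model's predictions have already been collected as an RDD of (prediction, label) pairs:

    import org.apache.spark.mllib.evaluation.MulticlassMetrics
    import org.apache.spark.rdd.RDD

    // predictionsAndLabels: (predicted class, true class), both as Double.
    def multiclassReport(predictionsAndLabels: RDD[(Double, Double)]): Unit = {
      val metrics = new MulticlassMetrics(predictionsAndLabels)
      println(s"weighted precision = ${metrics.weightedPrecision}")
      println(s"weighted recall    = ${metrics.weightedRecall}")
      println(s"weighted F1        = ${metrics.weightedFMeasure}")
    }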
Re: PySpark on PyPi
Ok, I get it. Now what can we do to improve the current situation? Because right now if I want to set up a CI env for PySpark, I have to: 1- download a pre-built version of pyspark and unzip it somewhere on every agent 2- define the SPARK_HOME env 3- symlink this distribution's pyspark dir inside the python install dir site-packages/ directory and, if I rely on additional packages (like Databricks' spark-csv project), I have to (unless I'm mistaken) 4- compile/assemble spark-csv, deploy the jar in a specific directory on every agent 5- add this jar-filled directory to the Spark distribution's additional classpath using the conf/spark-defaults file. Then finally we can launch our unit/integration tests. Some issues are related to spark-packages, some to the lack of a python-based dependency, and some to the way SparkContexts are launched when using pyspark. I think steps 1 and 2 are fair enough; 4 and 5 may already have solutions, I didn't check, and considering spark-shell is downloading such dependencies automatically, I think if nothing's done yet it will be (I guess?). For step 3, maybe just adding a setup.py to the distribution would be enough; I'm not exactly advocating to distribute a full 300MB Spark distribution on PyPI, maybe there's a better compromise? Regards, Olivier. On Fri, Jun 5, 2015 at 10:12 PM, Jey Kottalam j...@cs.berkeley.edu wrote: Couldn't we have a pip-installable pyspark package that just serves as a shim to an existing Spark installation? Or it could even download the latest Spark binary if SPARK_HOME isn't set during installation. Right now, Spark doesn't play very well with the usual Python ecosystem. For example, why do I need to use a strange incantation when booting up IPython if I want to use PySpark in a notebook with MASTER=local[4]? It would be much nicer to just type `from pyspark import SparkContext; sc = SparkContext("local[4]")` in my notebook. I did a test and it seems like PySpark's basic unit tests do pass when SPARK_HOME is set and Py4J is on the PYTHONPATH:

    PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH \
      python $SPARK_HOME/python/pyspark/rdd.py

-Jey On Fri, Jun 5, 2015 at 10:57 AM, Josh Rosen rosenvi...@gmail.com wrote: This has been proposed before: https://issues.apache.org/jira/browse/SPARK-1267 There's currently tighter coupling between the Python and Java halves of PySpark than just requiring SPARK_HOME to be set; if we did this, I bet we'd run into tons of issues when users try to run a newer version of the Python half of PySpark against an older set of Java components or vice-versa. On Thu, Jun 4, 2015 at 10:45 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Considering the python API as just a front-end needing SPARK_HOME defined anyway, I think it would be interesting to deploy the Python part of Spark on PyPI in order to handle the dependencies in a Python project needing PySpark via pip. For now I just symlink the python/pyspark in my python install dir site-packages/ in order for PyCharm or other lint tools to work properly. I can do the setup.py work or anything. What do you think? Regards, Olivier.
PySpark on PyPi
Hi everyone, Considering the python API as just a front-end needing SPARK_HOME defined anyway, I think it would be interesting to deploy the Python part of Spark on PyPI in order to handle the dependencies in a Python project needing PySpark via pip. For now I just symlink the python/pyspark in my python install dir site-packages/ in order for PyCharm or other lint tools to work properly. I can do the setup.py work or anything. What do you think? Regards, Olivier.
Re: [VOTE] Release Apache Spark 1.4.0 (RC3)
Hi everyone, I think there's a blocker on PySpark: the when functions in Python seem to be broken, but the Scala API seems fine. Here's a snippet demonstrating that with Spark 1.4.0 RC3:

    In [1]: df = sqlCtx.createDataFrame([(1, 1), (2, 2), (1, 2), (1, 2)], ["key", "value"])

    In [2]: from pyspark.sql import functions as F

    In [8]: df.select(df.key, F.when(df.key > 1, 0).when(df.key == 0, 2).otherwise(1)).show()
    +---+---------------------------------+
    |key|CASE WHEN (key = 0) THEN 2 ELSE 1|
    +---+---------------------------------+
    |  1|                                1|
    |  2|                                1|
    |  1|                                1|
    |  1|                                1|
    +---+---------------------------------+

Whereas in Scala I get the expected expression and behaviour:

    scala> val df = sqlContext.createDataFrame(List((1, 1), (2, 2), (1, 2), (1, 2))).toDF("key", "value")
    scala> import org.apache.spark.sql.functions._
    scala> df.select(df("key"), when(df("key") > 1, 0).when(df("key") === 2, 2).otherwise(1)).show()
    +---+-------------------------------------------------------+
    |key|CASE WHEN (key > 1) THEN 0 WHEN (key = 2) THEN 2 ELSE 1|
    +---+-------------------------------------------------------+
    |  1|                                                      1|
    |  2|                                                      0|
    |  1|                                                      1|
    |  1|                                                      1|
    +---+-------------------------------------------------------+

I've opened the JIRA (https://issues.apache.org/jira/browse/SPARK-8038) and fixed it here: https://github.com/apache/spark/pull/6580 Regards, Olivier. On Tue, Jun 2, 2015 at 7:34 AM, Bobby Chowdary bobby.chowdar...@gmail.com wrote: Hi Patrick, Thanks for clarifying. No issues with functionality. +1 (non-binding) Thanks Bobby On Mon, Jun 1, 2015 at 9:41 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Bobby, Those are generic warnings that the hadoop libraries throw. If you are using MapRFS they shouldn't matter since you are using the MapR client and not the default hadoop client. Do you have any issues with functionality... or was it just seeing the warnings that was the concern? Thanks for helping test! - Patrick On Mon, Jun 1, 2015 at 5:18 PM, Bobby Chowdary bobby.chowdar...@gmail.com wrote: Hive Context works on RC3 for MapR after adding spark.sql.hive.metastore.sharedPrefixes as suggested in SPARK-7819. However, there still seem to be some other issues with native libraries; I get the below warning: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. I tried adding SPARK_LIBRARY_PATH and --driver-library-path with no luck. Built on MacOSX and running on CentOS 7, JDK 1.6 and JDK 1.8 (tried both): make-distribution.sh --tgz --skip-java-test -Phive -Phive-0.13.1 -Pmapr4 -Pnetlib-lgpl -Phive-thriftserver. C On Mon, Jun 1, 2015 at 3:05 PM, Sean Owen so...@cloudera.com wrote: I get a bunch of failures in VersionSuite with build/test params -Pyarn -Phive -Phadoop-2.6: - success sanity check *** FAILED *** java.lang.RuntimeException: [download failed: org.jboss.netty#netty;3.2.2.Final!netty.jar(bundle), download failed: commons-net#commons-net;3.1!commons-net.jar] at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:978) ... but maybe I missed the memo about how to build for Hive? Do I still need another Hive profile? Other tests, signatures, etc. look good. On Sat, May 30, 2015 at 12:40 AM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.4.0! The tag to be voted on is v1.4.0-rc3 (commit dd109a8): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=dd109a8746ec07c7c83995890fc2c0cd7a693730 The release files, including signatures, digests, etc. 
can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-bin/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: [published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-1109/ [published as version: 1.4.0-rc3] https://repository.apache.org/content/repositories/orgapachespark-1110/ The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc3-docs/ Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Tuesday, June 02, at 00:32 UTC and passes if a majority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.4.0 [ ] -1 Do not release this package because ... To learn more about Apache Spark, please see http://spark.apache.org/ == What has changed since RC1 == Below is a list of bug fixes that went into this RC: http://s.apache.org/vN == How can I help test this release? == If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload
Re: Dataframe's .drop in PySpark doesn't accept Column
I understand the rationale, but when you need to reference, for example when using a join, some column whose name is not unique, it can be confusing in terms of API. However I figured out that you can use a qualified name for the column using the *other-dataframe.column_name* syntax; maybe we just need to document this well... On Sun, May 31, 2015 at 12:18 PM, 范文臣 cloud0...@163.com wrote: `Column` in `DataFrame` is a general concept. `field1` is a column, `field + 1` is a column, `field1 > field2` is also a column. For an API like `select`, it should accept `Column` as we need general expressions. But for `drop`, we can only drop existing columns, which are not general expressions. So I think it makes sense to only allow String in `drop` as a column name. At 2015-05-31 02:41:52, Reynold Xin r...@databricks.com wrote: Name resolution is not as easy, I think. Wenchen can maybe give you some advice on resolution about this one. On Sat, May 30, 2015 at 9:37 AM, Yijie Shen henry.yijies...@gmail.com wrote: I think just matching the Column's expr as UnresolvedAttribute and using the UnresolvedAttribute's name to match the schema's field name is enough. Seems no need to regard expr as a more general one. :) On May 30, 2015 at 11:14:05 PM, Girardot Olivier ( o.girar...@lateral-thoughts.com) wrote: Jira done: https://issues.apache.org/jira/browse/SPARK-7969 I've already started working on it, but it's less trivial than it seems because I don't exactly know the inner workings of the catalog, and how to get the qualified name of a column to match it against the schema/catalog. Regards, Olivier. On Sat, May 30, 2015 at 9:54 AM, Reynold Xin r...@databricks.com wrote: Yea, would be great to support a Column. Can you create a JIRA, and possibly a pull request? On Fri, May 29, 2015 at 2:45 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Actually, the Scala API too is only based on column name. On Fri, May 29, 2015 at 11:23 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Testing a bit more 1.4, it seems that the .drop() method in PySpark doesn't accept a Column as input datatype:

    .join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)\

    File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
      jdf = self._jdf.drop(colName)
    File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
      (new_args, temp_args) = self._get_args(args)
    File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
      temp_arg = converter.convert(arg, self.gateway_client)
    File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
      for key in object.keys():
    TypeError: 'Column' object is not callable

It doesn't seem very consistent with the rest of the APIs - and is especially annoying when executing joins - because drop("my_key") is not a qualified reference to the column. What do you think about changing that? Or what is the best practice as a workaround? Regards, Olivier.
Re: Dataframe's .drop in PySpark doesn't accept Column
Jira done : https://issues.apache.org/jira/browse/SPARK-7969 I've already started working on it but it's less trivial than it seems because I don't exactly know the inner workings of the catalog, and how to get the qualified name of a column to match it against the schema/catalog. Regards, Olivier. Le sam. 30 mai 2015 à 09:54, Reynold Xin r...@databricks.com a écrit : Yea would be great to support a Column. Can you create a JIRA, and possibly a pull request? On Fri, May 29, 2015 at 2:45 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Actually, the Scala API too is only based on column names Le ven. 29 mai 2015 à 11:23, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi, Testing 1.4 a bit more, the .drop() method in PySpark doesn't seem to accept a Column as input datatype :

.join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)

File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
  jdf = self._jdf.drop(colName)
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
  (new_args, temp_args) = self._get_args(args)
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
  temp_arg = converter.convert(arg, self.gateway_client)
File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
  for key in object.keys():
TypeError: 'Column' object is not callable

It doesn't seem very consistent with the rest of the APIs - and is especially annoying when executing joins - because drop("my_key") is not a qualified reference to the column. What do you think about changing that ? or what is the best practice as a workaround ? Regards, Olivier.
Re: Dataframe's .drop in PySpark doesn't accept Column
Actually, the Scala API too is only based on column names Le ven. 29 mai 2015 à 11:23, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi, Testing 1.4 a bit more, the .drop() method in PySpark doesn't seem to accept a Column as input datatype :

.join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)

File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
  jdf = self._jdf.drop(colName)
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
  (new_args, temp_args) = self._get_args(args)
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
  temp_arg = converter.convert(arg, self.gateway_client)
File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
  for key in object.keys():
TypeError: 'Column' object is not callable

It doesn't seem very consistent with the rest of the APIs - and is especially annoying when executing joins - because drop("my_key") is not a qualified reference to the column. What do you think about changing that ? or what is the best practice as a workaround ? Regards, Olivier.
Dataframe's .drop in PySpark doesn't accept Column
Hi, Testing 1.4 a bit more, the .drop() method in PySpark doesn't seem to accept a Column as input datatype :

.join(only_the_best, only_the_best.pol_no == df.pol_no, "inner").drop(only_the_best.pol_no)

File "/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.py", line 1225, in drop
  jdf = self._jdf.drop(colName)
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 523, in __call__
  (new_args, temp_args) = self._get_args(args)
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 510, in _get_args
  temp_arg = converter.convert(arg, self.gateway_client)
File "/usr/local/lib/python2.7/site-packages/py4j/java_collections.py", line 490, in convert
  for key in object.keys():
TypeError: 'Column' object is not callable

It doesn't seem very consistent with the rest of the APIs - and is especially annoying when executing joins - because drop("my_key") is not a qualified reference to the column. What do you think about changing that ? or what is the best practice as a workaround ? Regards, Olivier.
Re: [VOTE] Release Apache Spark 1.4.0 (RC2)
I've just tested the new window functions using PySpark in the Spark 1.4.0 rc2 distribution for hadoop 2.4, with and without hive support. It works well with the hive-support-enabled distribution and fails as expected on the other one (with an explicit error : Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext). Thank you for your work. Regards, Olivier. Le lun. 25 mai 2015 à 11:25, Wang, Daoyuan daoyuan.w...@intel.com a écrit : Good catch! BTW, SPARK-6784 is a duplicate of SPARK-7790, didn't notice we changed the title of SPARK-7853.. -Original Message- From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Monday, May 25, 2015 4:47 PM To: Sean Owen; Patrick Wendell Cc: dev@spark.apache.org Subject: RE: [VOTE] Release Apache Spark 1.4.0 (RC2) Added another Blocker issue, just created! It seems to be a regression. https://issues.apache.org/jira/browse/SPARK-7853 -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Monday, May 25, 2015 3:37 PM To: Patrick Wendell Cc: dev@spark.apache.org Subject: Re: [VOTE] Release Apache Spark 1.4.0 (RC2) We still have 1 blocker for 1.4: SPARK-6784 Make sure values of partitioning columns are correctly converted based on their data types CC Davies Liu / Adrian Wang to check on the status of this. There are still 50 Critical issues tagged for 1.4, and 183 issues targeted for 1.4 in general. Obviously almost all of those won't be in 1.4. How do people want to deal with those? The field can be cleared, but do people want to take a pass at bumping a few to 1.4.1 that really truly are supposed to go into 1.4.1? On Sun, May 24, 2015 at 8:22 AM, Patrick Wendell pwend...@gmail.com wrote: Please vote on releasing the following candidate as Apache Spark version 1.4.0! The tag to be voted on is v1.4.0-rc2 (commit 03fb26a3): https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=03fb26a3e50e00739cc815ba4e2e82d71d003168 The release files, including signatures, digests, etc. can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc2-bin/ Release artifacts are signed with the following key: https://people.apache.org/keys/committer/pwendell.asc The staging repository for this release can be found at: [published as version: 1.4.0] https://repository.apache.org/content/repositories/orgapachespark-1103/ [published as version: 1.4.0-rc2] https://repository.apache.org/content/repositories/orgapachespark-1104/ The documentation corresponding to this release can be found at: http://people.apache.org/~pwendell/spark-releases/spark-1.4.0-rc2-docs/ Please vote on releasing this package as Apache Spark 1.4.0! The vote is open until Wednesday, May 27, at 08:12 UTC and passes if a majority of at least 3 +1 PMC votes are cast. [ ] +1 Release this package as Apache Spark 1.4.0 [ ] -1 Do not release this package because ... To learn more about Apache Spark, please see http://spark.apache.org/ == What has changed since RC1 == Below is a list of bug fixes that went into this RC: http://s.apache.org/U1M == How can I help test this release? == If you are a Spark user, you can help us test this release by taking a Spark 1.3 workload and running on this release candidate, then reporting any regressions. == What justifies a -1 vote for this release? == This vote is happening towards the end of the 1.4 QA period, so -1 votes should only occur for significant regressions from 1.3.1. Bugs already present in 1.3.X, minor regressions, or bugs related to new features will not block this release.
Re: NoClassDefFoundError with Spark 1.3
You're trying to launch, with sbt run, a project whose dependencies are marked provided; the goal of the provided scope is precisely to exclude those dependencies at runtime, considering them provided by the environment. Your configuration is correct for creating an assembly jar - but not for testing your project with sbt run. Regards, Olivier. Le ven. 8 mai 2015 à 10:41, Akhil Das ak...@sigmoidanalytics.com a écrit : Looks like the jar you provided has some missing classes. Try this:

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "org.apache.spark" %% "spark-sql" % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
  "log4j" % "log4j" % "1.2.15" excludeAll(
    ExclusionRule(organization = "com.sun.jdmk"),
    ExclusionRule(organization = "com.sun.jmx"),
    ExclusionRule(organization = "javax.jms")
  )
)

Thanks Best Regards On Thu, May 7, 2015 at 11:28 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi all – I'm attempting to build a project with SBT and run it on Spark 1.3 (this previously worked before we upgraded to CDH 5.4 with Spark 1.3). I have the following in my build.sbt:

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
  "log4j" % "log4j" % "1.2.15" excludeAll(
    ExclusionRule(organization = "com.sun.jdmk"),
    ExclusionRule(organization = "com.sun.jmx"),
    ExclusionRule(organization = "javax.jms")
  )
)

When I attempt to run this program with sbt run, however, I get the following error: java.lang.NoClassDefFoundError: org.apache.spark.Partitioner I don't explicitly use the Partitioner class anywhere, and this seems to indicate some missing Spark libraries on the install. Do I need to confirm anything other than the presence of the Spark assembly? I'm on CDH 5.4 and I'm able to run the spark-shell without any trouble. Any help would be much appreciated. Thank you, Ilya Ganelin
Re: DataFrame distinct vs RDD distinct
I'll try to reproduce what has been reported to me first :) and I'll let you know. Thanks ! Le jeu. 7 mai 2015 à 21:16, Michael Armbrust mich...@databricks.com a écrit : I'd happily merge a PR that changes the distinct implementation to be more like Spark core, assuming it includes benchmarks that show better performance for both the fits-in-memory case and the too-big-for-memory case. On Thu, May 7, 2015 at 2:23 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Ok, but for the moment, this seems to be killing performance on some computations... I'll try to give you precise figures on this between rdd and dataframe. Olivier. Le jeu. 7 mai 2015 à 10:08, Reynold Xin r...@databricks.com a écrit : In 1.5, we will most likely just rewrite distinct in SQL to either use the Aggregate operator, which will benefit from all the Tungsten optimizations, or have a Tungsten version of distinct for SQL/DataFrame. On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, there seem to be different implementations of the distinct feature in DataFrame and RDD, and some performance issue with the DataFrame distinct API. In RDD.scala :

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

And in DataFrame :

case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override def requiredChildDistribution: Seq[Distribution] =
    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

  override def execute(): RDD[Row] = {
    child.execute().mapPartitions { iter =>
      val hashSet = new scala.collection.mutable.HashSet[Row]()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        if (!hashSet.contains(currentRow)) {
          hashSet.add(currentRow.copy())
        }
      }
      hashSet.iterator
    }
  }
}

I can try to reproduce the performance issue more clearly, but do you have any insights into why we can't have the same distinct strategy between DataFrame and RDD ? Regards, Olivier.
DataFrame distinct vs RDD distinct
Hi everyone, there seem to be different implementations of the distinct feature in DataFrame and RDD, and some performance issue with the DataFrame distinct API. In RDD.scala :

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

And in DataFrame :

case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override def requiredChildDistribution: Seq[Distribution] =
    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

  override def execute(): RDD[Row] = {
    child.execute().mapPartitions { iter =>
      val hashSet = new scala.collection.mutable.HashSet[Row]()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        if (!hashSet.contains(currentRow)) {
          hashSet.add(currentRow.copy())
        }
      }
      hashSet.iterator
    }
  }
}

I can try to reproduce the performance issue more clearly, but do you have any insights into why we can't have the same distinct strategy between DataFrame and RDD ? Regards, Olivier.
Re: DataFrame distinct vs RDD distinct
Ok, but for the moment, this seems to be killing performance on some computations... I'll try to give you precise figures on this between rdd and dataframe. Olivier. Le jeu. 7 mai 2015 à 10:08, Reynold Xin r...@databricks.com a écrit : In 1.5, we will most likely just rewrite distinct in SQL to either use the Aggregate operator, which will benefit from all the Tungsten optimizations, or have a Tungsten version of distinct for SQL/DataFrame. On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, there seem to be different implementations of the distinct feature in DataFrame and RDD, and some performance issue with the DataFrame distinct API. In RDD.scala :

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

And in DataFrame :

case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override def requiredChildDistribution: Seq[Distribution] =
    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

  override def execute(): RDD[Row] = {
    child.execute().mapPartitions { iter =>
      val hashSet = new scala.collection.mutable.HashSet[Row]()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        if (!hashSet.contains(currentRow)) {
          hashSet.add(currentRow.copy())
        }
      }
      hashSet.iterator
    }
  }
}

I can try to reproduce the performance issue more clearly, but do you have any insights into why we can't have the same distinct strategy between DataFrame and RDD ? Regards, Olivier.
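For anyone wanting to reproduce the comparison, a rough timing sketch along these lines (synthetic data with made-up sizes; absolute numbers will depend heavily on the dataset and cluster):

import time
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[4]", "distinct-benchmark")
sqlContext = SQLContext(sc)

# Synthetic data with many duplicates: one million rows, one thousand
# distinct values
df = sqlContext.createDataFrame(
    sc.parallelize(range(1000000)).map(lambda i: (i % 1000,)), ["value"])

start = time.time()
df.distinct().count()      # DataFrame distinct (per-partition HashSet)
print("DataFrame distinct: %.2fs" % (time.time() - start))

start = time.time()
df.rdd.distinct().count()  # RDD distinct (reduceByKey based)
print("RDD distinct: %.2fs" % (time.time() - start))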
Re: Multi-Line JSON in SparkSQL
I was wondering if it's possible to use existing Hive SerDes for this ? Le lun. 4 mai 2015 à 08:36, Joe Halliwell joe.halliw...@gmail.com a écrit : I think Reynold’s argument shows the impossibility of the general case. But a “maximum object depth” hint could enable a new input format to do its job both efficiently and correctly in the common case where the input is an array of similarly structured objects! I’d certainly be interested in an implementation along those lines. Cheers, Joe http://www.joehalliwell.com @joehalliwell On Mon, May 4, 2015 at 7:55 AM, Reynold Xin r...@databricks.com wrote: I took a quick look at that implementation. I'm not sure if it actually handles JSON correctly, because it attempts to find the first { starting from a random point. However, that random point could be in the middle of a string, and thus the first { might just be part of a string, rather than a real JSON object starting position. On Sun, May 3, 2015 at 11:13 PM, Emre Sevinc emre.sev...@gmail.com wrote: You can check out the following library: https://github.com/alexholmes/json-mapreduce -- Emre Sevinç On Sun, May 3, 2015 at 10:04 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Is there any way in Spark SQL to load multi-line JSON data efficiently, I think there was in the mailing list a reference to http://pivotal-field-engineering.github.io/pmr-common/ for its JSONInputFormat But it's rather inaccessible considering the dependency is not available in any public maven repo (If you know of one, I'd be glad to hear it). Is there any plan to address this or any public recommendation ? (considering the documentation clearly states that sqlContext.jsonFile will not work for multi-line json(s)) Regards, Olivier. -- Emre Sevinc
Re: Multi-Line JSON in SparkSQL
@joe, I'd be glad to help if you need. Le lun. 4 mai 2015 à 20:06, Matei Zaharia matei.zaha...@gmail.com a écrit : I don't know whether this is common, but we might also allow another separator for JSON objects, such as two blank lines. Matei On May 4, 2015, at 2:28 PM, Reynold Xin r...@databricks.com wrote: Joe - I think that's a legit and useful thing to do. Do you want to give it a shot? On Mon, May 4, 2015 at 12:36 AM, Joe Halliwell joe.halliw...@gmail.com wrote: I think Reynold’s argument shows the impossibility of the general case. But a “maximum object depth” hint could enable a new input format to do its job both efficiently and correctly in the common case where the input is an array of similarly structured objects! I’d certainly be interested in an implementation along those lines. Cheers, Joe http://www.joehalliwell.com @joehalliwell On Mon, May 4, 2015 at 7:55 AM, Reynold Xin r...@databricks.com wrote: I took a quick look at that implementation. I'm not sure if it actually handles JSON correctly, because it attempts to find the first { starting from a random point. However, that random point could be in the middle of a string, and thus the first { might just be part of a string, rather than a real JSON object starting position. On Sun, May 3, 2015 at 11:13 PM, Emre Sevinc emre.sev...@gmail.com wrote: You can check out the following library: https://github.com/alexholmes/json-mapreduce -- Emre Sevinç On Sun, May 3, 2015 at 10:04 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Is there any way in Spark SQL to load multi-line JSON data efficiently, I think there was in the mailing list a reference to http://pivotal-field-engineering.github.io/pmr-common/ for its JSONInputFormat But it's rather inaccessible considering the dependency is not available in any public maven repo (If you know of one, I'd be glad to hear it). Is there any plan to address this or any public recommendation ? (considering the documentation clearly states that sqlContext.jsonFile will not work for multi-line json(s)) Regards, Olivier. -- Emre Sevinc
Re: Multi-Line JSON in SparkSQL
I'll try to study that and get back to you. Regards, Olivier. Le lun. 4 mai 2015 à 04:05, Reynold Xin r...@databricks.com a écrit : How does the pivotal format decides where to split the files? It seems to me the challenge is to decide that, and on the top of my head the only way to do this is to scan from the beginning and parse the json properly, which makes it not possible with large files (doable for whole input with a lot of small files though). If there is a better way, we should do it. On Sun, May 3, 2015 at 1:04 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Is there any way in Spark SQL to load multi-line JSON data efficiently, I think there was in the mailing list a reference to http://pivotal-field-engineering.github.io/pmr-common/ for its JSONInputFormat But it's rather inaccessible considering the dependency is not available in any public maven repo (If you know of one, I'd be glad to hear it). Is there any plan to address this or any public recommendation ? (considering the documentation clearly states that sqlContext.jsonFile will not work for multi-line json(s)) Regards, Olivier.
Re: createDataFrame allows column names as second param in Python not in Scala
I have the perfect counter-example, where some of the data scientists prototype in Python and the production material is done in Scala. But I get your point; as a matter of fact I realised the toDF method took parameters a little while after posting this. However the toDF still needs you to go from a List to an RDD, or to create a useless Dataframe and call toDF on it, re-creating a complete data structure. I just feel that the createDataFrame(_: Seq) is not really useful as it is, because I think there are practically no circumstances where you'd want to create a DataFrame without column names. I'm not implying an n-th overloaded method should be created, rather that the signature of the existing method should be changed with an optional Seq of column names. Regards, Olivier. Le dim. 3 mai 2015 à 07:44, Reynold Xin r...@databricks.com a écrit : Part of the reason is that it is really easy to just call toDF in Scala, and we already have a lot of createDataFrame functions. (You might find some of the cross-language differences confusing, but I'd argue most real users just stick to one language, and developers or trainers are the only ones that need to constantly switch between languages). On Sat, May 2, 2015 at 11:05 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, SQLContext.createDataFrame has different behaviour in Scala or Python :

>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> sqlContext.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]

and in Scala :

scala> val data = List(("Alice", 1), ("Wonderland", 0))
scala> sqlContext.createDataFrame(data, List("name", "score"))
<console>:28: error: overloaded method value createDataFrame with alternatives: ... cannot be applied to ...

What do you think about allowing in Scala too to have a Seq of column names for the sake of consistency ? Regards, Olivier.
Multi-Line JSON in SparkSQL
Hi everyone, Is there any way in Spark SQL to load multi-line JSON data efficiently? I think there was a reference on the mailing list to http://pivotal-field-engineering.github.io/pmr-common/ for its JSONInputFormat, but it's rather inaccessible considering the dependency is not available in any public maven repo (if you know of one, I'd be glad to hear it). Is there any plan to address this, or any public recommendation ? (considering the documentation clearly states that sqlContext.jsonFile will not work for multi-line JSON) Regards, Olivier.
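For what it's worth, one workaround that gets suggested for this - only viable when each file holds one JSON document and fits comfortably in memory, since the files are read whole - is to pair wholeTextFiles with jsonRDD; a minimal sketch with a hypothetical path:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "multiline-json")
sqlContext = SQLContext(sc)

# One multi-line JSON document per file; wholeTextFiles yields
# (path, full file content) pairs, so the documents are never split
raw = sc.wholeTextFiles("/tmp/json-documents/*.json").map(lambda kv: kv[1])
df = sqlContext.jsonRDD(raw)
df.printSchema()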
createDataFrame allows column names as second param in Python not in Scala
Hi everyone, SQLContext.createDataFrame has different behaviour in Scala or Python :

>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> sqlContext.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]

and in Scala :

scala> val data = List(("Alice", 1), ("Wonderland", 0))
scala> sqlContext.createDataFrame(data, List("name", "score"))
<console>:28: error: overloaded method value createDataFrame with alternatives: ... cannot be applied to ...

What do you think about allowing in Scala too to have a Seq of column names for the sake of consistency ? Regards, Olivier.
Re: Pandas' Shift in Dataframe
To close this thread: rxin created a broader Jira to handle window functions in Dataframes : https://issues.apache.org/jira/browse/SPARK-7322 Thanks everyone. Le mer. 29 avr. 2015 à 22:51, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : To give you a broader idea of the current use case, I have a few transformations (sort and column creations) oriented towards a simple goal. My data is timestamped, and if two lines are identical, their time difference has to be more than X days for the line to be kept, so there are a few shifts done but very locally : only -1 or +1. FYI regarding JIRA, I created one - https://issues.apache.org/jira/browse/SPARK-7247 - associated with this discussion. @rxin considering, in my use case, the data is sorted beforehand, there might be a better way - but I guess some shuffle would be needed anyway... Le mer. 29 avr. 2015 à 22:34, Evan R. Sparks evan.spa...@gmail.com a écrit : In general there's a tension between ordered data and the set-oriented data model underlying DataFrames. You can force a total ordering on the data, but it may come at a high cost with respect to performance. It would be good to get a sense of the use case you're trying to support, but one suggestion: I can imagine achieving a similar result by applying a datetime.timedelta (in Python terms) to a time attribute (your axis) and then performing a join between the base table and this derived table to merge the data back together. This type of join could then be optimized if the use case is frequent enough to warrant it. - Evan On Wed, Apr 29, 2015 at 1:25 PM, Reynold Xin r...@databricks.com wrote: In this case it's fine to discuss whether this would fit in Spark DataFrames' high level direction before putting it in JIRA. Otherwise we might end up creating a lot of tickets just for querying whether something might be a good idea. About this specific feature -- I'm not sure what it means in general given we don't have axis in Spark DataFrames. But I think it'd probably be good to be able to shift a column by one so we can support the end time / begin time case, although it'd require two passes over the data. On Wed, Apr 29, 2015 at 1:08 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I can't comment on the direction of the DataFrame API (that's more for Reynold or Michael I guess), but I just wanted to point out that the JIRA would be the recommended way to create a central place for discussing a feature add like that. Nick On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Nicholas, yes I've already checked, and I've just created the https://issues.apache.org/jira/browse/SPARK-7247 I'm not even sure why this would be a good feature to add except the fact that some of the data scientists I'm working with are using it, and it would therefore be useful for me to translate Pandas code to Spark... Isn't the goal of Spark Dataframe to allow all the features of Pandas/R Dataframe using Spark ? Regards, Olivier. Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas nicholas.cham...@gmail.com a écrit : You can check JIRA for any existing plans. If there isn't any, then feel free to create a JIRA and make the case there for why this would be a good feature to add. Nick On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the shift method from Pandas to Spark Dataframe, not that I think it's an easy task... c.f.
http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html Regards, Olivier.
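As a postscript: once SPARK-7322 landed in 1.4 (window functions, which at the time required a HiveContext), a local shift of -1/+1 can be expressed with lag/lead over a window; a rough sketch with made-up column names and data:

from pyspark import SparkContext
from pyspark.sql import HiveContext, Window
from pyspark.sql.functions import lag

sc = SparkContext("local", "shift-example")
sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame(
    [("a", 1), ("a", 3), ("b", 2)], ["key", "ts"])

# Equivalent of a pandas shift(1) within each key, ordered by timestamp
w = Window.partitionBy("key").orderBy("ts")
shifted = df.withColumn("prev_ts", lag("ts", 1).over(w))
shifted.show()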
Re: Pandas' Shift in Dataframe
To give you a broader idea of the current use case, I have a few transformations (sort and column creations) oriented towards a simple goal. My data is timestamped, and if two lines are identical, their time difference has to be more than X days for the line to be kept, so there are a few shifts done but very locally : only -1 or +1. FYI regarding JIRA, I created one - https://issues.apache.org/jira/browse/SPARK-7247 - associated with this discussion. @rxin considering, in my use case, the data is sorted beforehand, there might be a better way - but I guess some shuffle would be needed anyway... Le mer. 29 avr. 2015 à 22:34, Evan R. Sparks evan.spa...@gmail.com a écrit : In general there's a tension between ordered data and the set-oriented data model underlying DataFrames. You can force a total ordering on the data, but it may come at a high cost with respect to performance. It would be good to get a sense of the use case you're trying to support, but one suggestion: I can imagine achieving a similar result by applying a datetime.timedelta (in Python terms) to a time attribute (your axis) and then performing a join between the base table and this derived table to merge the data back together. This type of join could then be optimized if the use case is frequent enough to warrant it. - Evan On Wed, Apr 29, 2015 at 1:25 PM, Reynold Xin r...@databricks.com wrote: In this case it's fine to discuss whether this would fit in Spark DataFrames' high level direction before putting it in JIRA. Otherwise we might end up creating a lot of tickets just for querying whether something might be a good idea. About this specific feature -- I'm not sure what it means in general given we don't have axis in Spark DataFrames. But I think it'd probably be good to be able to shift a column by one so we can support the end time / begin time case, although it'd require two passes over the data. On Wed, Apr 29, 2015 at 1:08 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I can't comment on the direction of the DataFrame API (that's more for Reynold or Michael I guess), but I just wanted to point out that the JIRA would be the recommended way to create a central place for discussing a feature add like that. Nick On Wed, Apr 29, 2015 at 3:43 PM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Nicholas, yes I've already checked, and I've just created the https://issues.apache.org/jira/browse/SPARK-7247 I'm not even sure why this would be a good feature to add except the fact that some of the data scientists I'm working with are using it, and it would therefore be useful for me to translate Pandas code to Spark... Isn't the goal of Spark Dataframe to allow all the features of Pandas/R Dataframe using Spark ? Regards, Olivier. Le mer. 29 avr. 2015 à 21:09, Nicholas Chammas nicholas.cham...@gmail.com a écrit : You can check JIRA for any existing plans. If there isn't any, then feel free to create a JIRA and make the case there for why this would be a good feature to add. Nick On Wed, Apr 29, 2015 at 7:30 AM Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, Is there any plan to add the shift method from Pandas to Spark Dataframe, not that I think it's an easy task... c.f. http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.shift.html Regards, Olivier.
Re: Spark SQL cannot tolerate regexp with BIGINT
I guess you can use cast(id as String) instead of just id in your where clause ? Le mer. 29 avr. 2015 à 12:13, lonely Feb lonely8...@gmail.com a écrit : Hi all, we are transferring our HIVE jobs to Spark SQL, but we found a little difference between HIVE and Spark SQL: our sql has a statement like: select A from B where id regexp '^12345$' In HIVE it works fine, but in Spark SQL we got a: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String Can this statement be handled with Spark SQL?
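A minimal PySpark sketch of that cast workaround (the table and data here are made up to mirror the question; id is inferred as BIGINT):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "regexp-cast")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(12345, "x"), (99, "y")], ["id", "a"])
df.registerTempTable("b")

# Casting the BIGINT column to a string before applying regexp
# avoids the Long-to-String ClassCastException
sqlContext.sql(
    "select a from b where cast(id as string) regexp '^12345$'").show()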
Re: Dataframe.fillna from 1.3.0
done : https://github.com/apache/spark/pull/5683 and https://issues.apache.org/jira/browse/SPARK-7118 thx Le ven. 24 avr. 2015 à 07:34, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : I'll try thanks Le ven. 24 avr. 2015 à 00:09, Reynold Xin r...@databricks.com a écrit : You can do it similar to the way countDistinct is done, can't you? https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py#L78 On Thu, Apr 23, 2015 at 1:59 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: I found another way setting a SPARK_HOME on a released version and launching an ipython to load the contexts. I may need your insight however, I found why it hasn't been done at the same time, this method (like some others) uses a varargs in Scala and for now the way functions are called only one parameter is supported. So at first I tried to just generalise the helper function _ in the functions.py file to multiple arguments, but py4j's handling of varargs forces me to create an Array[Column] if the target method is expecting varargs. But from Python's perspective, we have no idea of whether the target method will be expecting varargs or just multiple arguments (to un-tuple). I can create a special case for coalesce or for method that takes of list of columns as arguments considering they will be varargs based (and therefore needs an Array[Column] instead of just a list of arguments) But this seems very specific and very prone to future mistakes. Is there any way in Py4j to know before calling it the signature of a method ? Le jeu. 23 avr. 2015 à 22:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : What is the way of testing/building the pyspark part of Spark ? Le jeu. 23 avr. 2015 à 22:06, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : yep :) I'll open the jira when I've got the time. Thanks Le jeu. 23 avr. 2015 à 19:31, Reynold Xin r...@databricks.com a écrit : Ah damn. We need to add it to the Python list. Would you like to give it a shot? On Thu, Apr 23, 2015 at 4:31 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Yep no problem, but I can't seem to find the coalesce fonction in pyspark.sql.{*, functions, types or whatever :) } Olivier. Le lun. 20 avr. 2015 à 11:48, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : a UDF might be a good idea no ? Le lun. 20 avr. 2015 à 11:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi everyone, let's assume I'm stuck in 1.3.0, how can I benefit from the *fillna* API in PySpark, is there any efficient alternative to mapping the records myself ? Regards, Olivier.
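For anyone landing here later: once that PR was merged (Spark 1.4+), coalesce is importable from pyspark.sql.functions and mirrors the Scala usage discussed in this thread; a small sketch with made-up data:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import coalesce, lit

sc = SparkContext("local", "coalesce-example")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(1.0,), (None,)], ["a"])
# Replace nulls in column "a" with 0.0, like fillna for a single column
df.select(coalesce(df["a"], lit(0.0)).alias("a")).show()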
Re: Dataframe.fillna from 1.3.0
yep :) I'll open the jira when I've got the time. Thanks Le jeu. 23 avr. 2015 à 19:31, Reynold Xin r...@databricks.com a écrit : Ah damn. We need to add it to the Python list. Would you like to give it a shot? On Thu, Apr 23, 2015 at 4:31 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Yep no problem, but I can't seem to find the coalesce fonction in pyspark.sql.{*, functions, types or whatever :) } Olivier. Le lun. 20 avr. 2015 à 11:48, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : a UDF might be a good idea no ? Le lun. 20 avr. 2015 à 11:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi everyone, let's assume I'm stuck in 1.3.0, how can I benefit from the *fillna* API in PySpark, is there any efficient alternative to mapping the records myself ? Regards, Olivier.
Re: Dataframe.fillna from 1.3.0
What is the way of testing/building the pyspark part of Spark ? Le jeu. 23 avr. 2015 à 22:06, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : yep :) I'll open the jira when I've got the time. Thanks Le jeu. 23 avr. 2015 à 19:31, Reynold Xin r...@databricks.com a écrit : Ah damn. We need to add it to the Python list. Would you like to give it a shot? On Thu, Apr 23, 2015 at 4:31 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Yep no problem, but I can't seem to find the coalesce fonction in pyspark.sql.{*, functions, types or whatever :) } Olivier. Le lun. 20 avr. 2015 à 11:48, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : a UDF might be a good idea no ? Le lun. 20 avr. 2015 à 11:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi everyone, let's assume I'm stuck in 1.3.0, how can I benefit from the *fillna* API in PySpark, is there any efficient alternative to mapping the records myself ? Regards, Olivier.
Re: Dataframe.fillna from 1.3.0
I found another way: setting a SPARK_HOME on a released version and launching an ipython shell to load the contexts. I may need your insight however; I found out why it hadn't been done at the same time: this method (like some others) takes varargs in Scala, and for now, in the way functions are called, only one parameter is supported. So at first I tried to just generalise the helper function _ in the functions.py file to multiple arguments, but py4j's handling of varargs forces me to create an Array[Column] if the target method is expecting varargs. But from Python's perspective, we have no idea whether the target method will be expecting varargs or just multiple arguments (to un-tuple). I can create a special case for coalesce, or for methods that take a list of columns as arguments, considering they will be varargs-based (and therefore need an Array[Column] instead of just a list of arguments). But this seems very specific and very prone to future mistakes. Is there any way in Py4j to know the signature of a method before calling it ? Le jeu. 23 avr. 2015 à 22:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : What is the way of testing/building the pyspark part of Spark ? Le jeu. 23 avr. 2015 à 22:06, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : yep :) I'll open the jira when I've got the time. Thanks Le jeu. 23 avr. 2015 à 19:31, Reynold Xin r...@databricks.com a écrit : Ah damn. We need to add it to the Python list. Would you like to give it a shot? On Thu, Apr 23, 2015 at 4:31 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Yep no problem, but I can't seem to find the coalesce function in pyspark.sql.{*, functions, types or whatever :) } Olivier. Le lun. 20 avr. 2015 à 11:48, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : a UDF might be a good idea no ? Le lun. 20 avr. 2015 à 11:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi everyone, let's assume I'm stuck in 1.3.0: how can I benefit from the *fillna* API in PySpark? Is there any efficient alternative to mapping the records myself ? Regards, Olivier.
Re: Dataframe.fillna from 1.3.0
I'll try thanks Le ven. 24 avr. 2015 à 00:09, Reynold Xin r...@databricks.com a écrit : You can do it similar to the way countDistinct is done, can't you? https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py#L78 On Thu, Apr 23, 2015 at 1:59 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: I found another way setting a SPARK_HOME on a released version and launching an ipython to load the contexts. I may need your insight however, I found why it hasn't been done at the same time, this method (like some others) uses a varargs in Scala and for now the way functions are called only one parameter is supported. So at first I tried to just generalise the helper function _ in the functions.py file to multiple arguments, but py4j's handling of varargs forces me to create an Array[Column] if the target method is expecting varargs. But from Python's perspective, we have no idea of whether the target method will be expecting varargs or just multiple arguments (to un-tuple). I can create a special case for coalesce or for method that takes of list of columns as arguments considering they will be varargs based (and therefore needs an Array[Column] instead of just a list of arguments) But this seems very specific and very prone to future mistakes. Is there any way in Py4j to know before calling it the signature of a method ? Le jeu. 23 avr. 2015 à 22:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : What is the way of testing/building the pyspark part of Spark ? Le jeu. 23 avr. 2015 à 22:06, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : yep :) I'll open the jira when I've got the time. Thanks Le jeu. 23 avr. 2015 à 19:31, Reynold Xin r...@databricks.com a écrit : Ah damn. We need to add it to the Python list. Would you like to give it a shot? On Thu, Apr 23, 2015 at 4:31 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Yep no problem, but I can't seem to find the coalesce fonction in pyspark.sql.{*, functions, types or whatever :) } Olivier. Le lun. 20 avr. 2015 à 11:48, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : a UDF might be a good idea no ? Le lun. 20 avr. 2015 à 11:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi everyone, let's assume I'm stuck in 1.3.0, how can I benefit from the *fillna* API in PySpark, is there any efficient alternative to mapping the records myself ? Regards, Olivier.
Re: Dataframe.fillna from 1.3.0
Where should this *coalesce* come from ? Is it related to the partition manipulation coalesce method ? Thanks ! Le lun. 20 avr. 2015 à 22:48, Reynold Xin r...@databricks.com a écrit : Ah ic. You can do something like df.select(coalesce(df(a), lit(0.0))) On Mon, Apr 20, 2015 at 1:44 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: From PySpark it seems to me that the fillna is relying on Java/Scala code, that's why I was wondering. Thank you for answering :) Le lun. 20 avr. 2015 à 22:22, Reynold Xin r...@databricks.com a écrit : You can just create fillna function based on the 1.3.1 implementation of fillna, no? On Mon, Apr 20, 2015 at 2:48 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: a UDF might be a good idea no ? Le lun. 20 avr. 2015 à 11:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi everyone, let's assume I'm stuck in 1.3.0, how can I benefit from the *fillna* API in PySpark, is there any efficient alternative to mapping the records myself ? Regards, Olivier.
Re: Dataframe.fillna from 1.3.0
I think I found the Coalesce you were talking about, but this is a catalyst class that I think is not available from pyspark Regards, Olivier. Le mer. 22 avr. 2015 à 11:56, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Where should this *coalesce* come from ? Is it related to the partition manipulation coalesce method ? Thanks ! Le lun. 20 avr. 2015 à 22:48, Reynold Xin r...@databricks.com a écrit : Ah ic. You can do something like df.select(coalesce(df(a), lit(0.0))) On Mon, Apr 20, 2015 at 1:44 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: From PySpark it seems to me that the fillna is relying on Java/Scala code, that's why I was wondering. Thank you for answering :) Le lun. 20 avr. 2015 à 22:22, Reynold Xin r...@databricks.com a écrit : You can just create fillna function based on the 1.3.1 implementation of fillna, no? On Mon, Apr 20, 2015 at 2:48 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: a UDF might be a good idea no ? Le lun. 20 avr. 2015 à 11:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi everyone, let's assume I'm stuck in 1.3.0, how can I benefit from the *fillna* API in PySpark, is there any efficient alternative to mapping the records myself ? Regards, Olivier.
Re: Spark build time
I agree, that's what I did :) I was just wondering whether it was considered a problem or something to work on (I personally think so, because the feedback loop should be as quick as possible), and therefore whether there was someone I could help. Le mar. 21 avr. 2015 à 22:20, Reynold Xin r...@databricks.com a écrit : It runs tons of integration tests. I think most developers just let Jenkins run the full suite of them. On Tue, Apr 21, 2015 at 12:54 PM, Olivier Girardot ssab...@gmail.com wrote: Hi everyone, I was just wondering about the Spark full build time (including tests): 1h48 seems to me quite... spacious. What's taking most of the time ? Is the build mainly integration tests ? Is there any roadmap or JIRAs dedicated to this that we can chip in on ? Regards, Olivier.
Spark 1.2.2 prebuilt release for Hadoop 2.4 didn't get deployed
Hi everyone, It seems that some of the Spark 1.2.2 prebuilt versions (I tested mainly for Hadoop 2.4 and later) didn't get deployed to all the mirrors and cloudfront. Both the direct download and the apache mirrors fail with dead links, for example : http://d3kbcqa49mib13.cloudfront.net/spark-1.2.2-bin-hadoop2.4.tgz Regards, Olivier.
Re: Spark 1.2.2 prebuilt release for Hadoop 2.4 didn't get deployed
Thanks Patrick ! I'll update https://registry.hub.docker.com/u/ogirardot/spark-docker-shell/ when you're done. Regards, Olivier. Le mar. 21 avr. 2015 à 20:47, Patrick Wendell pwend...@gmail.com a écrit : Good catch Olivier - I'll take care of it. Tracking this on SPARK-7027. On Tue, Apr 21, 2015 at 6:06 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, It seems that some of the Spark 1.2.2 prebuilt versions (I tested mainly for Hadoop 2.4 and later) didn't get deployed to all the mirrors and cloudfront. Both the direct download and the apache mirrors fail with dead links, for example : http://d3kbcqa49mib13.cloudfront.net/spark-1.2.2-bin-hadoop2.4.tgz Regards, Olivier.
Re: Spark Streaming updateStateByKey throws OutOfMemory Error
Hi Sourav, Can you post your updateFunc as well please ? Regards, Olivier. Le mar. 21 avr. 2015 à 12:48, Sourav Chandra sourav.chan...@livestream.com a écrit : Hi, We are building a spark streaming application which reads from kafka, does updateStateBykey based on the received message type and finally stores into redis. After running for a few seconds the executor process gets killed with an OutOfMemory error. The code snippet is below:

NoOfReceiverInstances = 1

val kafkaStreams = (1 to NoOfReceiverInstances).map(
  _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
)

val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}

ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...).updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))

object RedisHelper {
  private val client = scredis.Redis(
    ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
  )

  def update(itr: Iterator[(String, (Long, Long))]) {
    // redis save operation
  }
}

Below is the spark configuration:

spark.app.name = XXX
spark.jars = .jar
spark.home = /spark-1.1.1-bin-hadoop2.4
spark.executor.memory = 1g
spark.streaming.concurrentJobs = 1000
spark.logConf = true
spark.cleaner.ttl = 3600 //in milliseconds
spark.default.parallelism = 12
spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError
spark.executor.logs.rolling.strategy = size
spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
spark.executor.logs.rolling.maxRetainedFiles = 10
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator = xxx.NoOpKryoRegistrator

other configurations are below

streaming {
  // All streaming context related configs should come here
  batch-duration = 1 second
  checkpoint-directory = /tmp
  checkpoint-duration = 10 seconds
  slide-duration = 1 second
  window-duration = 1 second
  partitions-for-shuffle-task = 32
}
kafka {
  no-of-receivers = 1
  zookeeper-quorum = :2181
  consumer-group = x
  topic = x:2
}

We tried different combinations like: - with spark 1.1.0 and 1.1.1 - increasing executor memory - changing the serialization strategy (switching between kryo and normal java) - changing the broadcast strategy (switching between http and torrent broadcast) Can anyone give any insight into what we are missing here? How can we fix this? Due to an akka version mismatch with some other libraries we cannot upgrade the spark version. Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
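For context while waiting for that: updateStateByKey expects a function from (the new values received for a key in the current batch, the previous state) to the new state, and returning an empty state drops the key. The updateFunc above has the (Seq[IConcurrentUsers], Option[(Long, Long)]) => Option[(Long, Long)] shape; a minimal PySpark-flavoured sketch of such a function, with made-up state semantics:

# Sketch of an updateStateByKey update function: the state kept per key
# here (a running (count, sum) pair) is made up for illustration
def update_func(new_values, last_state):
    count, total = last_state or (0, 0)
    for v in new_values:
        count += 1
        total += v
    return (count, total)  # returning None instead would drop the key

# usage on a hypothetical keyed DStream named "stream":
# stream.updateStateByKey(update_func)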
Spark build time
Hi everyone, I was just wondering about the Spark full build time (including tests): 1h48 seems to me quite... spacious. What's taking most of the time ? Is the build mainly integration tests ? Is there any roadmap or JIRAs dedicated to this that we can chip in on ? Regards, Olivier.
Re: Dataframe.fillna from 1.3.0
a UDF might be a good idea no ? Le lun. 20 avr. 2015 à 11:17, Olivier Girardot o.girar...@lateral-thoughts.com a écrit : Hi everyone, let's assume I'm stuck in 1.3.0, how can I benefit from the *fillna* API in PySpark, is there any efficient alternative to mapping the records myself ? Regards, Olivier.
Dataframe.fillna from 1.3.0
Hi everyone, let's assume I'm stuck in 1.3.0: how can I benefit from the *fillna* API in PySpark? Is there any efficient alternative to mapping the records myself ? Regards, Olivier.
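For readers stuck on 1.3.0, a minimal sketch of the UDF-based stopgap suggested in this thread (the column name and default value are made up):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

sc = SparkContext("local", "fillna-workaround")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(1.0,), (None,)], ["a"])

# Emulate fillna(0.0) on a single column with a plain Python UDF
fill_zero = udf(lambda v: v if v is not None else 0.0, DoubleType())
df.select(fill_zero(df["a"]).alias("a")).show()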
Re: BUG: 1.3.0 org.apache.spark.sql.Row Does not exist in Java API
Hi Nipun, you're right, I created the pull request fixing the documentation: https://github.com/apache/spark/pull/5569 and the corresponding issue: https://issues.apache.org/jira/browse/SPARK-6992 Thank you for your time, Olivier. Le sam. 18 avr. 2015 à 01:11, Nipun Batra batrani...@gmail.com a écrit : Hi Oliver Thank you for responding. I am able to find org.apache.spark.sql.Row in spark-catalyst_2.10-1.3.0, BUT it was not visible in the API document yesterday ( https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/package-frame.html). I am pretty sure. Also I think this document needs to be changed ' https://spark.apache.org/docs/latest/sql-programming-guide.html': return Row.create(fields[0], fields[1].trim()); needs to be replaced with RowFactory.create. Thanks again for your response. Thanks Nipun Batra On Fri, Apr 17, 2015 at 2:50 PM, Olivier Girardot ssab...@gmail.com wrote: Hi Nipun, I'm sorry but I don't understand exactly what your problem is ? Regarding org.apache.spark.sql.Row, it does exist in the Spark SQL dependency. Is it a compilation problem ? Are you trying to run a main method using the pom you've just described ? or are you trying to spark-submit the jar ? If you're trying to run a main method, the provided scope is not designed for that and will make your program fail. Regards, Olivier. Le ven. 17 avr. 2015 à 21:52, Nipun Batra bni...@gmail.com wrote: Hi The example given in the SQL document https://spark.apache.org/docs/latest/sql-programming-guide.html uses org.apache.spark.sql.Row, which does not exist in the Java API, or at least I was not able to find it. Build Info - Downloaded from spark website Dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.3.0</version>
  <scope>provided</scope>
</dependency>

Code in documentation:

// Import factory methods provided by DataType.
import org.apache.spark.sql.types.DataType;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
  fields.add(DataType.createStructField(fieldName, DataType.StringType, true));
}
StructType schema = DataType.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return Row.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

Thanks Nipun
[Spark SQL] Java map/flatMap api broken with DataFrame in 1.3.{0,1}
Hi everyone, I had an issue trying to use Spark SQL from Java (8 or 7), I tried to reproduce it in a small test case close to the actual documentation https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, so sorry for the long mail, but this is Java :

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class Movie implements Serializable {
    private int id;
    private String name;

    public Movie(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

public class SparkSQLTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("My Application");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        ArrayList<Movie> movieArrayList = new ArrayList<Movie>();
        movieArrayList.add(new Movie(1, "Indiana Jones"));
        JavaRDD<Movie> movies = sc.parallelize(movieArrayList);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame frame = sqlContext.applySchema(movies, Movie.class);
        frame.registerTempTable("movies");
        sqlContext.sql("select name from movies")
            .map(row -> row.getString(0)) // this is what i would expect to work
            .collect();
    }
}

But this does not compile, here's the compilation error :

[ERROR] /Users/ogirardot/Documents/spark/java-project/src/main/java/org/apache/spark/MainSQL.java:[37,47] method map in class org.apache.spark.sql.DataFrame cannot be applied to given types;
[ERROR] required: scala.Function1<org.apache.spark.sql.Row,R>, scala.reflect.ClassTag<R>
[ERROR] found: (row)->Na[...]ng(0)
[ERROR] reason: cannot infer type-variable(s) R
[ERROR] (actual and formal argument lists differ in length)
[ERROR] /Users/ogirardot/Documents/spark/java-project/src/main/java/org/apache/spark/SampleSHit.java:[56,17] method map in class org.apache.spark.sql.DataFrame cannot be applied to given types;
[ERROR] required: scala.Function1<org.apache.spark.sql.Row,R>, scala.reflect.ClassTag<R>
[ERROR] found: (row)->row[...]ng(0)
[ERROR] reason: cannot infer type-variable(s) R
[ERROR] (actual and formal argument lists differ in length)
[ERROR] - [Help 1]

Because in the DataFrame the map method is defined as :

override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)

And once this is translated to bytecode the actual Java signature uses a Function1 and adds a ClassTag parameter. I can try to go around this and use the scala.reflect.ClassTag$ like that : ClassTag$.MODULE$.apply(String.class) To get the second ClassTag parameter right, but then instantiating a java.util.Function or using the Java 8 lambdas fails to work, and if I try to instantiate a proper scala Function1... well, this is a world of pain. This is a regression introduced by the 1.3.x DataFrame, because JavaSchemaRDD used to be JavaRDDLike but DataFrames are not (and are not callable with JFunctions). I can open a Jira if you want ? Regards, -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: [Spark SQL] Java map/flatMap api broken with DataFrame in 1.3.{0,1}
Yes thanks ! Le ven. 17 avr. 2015 à 16:20, Ted Yu yuzhih...@gmail.com a écrit : The image didn't go through. I think you were referring to: override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f) Cheers On Fri, Apr 17, 2015 at 6:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, I had an issue trying to use Spark SQL from Java (8 or 7), I tried to reproduce it in a small test case close to the actual documentation https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, so sorry for the long mail, but this is Java :

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class Movie implements Serializable {
    private int id;
    private String name;

    public Movie(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

public class SparkSQLTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("My Application");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        ArrayList<Movie> movieArrayList = new ArrayList<Movie>();
        movieArrayList.add(new Movie(1, "Indiana Jones"));
        JavaRDD<Movie> movies = sc.parallelize(movieArrayList);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame frame = sqlContext.applySchema(movies, Movie.class);
        frame.registerTempTable("movies");
        sqlContext.sql("select name from movies")
            .map(row -> row.getString(0)) // this is what i would expect to work
            .collect();
    }
}

But this does not compile, here's the compilation error :

[ERROR] /Users/ogirardot/Documents/spark/java-project/src/main/java/org/apache/spark/MainSQL.java:[37,47] method map in class org.apache.spark.sql.DataFrame cannot be applied to given types;
[ERROR] required: scala.Function1<org.apache.spark.sql.Row,R>, scala.reflect.ClassTag<R>
[ERROR] found: (row)->Na[...]ng(0)
[ERROR] reason: cannot infer type-variable(s) R
[ERROR] (actual and formal argument lists differ in length)
[ERROR] /Users/ogirardot/Documents/spark/java-project/src/main/java/org/apache/spark/SampleSHit.java:[56,17] method map in class org.apache.spark.sql.DataFrame cannot be applied to given types;
[ERROR] required: scala.Function1<org.apache.spark.sql.Row,R>, scala.reflect.ClassTag<R>
[ERROR] found: (row)->row[...]ng(0)
[ERROR] reason: cannot infer type-variable(s) R
[ERROR] (actual and formal argument lists differ in length)
[ERROR] - [Help 1]

Because in the DataFrame the map method is defined as : [image: inline image 1] And once this is translated to bytecode the actual Java signature uses a Function1 and adds a ClassTag parameter. I can try to go around this and use the scala.reflect.ClassTag$ like that : ClassTag$.MODULE$.apply(String.class) To get the second ClassTag parameter right, but then instantiating a java.util.Function or using the Java 8 lambdas fails to work, and if I try to instantiate a proper scala Function1... well, this is a world of pain. This is a regression introduced by the 1.3.x DataFrame, because JavaSchemaRDD used to be JavaRDDLike but DataFrames are not (and are not callable with JFunctions). I can open a Jira if you want ?
Regards, -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
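To make the "world of pain" above concrete, here is a hypothetical sketch of what calling the Scala map directly from Java entails on the 1.3.x API. The class and variable names (GetName, stringTag, names) are illustrative, not from the original message; frame is assumed to be the DataFrame from the example above. The sketch extends scala.runtime.AbstractFunction1 because implementing scala.Function1 directly from Java means hand-writing compose, andThen and all the specialized apply variants:

import java.io.Serializable;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;

// AbstractFunction1 fills in Function1's non-apply methods for us; the class
// must also be Serializable or Spark fails at runtime when shipping the task.
class GetName extends AbstractFunction1<Row, String> implements Serializable {
    @Override
    public String apply(Row row) {
        return row.getString(0);
    }
}

// ... then, given the DataFrame `frame` from the example above:
ClassTag<String> stringTag = ClassTag$.MODULE$.apply(String.class); // the extra bytecode-level parameter
RDD<String> names = frame.map(new GetName(), stringTag);            // note: a Scala RDD, not a JavaRDD

Even when this compiles, the result is a Scala RDD rather than a JavaRDD, so every subsequent operation carries the same boilerplate, which is exactly the regression being reported.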
Re: [Spark SQL] Java map/flatMap api broken with DataFrame in 1.3.{0,1}
Ok, do you want me to open a pull request to fix the dedicated documentation ? Le ven. 17 avr. 2015 à 18:14, Reynold Xin r...@databricks.com a écrit : I think in 1.3 and above, you'd need to do .sql(...).javaRDD().map(..) On Fri, Apr 17, 2015 at 9:22 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Yes thanks ! Le ven. 17 avr. 2015 à 16:20, Ted Yu yuzhih...@gmail.com a écrit : The image didn't go through. I think you were referring to: override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f) Cheers
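For readers skimming the thread, here is a minimal sketch of Reynold's suggestion applied to the Movie example from the message above; the same sqlContext and registered movies table are assumed, and the variable names are illustrative:

import java.util.List;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;

// Java 8: .javaRDD() returns a JavaRDD<Row>, whose map takes Spark's Java
// Function (a lambda-friendly interface), unlike DataFrame.map which wants
// a scala.Function1 plus a ClassTag.
List<String> names = sqlContext.sql("select name from movies")
    .javaRDD()
    .map(row -> row.getString(0))
    .collect();

// Java 7 equivalent, with an anonymous class:
List<String> names7 = sqlContext.sql("select name from movies")
    .javaRDD()
    .map(new Function<Row, String>() {
        public String call(Row row) {
            return row.getString(0);
        }
    })
    .collect();

This also lines up with Reynold's later point in the thread about documenting both the Java 6/7 and Java 8 syntax.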
Re: BUG: 1.3.0 org.apache.spark.sql.Row Does not exist in Java API
Hi Nipun, I'm sorry but I don't understand exactly what your problem is ? Regarding org.apache.spark.sql.Row, it does exist in the Spark SQL dependency. Is it a compilation problem ? Are you trying to run a main method using the pom you've just described ? Or are you trying to spark-submit the jar ? If you're trying to run a main method, the provided scope is not designed for that and will make your program fail. Regards, Olivier. Le ven. 17 avr. 2015 à 21:52, Nipun Batra bni...@gmail.com a écrit : Hi The example given in the SQL document https://spark.apache.org/docs/latest/sql-programming-guide.html org.apache.spark.sql.Row does not exist in the Java API, or at least I was not able to find it. Build Info - Downloaded from spark website Dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.3.0</version>
  <scope>provided</scope>
</dependency>

Code in documentation

// Import factory methods provided by DataType.
import org.apache.spark.sql.types.DataType;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName : schemaString.split(" ")) {
  fields.add(DataType.createStructField(fieldName, DataType.StringType, true));
}
StructType schema = DataType.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return Row.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

Thanks Nipun
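As a side note on why the quoted docs snippet no longer lines up with a 1.3.0 classpath: the factory methods moved in 1.3. A sketch of the 1.3-era equivalents, assuming the same people.txt input and an existing JavaSparkContext sc as in the snippet above (DataTypes replaces the DataType factory methods, and RowFactory.create replaces Row.create):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// 1.3 moved the factory methods from DataType to DataTypes:
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName : "name age".split(" ")) {
  fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);

// ... and Row.create(...) became RowFactory.create(...):
JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
  public Row call(String record) {
    String[] parts = record.split(",");
    return RowFactory.create(parts[0], parts[1].trim());
  }
});

DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);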
Re: [Spark SQL] Java map/flatMap api broken with DataFrame in 1.3.{0,1}
another PR I guess :) here's the associated Jira https://issues.apache.org/jira/browse/SPARK-6988 Le ven. 17 avr. 2015 à 23:00, Reynold Xin r...@databricks.com a écrit : No there isn't a convention. Although if you want to show java 8, you should also show java 6/7 syntax since there are still more 7 users than 8. On Fri, Apr 17, 2015 at 3:36 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Is there any convention *not* to show java 8 versions in the documentation ? Le ven. 17 avr. 2015 à 21:39, Reynold Xin r...@databricks.com a écrit : Please do! Thanks. On Fri, Apr 17, 2015 at 2:36 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Ok, do you want me to open a pull request to fix the dedicated documentation ? Le ven. 17 avr. 2015 à 18:14, Reynold Xin r...@databricks.com a écrit : I think in 1.3 and above, you'd need to do .sql(...).javaRDD().map(..)
Re: [Spark SQL] Java map/flatMap api broken with DataFrame in 1.3.{0,1}
and the PR: https://github.com/apache/spark/pull/5564 Thank you ! Olivier. Le ven. 17 avr. 2015 à 23:00, Reynold Xin r...@databricks.com a écrit : No there isn't a convention. Although if you want to show java 8, you should also show java 6/7 syntax since there are still more 7 users than 8. On Fri, Apr 17, 2015 at 3:36 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Is there any convention *not* to show java 8 versions in the documentation ? Le ven. 17 avr. 2015 à 21:39, Reynold Xin r...@databricks.com a écrit : Please do! Thanks.
Re: [Spark SQL] Java map/flatMap api broken with DataFrame in 1.3.{0,1}
Is there any convention *not* to show java 8 versions in the documentation ? Le ven. 17 avr. 2015 à 21:39, Reynold Xin r...@databricks.com a écrit : Please do! Thanks. On Fri, Apr 17, 2015 at 2:36 PM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Ok, do you want me to open a pull request to fix the dedicated documentation ? Le ven. 17 avr. 2015 à 18:14, Reynold Xin r...@databricks.com a écrit : I think in 1.3 and above, you'd need to do .sql(...).javaRDD().map(..)
Re: Build spark failed with maven
Hi, I could not reproduce this. What kind of JDK are you using for the zinc server ? Regards, Olivier. 2015-02-11 5:08 GMT+01:00 Yi Tian tianyi.asiai...@gmail.com: Hi, all I got an ERROR when I built the Spark master branch with maven (commit: 2d1e916730492f5d61b97da6c483d3223ca44315)

[INFO]
[INFO]
[INFO] Building Spark Project Catalyst 1.3.0-SNAPSHOT
[INFO]
[INFO]
[INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-versions) @ spark-catalyst_2.10 ---
[INFO]
[INFO] --- build-helper-maven-plugin:1.8:add-source (add-scala-sources) @ spark-catalyst_2.10 ---
[INFO] Source directory: /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala added.
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-catalyst_2.10 ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ spark-catalyst_2.10 ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-catalyst_2.10 ---
[INFO] Using zinc server for incremental compilation
[INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null)
[info] Compiling 69 Scala sources and 3 Java sources to /Users/tianyi/github/community/apache-spark/sql/catalyst/target/scala-2.10/classes...
[error] /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:314: polymorphic expression cannot be instantiated to expected type;
[error]  found   : [T(in method apply)]org.apache.spark.sql.catalyst.dsl.ScalaUdfBuilder[T(in method apply)]
[error]  required: org.apache.spark.sql.catalyst.dsl.package.ScalaUdfBuilder[T(in method functionToUdfBuilder)]
[error] implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]): ScalaUdfBuilder[T] = ScalaUdfBuilder(func)

Any suggestion?