Re: Spark | Window Function |
Hi Julien,

Could you give more details about the problems you faced? Here is a working example with the Spark DataFrame API and Spark SQL:
https://gist.github.com/radcheb/d16042d8bb3815d3dd42030ecedc43cf

Cheers,
Radhwane Chebaane

2017-07-18 18:21 GMT+02:00 Julien CHAMP :
> Hi Radhwane !
>
> I've tested both your solutions, using the DataFrame API and Spark SQL... and in
> both cases Spark gets stuck :/
> Did you test the code that you gave me? I don't know if I've done
> something wrong...
>
> Regards,
> Julien
>
> On Mon, 10 Jul 2017 at 10:53, Radhwane Chebaane wrote:
>
>> Hi Julien,
>>
>> - Usually, window functions require less shuffling than a cross join, so they
>>   are a little faster, depending on the use case. For large windows, the
>>   performance of cross joins and window functions is similar.
>> - You can use UDFs and UDAFs as in any Spark SQL query (a geometric mean was
>>   tested successfully).
>>
>> Regards,
>> Radhwane
>>
>> 2017-07-06 16:22 GMT+02:00 Julien CHAMP :
>>
>>> Thanks a lot for your answer Radhwane :)
>>>
>>> I have some (many) use cases that need Long values in window functions.
>>> As said in the bug report, I can store events in ms in a dataframe and want
>>> to count the number of events in the past 10 years (requiring a Long range).
>>>
>>> -> Let's imagine that this window is used on timestamp values in ms:
>>> I can ask for a window with a range of [-216000L, 0] and only have a few
>>> values inside it, not necessarily 216000L of them. I can understand the
>>> limitation for the rowsBetween() method, but the rangeBetween() method is
>>> nice for this kind of usage.
>>>
>>> The solution with a self join seems nice, but two questions:
>>>
>>> - Regarding performance, will it be as fast as a window function?
>>>
>>> - Can I use my own aggregate function (for example a geometric mean) with
>>>   your solution? (Using this:
>>>   https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html ?)
>>>
>>> Thanks again,
>>>
>>> Regards,
>>>
>>> Julien
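For reference, the geometric-mean UDAF discussed above can be written against the Spark 2.x UserDefinedAggregateFunction API, along the lines of the Databricks guide linked in the thread. The sketch below is only illustrative (the class and column names are not from the original exchange): it computes exp(mean(log(x))) by keeping a running sum of logarithms and a count.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Geometric mean as exp(mean(log(x))): the buffer holds a sum of logs and a count.
class GeometricMean extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("sumOfLogs", DoubleType) :: StructField("count", LongType) :: Nil)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
    buffer(1) = 0L
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + math.log(input.getDouble(0))
      buffer(1) = buffer.getLong(1) + 1L
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  def evaluate(buffer: Row): Double =
    if (buffer.getLong(1) == 0L) Double.NaN
    else math.exp(buffer.getDouble(0) / buffer.getLong(1))
}

In the self-join shown later in this thread, it could then replace functions.min, for example:

.agg(new GeometricMean()($"df2.value".cast("double")).as("geo_mean_value"))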
Re: Spark | Window Function |
Hi Julien,

Although this is a strange bug in Spark, it's rare to need a window range larger than the Integer max value.

Nevertheless, most window functions can be expressed with self-joins. Hence, your problem may be solved with this example.

If the input data is as follows:

+---+---------+-----+
| id|timestamp|value|
+---+---------+-----+
|  B|        1|  100|
|  B|    10010|   50|
|  B|    10020|  200|
|  B|    25000|  500|
+---+---------+-----+

and the window is (-20L, 0), then this code will give the wanted result:

df.as("df1").join(df.as("df2"),
    $"df2.timestamp" between($"df1.timestamp" - 20L, $"df1.timestamp"))
  .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
  .agg(functions.min($"df2.value").as("min___value"))
  .orderBy($"df1.timestamp")
  .show()

+---+---------+-----+-----------+
| id|timestamp|value|min___value|
+---+---------+-----+-----------+
|  B|        1|  100|        100|
|  B|    10010|   50|         50|
|  B|    10020|  200|         50|
|  B|    25000|  500|        500|
+---+---------+-----+-----------+

Or with Spark SQL:

SELECT c.id AS id, c.timestamp AS timestamp, c.value, min(c._value) AS min___value
FROM (
  SELECT a.id AS id, a.timestamp AS timestamp, a.value AS value,
         b.timestamp AS _timestamp, b.value AS _value
  FROM df a CROSS JOIN df b
  ON b.timestamp >= a.timestamp - 20L AND b.timestamp <= a.timestamp
) c
GROUP BY c.id, c.timestamp, c.value
ORDER BY c.timestamp

This should also be possible with Spark Streaming, but don't expect high performance.

Cheers,
Radhwane

2017-07-05 10:41 GMT+02:00 Julien CHAMP :
> Hi there !
>
> Let me explain my problem to see if you have a good solution to help me :)
>
> Let's imagine that I have all my data in a DB or a file, and that I load it into
> a dataframe DF with the following columns:
>
> id | timestamp(ms) | value
> A  | 100           | 100
> A  | 110           | 50
> B  | 100           | 100
> B  | 110           | 50
> B  | 120           | 200
> B  | 250           | 500
> C  | 100           | 200
> C  | 110           | 500
>
> The timestamp is a long value, so as to be able to express a date in ms
> from -01-01 to today !
>
> I want to compute operations such as min, max, and average on the value
> column, for a given time window, grouped by id (bonus: if possible, for
> only some timestamps...).
>
> For example, if I have these tuples:
>
> id | timestamp(ms) | value
> B  | 100           | 100
> B  | 110           | 50
> B  | 120           | 200
> B  | 250           | 500
>
> I would like to be able to compute the min value for windows of time = 20.
> This would result in such a DF:
>
> id | timestamp(ms) | value | min___value
> B  | 100           | 100   | 100
> B  | 110           | 50    | 50
> B  | 120           | 200   | 50
> B  | 250           | 500   | 500
>
> This seems like the perfect use case for window functions in Spark (cf:
> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html ).
> I can use:
>
> val tw = Window.orderBy("timestamp").partitionBy("id").rangeBetween(-20, 0)
> df.withColumn("min___value", min(df.col("value")).over(tw))
>
> This leads to the perfect answer!
>
> However, there is a big bug in window functions, as reported here
> (https://issues.apache.org/jira/browse/SPARK-19451), when working with
> Long values!!! So I can't use this.
>
> So my question is (of course): how can I resolve my problem?
> If I use Spark Streaming, will I face the same issue?
> I'll be glad to discuss this problem with you, feel free to answer :)
>
> Regards,
>
> Julien
>
> --
> Julien CHAMP — Data Scientist
>
> Web: www.tellmeplus.com — Email: jch...@tellmeplus.com
> Phone: 06 89 35 01 89 — LinkedIn: https://www.linkedin.com/in/julienchamp
>
> TellMePlus S.A — Predictive Objects
> Paris: 7 rue des Pommerots, 78400 Chatou
> Montpellier: 51 impasse des églantiers, 34980 St Clément de Rivière
>
> This email may contain confidential and/or privileged information for the
> intended recipient. If you are not the intended recipient, please contact
> the sender and delete all copies.

--
Radhwane Chebaane
Distributed systems engineer, Mindlytix
Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b
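For readers who want to try the workaround end to end, here is a self-contained sketch built from the sample data and the self-join shown above. It assumes a local SparkSession, and it adds an id equality to the join condition so the aggregation stays per id (the snippet earlier in the thread omits it because its sample contains a single id):

import org.apache.spark.sql.{SparkSession, functions}

object SelfJoinWindow {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("self-join-window")
      .getOrCreate()
    import spark.implicits._

    // Sample data from the thread: (id, timestamp in ms as a Long, value).
    val df = Seq(
      ("B", 1L, 100),
      ("B", 10010L, 50),
      ("B", 10020L, 200),
      ("B", 25000L, 500)
    ).toDF("id", "timestamp", "value")

    // For each row, aggregate over all rows of the same id whose timestamp lies in
    // [timestamp - 20, timestamp]; this emulates rangeBetween(-20L, 0) without the
    // Long-range limitation of window functions reported in SPARK-19451.
    df.as("df1")
      .join(df.as("df2"),
        $"df1.id" === $"df2.id" &&
          ($"df2.timestamp" between($"df1.timestamp" - 20L, $"df1.timestamp")))
      .groupBy($"df1.id", $"df1.timestamp", $"df1.value")
      .agg(functions.min($"df2.value").as("min___value"))
      .orderBy($"df1.timestamp")
      .show()

    spark.stop()
  }
}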
Re: Load multiple CSV from different paths
Hi,

Referring to the Spark 2.x documentation, org.apache.spark.sql.DataFrameReader has this method:

def csv(paths: String*): DataFrame
(http://spark.apache.org/docs/2.1.0/api/scala/org/apache/spark/sql/package.html#DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])

So you can unpack your array of paths like this:

val sources = paths.split(",").toSeq

spark.read
  .option("header", "false")
  .schema(custom_schema)
  .option("delimiter", "\t")
  .option("mode", "DROPMALFORMED")
  .csv(sources: _*)

In Spark 1.6.x, I think this may work with spark-csv (https://github.com/databricks/spark-csv):

spark.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .schema(custom_schema)
  .option("delimiter", "\t")
  .option("mode", "DROPMALFORMED")
  .load(sources: _*)

Cheers,
Radhwane Chebaane

2017-07-05 16:08 GMT+02:00 Didac Gil :
> Hi,
>
> Do you know any simple way to load multiple CSV files (same schema) that
> are in different paths?
> Wildcards are not a solution, as I want to load specific CSV files from
> different folders.
>
> I came across a solution
> (https://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load)
> that suggests something like:
>
> spark.read.format("csv").option("header", "false")
>   .schema(custom_schema)
>   .option("delimiter", "\t")
>   .option("mode", "DROPMALFORMED")
>   .load(paths.split(","))
>
> However, even though it mentions that this approach would work in Spark 2.x, I
> don't find an implementation of load that accepts an Array[String] as an
> input parameter.
>
> Thanks in advance for your help.
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544
> Sweden: +46 (0)730229737
> Skype: didac.gil.de.la.iglesia

--
Radhwane Chebaane
Distributed systems engineer, Mindlytix
Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b
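For completeness, here is a sketch of how custom_schema and the list of paths could be wired together with the Spark 2.x reader. The column names, types, and file paths below are hypothetical placeholders, not taken from the original question:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().appName("multi-csv").master("local[*]").getOrCreate()

// Hypothetical schema: adjust column names and types to match the real files.
val custom_schema = StructType(Seq(
  StructField("user_id", IntegerType, nullable = true),
  StructField("event", StringType, nullable = true),
  StructField("payload", StringType, nullable = true)
))

// Specific files living in different folders, given as a comma-separated string.
val paths = "/data/2017/07/events.tsv,/archive/legacy/events_old.tsv"
val sources = paths.split(",").toSeq

val df = spark.read
  .option("header", "false")
  .schema(custom_schema)
  .option("delimiter", "\t")
  .option("mode", "DROPMALFORMED")
  .csv(sources: _*)

df.printSchema()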
Re: Cannot convert from JavaRDD to Dataframe
Hi,

An ArrayType column maps to a Scala array, which in Java corresponds to a plain Java array, so you must pass a String[] rather than a List. However, since RowFactory.create expects one Object per column, the array has to be wrapped as a single element:

public Row call(String line) {
    return RowFactory.create(new String[][]{ line.split(" ") });
}

More details in this Stack Overflow question:
http://stackoverflow.com/questions/43411492/createdataframe-throws-exception-when-pass-javardd-that-contains-arraytype-col/43585039#43585039

Hope this works for you,
Cheers

2017-04-23 18:13 GMT+02:00 Chen, Mingrui :
> Hello everyone!
>
> I am a new Spark learner trying to do a task that seems very simple. I want
> to read a text file, save the content to a JavaRDD and convert it to a
> Dataframe, so I can use it for a Word2Vec model in the future. The code looks
> pretty simple but I cannot make it work:
>
> SparkSession spark = SparkSession.builder().appName("Word2Vec").getOrCreate();
> JavaRDD<String> lines = spark.sparkContext().textFile("input.txt", 10).toJavaRDD();
> JavaRDD<Row> rows = lines.map(new Function<String, Row>() {
>     public Row call(String line) {
>         return RowFactory.create(Arrays.asList(line.split(" ")));
>     }
> });
> StructType schema = new StructType(new StructField[] {
>     new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
> });
> Dataset<Row> input = spark.createDataFrame(rows, schema);
> input.show(3);
>
> It throws an exception at input.show(3):
>
> Caused by: java.lang.ClassCastException: cannot assign instance of
> scala.collection.immutable.List$SerializationProxy to field
> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
> scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
>
> It seems there is a problem converting the JavaRDD to a Dataframe. However, I
> cannot figure out what mistake I make here, and the exception message is hard
> to understand. Can anyone help? Thanks!

--
Radhwane Chebaane
Distributed systems engineer, Mindlytix
Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b
[sparkR] [MLlib] : Is word2vec implemented in SparkR MLlib ?
Hi,

I've been experimenting with the Spark Word2vec implementation in the MLlib package with Scala, and it was very nice. I need to use the same algorithm in R, leveraging the power of Spark's distributed execution through SparkR. I have been looking on the mailing list and Stack Overflow for any Word2vec use case in SparkR, but with no luck.

Is there any implementation of Word2vec in SparkR? Is there any ongoing work to support this feature in MLlib with R?

Thanks!
Radhwane Chebaane

--
Radhwane Chebaane
Distributed systems engineer, Mindlytix
Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906
Skype: rad.cheb
LinkedIn: https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b
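For context, the Scala usage referred to above corresponds to the DataFrame-based Word2Vec estimator in org.apache.spark.ml.feature. The sketch below only illustrates that Scala API; the toy sentences and parameter values are made up, and it says nothing about SparkR support:

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("word2vec-demo").master("local[*]").getOrCreate()

// Toy corpus: each row is a pre-tokenised sentence (an Array[String]).
val docs = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Small illustrative settings; tune vectorSize and minCount for real corpora.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)

val model = word2Vec.fit(docs)

// One vector per document: the average of its word vectors.
model.transform(docs).show(truncate = false)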