Re: [Spark Streaming][Problem with DataFrame UDFs]
Quick correction to the code snippet in my previous email. The line:

    val enrichedDF = inputDF.withColumn("semantic", udf(col("url")))

should be replaced by:

    val enrichedDF = inputDF.withColumn("semantic", enrichUDF(col("url")))

On Thu, Jan 21, 2016 at 11:07 AM, Jean-Pierre OCALAN wrote:

> Hi Cody,
>
> First of all, thanks a lot for your quick reply. I actually removed this
> post a couple of hours after posting it, because I found that the problem
> was due to the way I was using DataFrame UDFs.
>
> Essentially, I didn't know that UDFs are purely lazy; in the example below
> the UDF gets executed 3 times over the entire data set.
>
> // Let's imagine we have an input DataFrame inputDF with a "url" column.
> // semanticClassifier.classify(url) returns Map[Int, Int]
> val enrichUDF = udf { (url: String) => semanticClassifier.classify(url) }
>
> // enrichedDF will have all the columns of inputDF plus a "semantic" column
> // containing the result of running the classification on the url value.
> val enrichedDF = inputDF.withColumn("semantic", udf(col("url")))
>
> val outputDF = enrichedDF.select(col("*"),
>   col("semantic")(0).as("semantic1"),
>   col("semantic")(1).as("semantic2"),
>   col("semantic")(2).as("semantic3")).drop("semantic")
>
> // The UDF will be executed 3 times over the entire dataset.
> outputDF.count()
>
> By adding enrichedDF.persist() (and unpersist() later on) I was able to
> quickly solve the issue, but I don't really like it, so I will probably
> work directly with the RDD[Row] and maintain the schema myself in order
> to recreate the DataFrame.
>
> Thanks again,
> JP.
>
> On Thu, Jan 21, 2016 at 10:45 AM, Cody Koeninger wrote:
>
>> If you can share an isolated example I'll take a look. Not something
>> I've run into before.
>>
>> On Wed, Jan 20, 2016 at 3:53 PM, jpocalan wrote:
>>
>>> Hi,
>>>
>>> I have an application which creates a Kafka direct stream from one
>>> topic with 5 partitions. As a result, each batch is composed of an RDD
>>> with 5 partitions. In order to apply transformations to my batch, I
>>> decided to convert the RDD to a DataFrame (DF) so that I can easily add
>>> columns to the initial DF using custom UDFs.
>>>
>>> However, when I apply any UDF to the DF, I notice that the UDF gets
>>> executed multiple times, and this factor is driven by the number of
>>> partitions. For example, with an RDD of 10 records and 5 partitions,
>>> ideally my UDF should get called 10 times, yet it consistently gets
>>> called 50 times. The resulting DF is nevertheless correct, and
>>> executing count() properly returns 10, as expected.
>>>
>>> I have changed my code to work directly with RDDs using mapPartitions,
>>> and the transformation gets called the proper number of times.
>>>
>>> As additional information, I have set spark.speculation to false and no
>>> tasks failed.
>>>
>>> I am working on a smaller example that would isolate this potential
>>> issue, but in the meantime I would like to know if somebody has
>>> encountered it.
>>>
>>> Thank you.
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Problem-with-DataFrame-UDFs-tp26024.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org

--
jean-pierre ocalan
jpoca...@gmail.com
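The persist() workaround mentioned above works because it materializes the enriched column once instead of letting each projected column re-trigger the lazy UDF. A plain-Scala sketch of the same effect (an analogy only, not actual Spark API — `classify` here is a hypothetical stand-in for `semanticClassifier.classify` that counts its own invocations):

```scala
// Plain-Scala analogy of the UDF re-evaluation problem (not Spark API).
object UdfEvalSketch {
  var calls = 0
  // Stand-in for semanticClassifier.classify: returns a Map[Int, Int]
  // and counts how many times it is invoked.
  def classify(url: String): Map[Int, Int] = {
    calls += 1
    Map(0 -> url.length, 1 -> url.hashCode.abs % 10, 2 -> 0)
  }

  def main(args: Array[String]): Unit = {
    val urls = List("a.com", "b.org")

    // Naive: the "semantic" value is recomputed for each of the three
    // projected columns, so classify runs 3x per record (6 calls here).
    calls = 0
    val naive = urls.map { u =>
      (classify(u)(0), classify(u)(1), classify(u)(2))
    }
    println(s"naive calls = $calls")   // prints "naive calls = 6"

    // Cached: compute once per record, then project the three entries,
    // analogous to persisting enrichedDF before selecting semantic1..3.
    calls = 0
    val cached = urls.map { u =>
      val m = classify(u)
      (m(0), m(1), m(2))
    }
    println(s"cached calls = $calls")  // prints "cached calls = 2"
  }
}
```

The Spark-level moral is the same: one evaluation per record, then cheap projections of the cached result.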
Re: [Spark Streaming][Problem with DataFrame UDFs]
Hi Cody,

First of all, thanks a lot for your quick reply. I actually removed this post a couple of hours after posting it, because I found that the problem was due to the way I was using DataFrame UDFs.

Essentially, I didn't know that UDFs are purely lazy; in the example below the UDF gets executed 3 times over the entire data set.

// Let's imagine we have an input DataFrame inputDF with a "url" column.
// semanticClassifier.classify(url) returns Map[Int, Int]
val enrichUDF = udf { (url: String) => semanticClassifier.classify(url) }

// enrichedDF will have all the columns of inputDF plus a "semantic" column
// containing the result of running the classification on the url value.
val enrichedDF = inputDF.withColumn("semantic", udf(col("url")))

val outputDF = enrichedDF.select(col("*"),
  col("semantic")(0).as("semantic1"),
  col("semantic")(1).as("semantic2"),
  col("semantic")(2).as("semantic3")).drop("semantic")

// The UDF will be executed 3 times over the entire dataset.
outputDF.count()

By adding enrichedDF.persist() (and unpersist() later on) I was able to quickly solve the issue, but I don't really like it, so I will probably work directly with the RDD[Row] and maintain the schema myself in order to recreate the DataFrame.

Thanks again,
JP.

On Thu, Jan 21, 2016 at 10:45 AM, Cody Koeninger wrote:

> If you can share an isolated example I'll take a look. Not something I've
> run into before.
>
> On Wed, Jan 20, 2016 at 3:53 PM, jpocalan wrote:
>
>> Hi,
>>
>> I have an application which creates a Kafka direct stream from one topic
>> with 5 partitions. As a result, each batch is composed of an RDD with 5
>> partitions. In order to apply transformations to my batch, I decided to
>> convert the RDD to a DataFrame (DF) so that I can easily add columns to
>> the initial DF using custom UDFs.
>>
>> However, when I apply any UDF to the DF, I notice that the UDF gets
>> executed multiple times, and this factor is driven by the number of
>> partitions. For example, with an RDD of 10 records and 5 partitions,
>> ideally my UDF should get called 10 times, yet it consistently gets
>> called 50 times. The resulting DF is nevertheless correct, and executing
>> count() properly returns 10, as expected.
>>
>> I have changed my code to work directly with RDDs using mapPartitions,
>> and the transformation gets called the proper number of times.
>>
>> As additional information, I have set spark.speculation to false and no
>> tasks failed.
>>
>> I am working on a smaller example that would isolate this potential
>> issue, but in the meantime I would like to know if somebody has
>> encountered it.
>>
>> Thank you.
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Problem-with-DataFrame-UDFs-tp26024.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
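The mapPartitions fallback described in the quoted post has a second benefit beyond predictable call counts: heavyweight setup (such as constructing a classifier) can be done once per partition rather than once per record. A plain-Scala analogy of that pattern (not actual Spark API — `HeavyClassifier` is a hypothetical stand-in, and partitions are modeled as nested lists):

```scala
// Plain-Scala analogy of RDD.mapPartitions (not actual Spark API):
// heavyweight setup runs once per partition, and the per-record
// function runs exactly once per record.
object MapPartitionsSketch {
  var inits = 0
  var classifications = 0

  // Stand-in for an expensive-to-construct classifier.
  class HeavyClassifier {
    inits += 1
    def classify(url: String): Int = { classifications += 1; url.length }
  }

  // mapPartitions-style enrichment: one classifier per "partition".
  def enrich(partitions: List[List[String]]): List[List[Int]] =
    partitions.map { part =>
      val clf = new HeavyClassifier()   // once per partition
      part.map(clf.classify)            // once per record
    }

  def main(args: Array[String]): Unit = {
    val parts = List(List("a.com", "b.org"), List("c.net"))
    val out = enrich(parts)
    // 2 partitions -> 2 inits; 3 records -> 3 classifications
    println(s"inits=$inits classifications=$classifications")
  }
}
```

In real Spark code the same shape applies inside `rdd.mapPartitions { rows => ... }`, with the rows mapped to `Row` objects and the DataFrame rebuilt from the RDD plus an extended schema.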
Re: [Spark Streaming][Problem with DataFrame UDFs]
If you can share an isolated example I'll take a look. Not something I've run into before.

On Wed, Jan 20, 2016 at 3:53 PM, jpocalan wrote:

> Hi,
>
> I have an application which creates a Kafka direct stream from one topic
> with 5 partitions. As a result, each batch is composed of an RDD with 5
> partitions. In order to apply transformations to my batch, I decided to
> convert the RDD to a DataFrame (DF) so that I can easily add columns to
> the initial DF using custom UDFs.
>
> However, when I apply any UDF to the DF, I notice that the UDF gets
> executed multiple times, and this factor is driven by the number of
> partitions. For example, with an RDD of 10 records and 5 partitions,
> ideally my UDF should get called 10 times, yet it consistently gets
> called 50 times. The resulting DF is nevertheless correct, and executing
> count() properly returns 10, as expected.
>
> I have changed my code to work directly with RDDs using mapPartitions,
> and the transformation gets called the proper number of times.
>
> As additional information, I have set spark.speculation to false and no
> tasks failed.
>
> I am working on a smaller example that would isolate this potential
> issue, but in the meantime I would like to know if somebody has
> encountered it.
>
> Thank you.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Problem-with-DataFrame-UDFs-tp26024.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org