How to use an anonymous function with DataFrame.explode()?
I need to take a DataFrame of events and explode them row-wise so that there's at least one representation per time interval (usually a day) between events. Here's a simplified version of the problem, which I have gotten to work in spark-shell:

    case class Meal (food: String, calories: Double, date: Int)
    case class AsOfDate (dt: Int)
    val m1 = new Meal("steak", 33, 20170101)
    val m2 = new Meal("peach", 25, 20170105)
    val mDf = sc.parallelize(Seq(m1, m2)).toDF

    scala> mDf.show
    +-----+--------+--------+
    | food|calories|    date|
    +-----+--------+--------+
    |steak|      33|20170101|
    |peach|      25|20170105|
    +-----+--------+--------+

Now, I want to explode the DataFrame so that there are no gaps in days:

    mDf.where($"date" < 20170105).explode(mDf("date")) {
      case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
    }

    scala> res0.show
    +-----+--------+--------+
    | food|calories|    date|
    +-----+--------+--------+
    |steak|      33|20170101|
    |steak|      33|20170102|
    |steak|      33|20170103|
    |steak|      33|20170104|
    +-----+--------+--------+

    mDf.where($"date" >= 20170105).explode(mDf("date")) {
      case Row(date: Int) => (date to 20170105).map(AsOfDate(_))
    }

    scala> res1.show
    +-----+--------+--------+
    | food|calories|    date|
    +-----+--------+--------+
    |peach|      25|20170105|
    +-----+--------+--------+

    val exploded = res0.union(res1)

    scala> exploded.show
    +-----+--------+--------+
    | food|calories|    date|
    +-----+--------+--------+
    |steak|      33|20170101|
    |steak|      33|20170102|
    |steak|      33|20170103|
    |steak|      33|20170104|
    |peach|      25|20170105|
    +-----+--------+--------+

So that gives me what I want, but I'd like to be able to define the function that does the date iteration elsewhere, and pass it in to the call to explode(). Part of the reason is that in the real DataFrame "date" is a java.sql.Timestamp, which involves more manipulation to add the right number of time intervals. I've tried defining the function like this, but I cannot get it to work:

    val fn1 = (x: Row) => {
      case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
    }

but I get this:

    error: missing parameter type for expanded function
    The argument types of an anonymous function must be fully known. (SLS 8.5)
    Expected type was: ?
So I tried it this way instead:

    val fn1 = (x: Row) => x match {
      case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
    }

which seemed to work, until I attempted to use it with explode():

    mDf.where($"date" < 20170105).explode(mDf("date"))fn1

    error: missing argument list for method explode in class Dataset

I don't see how I can pass fn1 to explode, given that it expects a function taking a Row as its only input. Is what I want to do possible, or do I need to write the Row manipulation functions inline with explode() as I've done in the working examples above?

As a side question, I also wonder why I need the AsOfDate case class to do the explosion, since all I want to do is set the column value, and I already know the type. Before using AsOfDate, I tried one of the working examples like this:

    mDf.where($"date" < 20170105).explode(mDf("date")) {
      case Row(date: Int) => (date to 20170104).map(_.asInstanceOf[Int])
    }

but that gave me this:

    error: inferred type arguments [Int] do not conform to method explode's type parameter bounds [A <: Product]

I'm not sure what that means, or how using the case class resolves it.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-an-anonymous-function-with-DataFrame-explode-tp28285.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
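[Editorial sketch] Outside Spark, the gap-filling logic the question describes can be shown in plain Python, with the expansion function defined separately and applied later — which is the shape the question is after. All names below are illustrative, not from the thread, and int yyyymmdd dates only work within a single month, exactly as in the example above:

```python
def explode_until(row, end_date):
    """Return one copy of `row` per date in [row['date'], end_date),
    mirroring the (date to 20170104).map(AsOfDate(_)) step above."""
    return [{**row, "date": d} for d in range(row["date"], end_date)]

meals = [
    {"food": "steak", "calories": 33.0, "date": 20170101},
    {"food": "peach", "calories": 25.0, "date": 20170105},
]

# Expand the first event up to (but not including) the second event's date,
# then keep the last event as-is -- the union(res0, res1) step above.
exploded = explode_until(meals[0], 20170105) + [meals[1]]
```

The function is an ordinary value here, so it can live anywhere and be passed around freely; in the Scala DataFrame API the equivalent requires the function's full type to be known at the definition site.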
Re: Spark Read from Google store and save in AWS s3
Here is how you would read from Google Cloud Storage (note: you need to create a service account key):

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = """--jars /home/neil/Downloads/gcs-connector-latest-hadoop2.jar pyspark-shell"""

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession, SQLContext

    conf = SparkConf()\
        .setMaster("local[8]")\
        .setAppName("GS")
    sc = SparkContext(conf=conf)

    sc._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    sc._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    sc._jsc.hadoopConfiguration().set("fs.gs.project.id", "PUT YOUR GOOGLE PROJECT ID HERE")
    sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.email", "testa...@sparkgcs.iam.gserviceaccount.com")
    sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
    sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.keyfile", "sparkgcs-96bd21691c29.p12")

    spark = SparkSession.builder\
        .config(conf=sc.getConf())\
        .getOrCreate()

    dfTermRaw = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", "\t")\
        .option("inferSchema", "true")\
        .load("gs://bucket_test/sample.tsv")

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Read-from-Google-store-and-save-in-AWS-s3-tp28278p28286.html
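[Editorial sketch] The thread's subject also asks about saving to AWS S3, which the reply above doesn't cover. A hedged configuration sketch of the write side, assuming the hadoop-aws (s3a) connector jar was added alongside the GCS connector; the bucket name, output path, and credential strings are placeholders, not values from the thread:

```python
# Sketch only: requires the hadoop-aws jar on the classpath, and `sc` /
# `dfTermRaw` from the snippet above. Credentials and bucket are placeholders.
hc = sc._jsc.hadoopConfiguration()
hc.set("fs.s3a.access.key", "YOUR_AWS_ACCESS_KEY")
hc.set("fs.s3a.secret.key", "YOUR_AWS_SECRET_KEY")

# Write the DataFrame read from gs:// back out to S3 via the s3a scheme.
dfTermRaw.write.mode("overwrite").parquet("s3a://your-bucket/sample_parquet/")
```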
Spark 2.0.2, KryoSerializer, double[] is not registered.
Hi, all. I enabled Kryo in Spark via spark-defaults.conf:

    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.kryo.registrationRequired true

A KryoException is raised when a logistic regression algorithm is running:

    Note: To register this class use: kryo.register(double[].class);
    Serialization trace:
    currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

My question is: isn't double[].class supported by default? Thanks.
Re: Spark 2.0.2, KryoSerializer, double[] is not registered.
You can try the following code. Note that the exception names the primitive array class double[], which is a different class from the boxed Double[], so register the primitive array:

    // double[] (the class named in the exception) has a built-in Kryo
    // serializer; registering it is enough:
    kryo.register(double[].class);

    // If boxed Double[] arrays also appear, they can be registered too:
    ObjectArraySerializer serializer = new ObjectArraySerializer(kryo, Double[].class);
    kryo.register(Double[].class, serializer);
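[Editorial sketch] Instead of registering inside JVM code, the class can also be listed in the Spark configuration. A sketch from the PySpark side, assuming (not confirmed in the thread) that spark.kryo.classesToRegister resolves JVM array class names such as "[D" via Class.forName:

```python
from pyspark import SparkConf

# Sketch only: "[D" is the JVM name of the primitive array double[]
# (the class the exception names); "[Ljava.lang.Double;" would be the
# boxed Double[]. Requires a running pyspark environment to take effect.
conf = (SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrationRequired", "true")
        .set("spark.kryo.classesToRegister", "[D"))
```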