How to use an anonymous function with DataFrame.explode()?

2017-01-07 Thread dpapathanasiou
I need to take a DataFrame of events and explode it row-wise so that there is
at least one row per time interval (usually a day) between events.

Here's a simplified version of the problem, which I have gotten to work in
spark-shell:

case class Meal (food: String, calories: Double, date: Int)
case class AsOfDate (dt: Int)

val m1 = new Meal("steak", 33, 20170101)
val m2 = new Meal("peach", 25, 20170105)

val mDf = sc.parallelize(Seq(m1, m2)).toDF

scala> mDf.show

+-----+--------+--------+
| food|calories|    date|
+-----+--------+--------+
|steak|      33|20170101|
|peach|      25|20170105|
+-----+--------+--------+

Now, I want to explode the DataFrame so that there are no gaps in days:

mDf.where($"date" < 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
}

scala> res0.show
 
+-----+--------+--------+
| food|calories|    date|
+-----+--------+--------+
|steak|      33|20170101|
|steak|      33|20170102|
|steak|      33|20170103|
|steak|      33|20170104|
+-----+--------+--------+

mDf.where($"date" >= 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170105).map(AsOfDate(_))
}

scala> res1.show
 
+-----+--------+--------+
| food|calories|    date|
+-----+--------+--------+
|peach|      25|20170105|
+-----+--------+--------+

val exploded = res0.union(res1)

scala> exploded.show

+-----+--------+--------+
| food|calories|    date|
+-----+--------+--------+
|steak|      33|20170101|
|steak|      33|20170102|
|steak|      33|20170103|
|steak|      33|20170104|
|peach|      25|20170105|
+-----+--------+--------+

So that gives me what I want, but I'd like to be able to define the function
that does the date iteration elsewhere and pass it to the call to explode().

Part of the reason is that in the real DataFrame "date" is a
java.sql.Timestamp, which requires more manipulation to add the right number
of time intervals.
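
For context, the Timestamp version needs per-day stepping roughly along these
lines (an untested sketch of my own; the daysBetween helper is made up and not
what is in my real code):

import java.sql.Timestamp

// hypothetical helper: one Timestamp per day from start up to (but not including) end
def daysBetween(start: Timestamp, end: Timestamp): Seq[Timestamp] = {
  val first = start.toLocalDateTime.toLocalDate
  val last  = end.toLocalDateTime.toLocalDate
  Iterator.iterate(first)(_.plusDays(1))
    .takeWhile(_.isBefore(last))
    .map(d => Timestamp.valueOf(d.atStartOfDay()))
    .toSeq
}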

I've tried defining the function like this, but I cannot get it to work:

val fn1 = (x: Row) => { case Row(date: Int) => (date to 20170104).map(AsOfDate(_)) }

but I get this:

error: missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: ?

So I tried it this way instead:

val fn1 = (x: Row) => x match {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_)) 
}

which seemed to work, until I attempted to use it with explode():

mDf.where($"date" < 20170105).explode(mDf("date"))fn1

error: missing argument list for method explode in class Dataset

I don't see how I can pass fn1 to explode(), given that fn1 takes a Row as its
only input.
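
In other words, what I am hoping to be able to write is something like this
(just a sketch; perhaps fn1 needs an explicit function type, or the call needs
another pair of parentheses):

val fn1: Row => Seq[AsOfDate] = {
  case Row(date: Int) => (date to 20170104).map(AsOfDate(_))
}

mDf.where($"date" < 20170105).explode(mDf("date"))(fn1)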

Is what I want to do possible, or do I need to write the Row manipulation
functions inline with explode(), as I've done in the working examples above?

As a side question, I also wonder why I need the AsOfDate case class to do
the explosion, since all I want to do is set the column value, and I already
know its type.

Before using AsOfDate, I tried one of the working examples like this:

mDf.where($"date" < 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170104).map(_.asInstanceOf[Int])
}

but that gave me this:

error: inferred type arguments [Int] do not conform to method explode's type parameter bounds [A <: Product]

I'm not sure what that means, or how using the case class resolves it.
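
My guess is that the [A <: Product] bound means each generated element must be
a case class or a tuple, so that explode can work out the names and types of
the new columns. If that's right, then presumably wrapping the value in a
Tuple1 would also satisfy the bound (untested, and the new column would
probably end up named _1 rather than date):

mDf.where($"date" < 20170105).explode(mDf("date")) {
  case Row(date: Int) => (date to 20170104).map(Tuple1(_))
}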







Re: Spark Read from Google store and save in AWS s3

2017-01-07 Thread neil90
Here is how you would read from Google Cloud Storage (note: you need to create
a service account key):

import os

os.environ['PYSPARK_SUBMIT_ARGS'] = """--jars /home/neil/Downloads/gcs-connector-latest-hadoop2.jar pyspark-shell"""

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf()\
.setMaster("local[8]")\
.setAppName("GS")   

sc = SparkContext(conf=conf)

sc._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc._jsc.hadoopConfiguration().set("fs.gs.project.id", "PUT YOUR GOOGLE PROJECT ID HERE")

sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.email", "testa...@sparkgcs.iam.gserviceaccount.com")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.keyfile", "sparkgcs-96bd21691c29.p12")

spark = SparkSession.builder\
.config(conf=sc.getConf())\
.getOrCreate()

dfTermRaw = spark.read.format("csv")\
.option("header", "true")\
.option("delimiter" ,"\t")\
.option("inferSchema", "true")\
.load("gs://bucket_test/sample.tsv")







Spark 2.0.2, KryoSerializer, double[] is not registered.

2017-01-07 Thread Yan Facai
Hi, all.
I enabled Kryo in Spark with these settings in spark-defaults.conf:
 spark.serializer                 org.apache.spark.serializer.KryoSerializer
 spark.kryo.registrationRequired  true

A KryoException is raised when a logistic regression algorithm is running:
 Note: To register this class use: kryo.register(double[].class);
 Serialization trace:
 currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

My question is:
Isn't double[].class supported by default?

Thanks.


Re: Spark 2.0.2, KryoSerializer, double[] is not registered.

2017-01-07 Thread smartzjp
You can try the following code.

// ObjectArraySerializer lives in com.esotericsoftware.kryo.serializers.DefaultArraySerializers
ObjectArraySerializer serializer = new ObjectArraySerializer(kryo, Double[].class);
kryo.register(Double[].class, serializer);
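
Alternatively (untested), since the error message refers to the primitive
double[], you could register the array class up front with
SparkConf.registerKryoClasses in Scala; with spark.kryo.registrationRequired
set to true you may need to register the other classes named in subsequent
errors as well:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // classOf[Array[Double]] is the primitive double[] named in the error message
  .registerKryoClasses(Array(classOf[Array[Double]]))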

