I decided to play around with DataFrames this morning, but I'm running into
quite a few issues. I'm assuming I must be doing something wrong, so I would
appreciate some advice.
First, I create my DataFrame:
import sqlContext.implicits._

// One row of the pipe-delimited CSV.
case class Entity(InternalId: Long, EntityId: Long, EntityType: String,
  CustomerId: String, EntityURI: String, NumDocs: Long)

// Load the file from S3 and split each line on '|'.
val entities = sc.textFile("s3n://darin/Entities.csv")
val entitiesArr = entities.map(v => v.split('|'))

// Map each field array onto the case class and convert to a DataFrame.
val dfEntity = entitiesArr.map(arr => Entity(arr(0).toLong, arr(1).toLong,
  arr(2), arr(3), arr(4), arr(5).toLong)).toDF()
Second, I verify the schema:
dfEntity.printSchema
root
|-- InternalId: long (nullable = false)
|-- EntityId: long (nullable = false)
|-- EntityType: string (nullable = true)
|-- CustomerId: string (nullable = true)
|-- EntityURI: string (nullable = true)
|-- NumDocs: long (nullable = false)
Third, I verify that I can select a column:
dfEntity.select("InternalId").limit(10).show()
InternalId
1
2
3
4
5
6
7
8
9
10
But things then start to break down. Let's say I want to filter so that I only
keep records where InternalId is < 5:
dfEntity.filter("InternalId" < 5L).count()
But this gives me the following error message. Doesn't the schema above
indicate that the InternalId column should be of type Long?
<console>:42: error: type mismatch;
found : Long(5L)
required: String
dfEntity.filter("InternalId" < 5L).count()
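Reading the error, it looks like "InternalId" < 5L is just an ordinary String
comparison (String's < expects another String on the right), which is why the
5L doesn't type-check. So my guess is the whole predicate has to go inside the
string, something like this (an untested guess on my part):

// Hypothetical fix: pass the entire predicate as a SQL expression string.
dfEntity.filter("InternalId < 5").count()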
Instead, I try the Column-expression form:
dfEntity.filter(dfEntity("InternalId") < 5L).count()
Now this gives me the following error instead:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in
stage 153.0 failed 4 times, most recent failure: Lost task 13.3 in stage 153.0
(TID 1636, ip-10-0-200-6.ec2.internal): java.lang.ArrayIndexOutOfBoundsException
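In case it's relevant: I wonder whether a few lines in the file don't split
into exactly 6 fields (I believe split('|') also drops trailing empty fields),
which would make arr(5) throw exactly this ArrayIndexOutOfBoundsException once
the filter forces a scan of every row. Here is the check I'm planning to run,
plus a defensive variant of the parse (both just sketches):

// Count lines that don't yield exactly 6 fields after the split.
val badLines = entities.map(_.split('|')).filter(_.length != 6).count()

// Build the DataFrame from well-formed lines only.
val dfEntitySafe = entities
  .map(_.split('|'))
  .filter(_.length == 6)
  .map(arr => Entity(arr(0).toLong, arr(1).toLong, arr(2), arr(3),
    arr(4), arr(5).toLong))
  .toDF()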
I'm using Apache Spark 1.3.
Thanks.
Darin.