ClassCastException when saving a DataFrame to a Parquet file (saveAsParquetFile, Spark 1.3.1) using Scala

2015-08-21 Thread Emma Boya Peng
Hi,

I was trying to programmatically specify a schema, apply it to an RDD of
Rows, and save the resulting DataFrame as a Parquet file, but I got a
java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Long on the last step.

Here's what I did:

1. Created an RDD of Rows from an RDD[Array[String]]:

  val gameId = Long.valueOf(line(0))
  val accountType = Long.valueOf(line(1))
  val worldId = Long.valueOf(line(2))
  val dtEventTime = line(3)
  val iEventId = line(4)

  Row(gameId, accountType, worldId, dtEventTime, iEventId)
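
For context, here is a minimal sketch of how this conversion is typically
wired together in Spark 1.3 (the function name toRow, the input path, and
the tab delimiter below are assumptions, not taken from the original code):

  import org.apache.spark.sql.Row

  // Parse one pre-split log line into a Row, as in the snippet above.
  def toRow(line: Array[String]): Row = {
    Row(Long.valueOf(line(0)), Long.valueOf(line(1)), Long.valueOf(line(2)),
        line(3), line(4))
  }

  val rowRdd = sc.textFile("hdfs://.../events.log") // assumed source path
    .map(_.split("\t"))                             // assumed delimiter
    .map(toRow)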

2. Generated the schema:

  import org.apache.spark.sql.types._

  StructType(Array(
    StructField("GameId", LongType, true),
    StructField("AccountType", LongType, true),
    StructField("WorldId", LongType, true),
    StructField("dtEventTime", StringType, true),
    StructField("iEventId", StringType, true)))

3. Applied the schema to the RDD of Rows:
val schemaRdd = sqlContext.createDataFrame(rowRdd, schema)
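
As a quick sanity check (not in the original message), the declared schema
can be printed right away, though this only confirms the schema itself, not
that the row values actually match it:

  schemaRdd.printSchema()
  // root
  //  |-- GameId: long (nullable = true)
  //  |-- AccountType: long (nullable = true)
  //  ...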

4. Saved schemaRdd as a Parquet file:

  schemaRdd.saveAsParquetFile(dst + "/" + tableName + ".parquet")

However, step 4 throws a ClassCastException, even though the DataFrame
(i.e. schemaRdd) prints out correctly according to the specified schema.
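
For what it's worth, this matches how createDataFrame behaves in Spark 1.3:
the rows are never validated against the schema up front (the API docs warn
that it is the caller's job to make sure every Row matches it), and the
Parquet writer is the first code path that actually unboxes each column to
its declared type. A hedged way to surface a mismatch earlier, assuming the
LongType columns from step 2 (this check is not from the original message):

  // Fail fast in a foreach stage instead of inside the Parquet writer:
  // getLong throws the same ClassCastException if a String ended up in
  // a slot declared as LongType.
  rowRdd.foreach { r =>
    r.getLong(0); r.getLong(1); r.getLong(2)
  }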

Thank you for your help!

Best,
Emma

Stack trace of the exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage
1.0 (TID 12, 10-4-28-24): java.lang.ClassCastException: java.lang.String
cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:357)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:338)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:324)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)