Hi,

I was trying to programmatically specify a schema, apply it to an RDD of
Rows, and save the resulting DataFrame as a Parquet file, but I got
"java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Long" on the last step.

Here's what I did:

1. Create an RDD of Rows from an RDD[Array[String]]:
      val gameId = Long.valueOf(line(0))
      val accountType = Long.valueOf(line(1))
      val worldId = Long.valueOf(line(2))
      val dtEventTime = line(3)
      val iEventId = line(4)

      return Row(gameId, accountType, worldId, dtEventTime, iEventId)

2. Generate the schema:
      return StructType(Array(
        StructField("GameId", LongType, true),
        StructField("AccountType", LongType, true),
        StructField("WorldId", LongType, true),
        StructField("dtEventTime", StringType, true),
        StructField("iEventId", StringType, true)))

3. Apply the schema to the RDD of Rows:
val schemaRdd = sqlContext.createDataFrame(rowRdd, schema)

4. Save schemaRdd as a parquet file:
 schemaRdd.saveAsParquetFile(dst + "/" + tableName + ".parquet")

However, step 4 threw a ClassCastException, even though the DataFrame
(schemaRdd) prints correctly according to the specified schema.
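
Since printing succeeds but the write fails inside GenericRow.getLong, one
thing that may be worth checking is whether every Row really holds a Long in
each column the schema declares as LongType. Below is a minimal sketch of
such a check in plain Scala, not a definitive diagnosis. The names RowCheck,
expected, and findMismatches are hypothetical and not part of Spark; the
column list simply mirrors the schema from step 2, and nulls are accepted
because every field there is nullable.

```scala
object RowCheck {
  // Expected boxed runtime class per column, mirroring the schema in step 2.
  // (Hypothetical helper for illustration; not a Spark API.)
  val expected: Seq[(String, Class[_])] = Seq(
    "GameId"      -> classOf[java.lang.Long],
    "AccountType" -> classOf[java.lang.Long],
    "WorldId"     -> classOf[java.lang.Long],
    "dtEventTime" -> classOf[String],
    "iEventId"    -> classOf[String]
  )

  // Returns one message per column whose value is not of the expected class.
  // A wrong column count is reported as a single message.
  // Nulls pass, since all fields in the schema are declared nullable.
  def findMismatches(values: Seq[Any]): Seq[String] =
    if (values.length != expected.length)
      Seq(s"expected ${expected.length} columns, got ${values.length}")
    else
      expected.zip(values).collect {
        case ((name, cls), v) if v != null && !cls.isInstance(v) =>
          s"$name: expected ${cls.getSimpleName}, got ${v.getClass.getSimpleName}"
      }
}
```

Against the RDD from step 1, something like
rowRdd.map(r => RowCheck.findMismatches(r.toSeq)).filter(_.nonEmpty).take(5)
should surface the first offending rows, assuming Row.toSeq is available in
the Spark version in use.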

Thank you for your help!

Best,
Emma

Stack trace of the exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage
1.0 (TID 12, 10-4-28-24): java.lang.ClassCastException: java.lang.String
cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88)
at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:357)
at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:338)
at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:324)
at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
