Why does this not work? Is insertInto broken in Spark 1.3.1?

import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(10))

val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")

dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
parquetFile.registerTempTable("rideaccepted")

currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}

ssc.start()


Or this, which skips the Parquet round trip and registers the JSON DataFrame directly?

import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(10))

val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")

dayBefore.registerTempTable("rideaccepted")

currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}

ssc.start()
