Why does this not work? Is insert into broken in 1.3.1? It does not throw any errors, fail, or throw exceptions. It simply does not work.
val ssc = new StreamingContext(sc, Minutes(10)) val currentStream = ssc.textFileStream(s"s3://textFileDirectory/") val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/") dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet") val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet") parquetFile.registerTempTable("rideaccepted") currentStream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.insertInto("rideaccepted") } ssc.start() Or this? val ssc = new StreamingContext(sc, Minutes(10)) val currentStream = ssc.textFileStream("s3://textFileDirectory") val day = sqlContext.jsonFile("s3://textFileDirectory") day.registerTempTable("rideaccepted") currentStream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.registerTempTable("tmp_rideaccepted") sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted") } ssc.start() or this? val ssc = new StreamingContext(sc, Minutes(10)) val currentStream = ssc.textFileStream(s"s3://textFileDirectory/") val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/") dayBefore..registerTempTable("rideaccepted") currentStream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.insertInto("rideaccepted") } ssc.start()