Hi Brandon,

Can you explain what you meant by "It simply does not work"? Did you not see any new data files?

Thanks,
Yin

On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bwwintheho...@gmail.com> wrote:

> Why does this not work? Is insert into broken in 1.3.1? It does not throw
> any errors, fail, or throw exceptions. It simply does not work.
>
>
> val ssc = new StreamingContext(sc, Minutes(10))
>
> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>
> dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
> val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
> parquetFile.registerTempTable("rideaccepted")
>
> currentStream.foreachRDD { rdd =>
>   val df = sqlContext.jsonRDD(rdd)
>   df.insertInto("rideaccepted")
> }
>
> ssc.start()
>
>
> Or this?
>
> val ssc = new StreamingContext(sc, Minutes(10))
> val currentStream = ssc.textFileStream("s3://textFileDirectory")
> val day = sqlContext.jsonFile("s3://textFileDirectory")
> day.registerTempTable("rideaccepted")
>
>
> currentStream.foreachRDD { rdd =>
>   val df = sqlContext.jsonRDD(rdd)
>   df.registerTempTable("tmp_rideaccepted")
>   sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
> }
>
> ssc.start()
>
>
> Or this?
>
> val ssc = new StreamingContext(sc, Minutes(10))
>
> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>
> dayBefore.registerTempTable("rideaccepted")
>
> currentStream.foreachRDD { rdd =>
>   val df = sqlContext.jsonRDD(rdd)
>   df.insertInto("rideaccepted")
> }
>
> ssc.start()