Hi Yin,

Yes, there were no new rows. I fixed it by calling .remember on the streaming context. Obviously, this is not ideal.
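For the archives, the workaround looks roughly like this. It is only a sketch of what I did, using the same names as the snippets below; the Minutes(120) retention window is an arbitrary value, not something from the original code:

```scala
import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(10))

// Keep generated RDDs around longer than the batch interval, so the
// temp table built on top of them stays queryable after insertInto.
// Minutes(120) is an arbitrary retention window chosen for illustration.
ssc.remember(Minutes(120))

val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.registerTempTable("rideaccepted")

currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}

ssc.start()
```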
On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai <yh...@databricks.com> wrote:

> Hi Brandon,
>
> Can you explain what you meant by "It simply does not work"? You did
> not see new data files?
>
> Thanks,
>
> Yin
>
> On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bwwintheho...@gmail.com>
> wrote:
>
>> Why does this not work? Is insertInto broken in 1.3.1? It does not throw
>> any errors, fail, or raise exceptions. It simply does not work.
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
>> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>>
>> dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
>> val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
>> parquetFile.registerTempTable("rideaccepted")
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.insertInto("rideaccepted")
>> }
>>
>> ssc.start()
>>
>>
>> Or this?
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>> val currentStream = ssc.textFileStream("s3://textFileDirectory")
>> val day = sqlContext.jsonFile("s3://textFileDirectory")
>> day.registerTempTable("rideaccepted")
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.registerTempTable("tmp_rideaccepted")
>>   sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
>> }
>>
>> ssc.start()
>>
>>
>> or this?
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
>> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>>
>> dayBefore.registerTempTable("rideaccepted")
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.insertInto("rideaccepted")
>> }
>>
>> ssc.start()