Hi Brandon,

Can you explain what you meant by "It simply does not work"? Did you not
see any new data files?

Thanks,

Yin

On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bwwintheho...@gmail.com>
wrote:

> Why does this not work? Is insert into broken in 1.3.1? It does not throw
> any errors, fail, or throw exceptions. It simply does not work.
>
>
> val ssc = new StreamingContext(sc, Minutes(10))
>
> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>
> dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
> val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
> parquetFile.registerTempTable("rideaccepted")
>
> currentStream.foreachRDD { rdd =>
>   val df = sqlContext.jsonRDD(rdd)
>   df.insertInto("rideaccepted")
> }
>
> ssc.start()
>
>
> Or this?
>
> val ssc = new StreamingContext(sc, Minutes(10))
> val currentStream = ssc.textFileStream("s3://textFileDirectory")
> val day = sqlContext.jsonFile("s3://textFileDirectory")
> day.registerTempTable("rideaccepted")
>
>
> currentStream.foreachRDD { rdd =>
>   val df = sqlContext.jsonRDD(rdd)
>   df.registerTempTable("tmp_rideaccepted")
>   sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
> }
>
> ssc.start()
>
>
> Or this?
>
> val ssc = new StreamingContext(sc, Minutes(10))
>
> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>
> dayBefore.registerTempTable("rideaccepted")
>
> currentStream.foreachRDD { rdd =>
>   val df = sqlContext.jsonRDD(rdd)
>   df.insertInto("rideaccepted")
> }
>
> ssc.start()
>
>
