Hi Yin,

Yes, there were no new rows. I fixed it by calling .remember on the streaming context so the batch RDDs weren't cleaned up before the table was queried. Obviously, this is not ideal.
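For reference, a minimal sketch of the workaround (the one-hour remember window is an assumption; pick whatever covers your query latency):

```scala
import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(10))

// Tell Spark Streaming to keep RDDs from recent batches in memory
// instead of deleting them once each batch completes, so queries
// against the temp table registered from them still see the data.
ssc.remember(Minutes(60))
```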

On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai <yh...@databricks.com> wrote:

> Hi Brandon,
>
> Can you explain what you meant by "It simply does not work"? You did
> not see new data files?
>
> Thanks,
>
> Yin
>
> On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bwwintheho...@gmail.com>
> wrote:
>
>> Why does this not work? Is insert into broken in 1.3.1? It does not throw
>> any errors, fail, or throw exceptions. It simply does not work.
>>
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
>> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>>
>> dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
>> val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
>> parquetFile.registerTempTable("rideaccepted")
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.insertInto("rideaccepted")
>> }
>>
>> ssc.start()
>>
>>
>> Or this?
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>> val currentStream = ssc.textFileStream("s3://textFileDirectory")
>> val day = sqlContext.jsonFile("s3://textFileDirectory")
>> day.registerTempTable("rideaccepted")
>>
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.registerTempTable("tmp_rideaccepted")
>>   sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
>> }
>>
>> ssc.start()
>>
>>
>> or this?
>>
>> val ssc = new StreamingContext(sc, Minutes(10))
>>
>> val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
>> val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")
>>
>> dayBefore.registerTempTable("rideaccepted")
>>
>> currentStream.foreachRDD { rdd =>
>>   val df = sqlContext.jsonRDD(rdd)
>>   df.insertInto("rideaccepted")
>> }
>>
>> ssc.start()
>>
>>
>
