I think ExistingRDD is currently not supported as an insertInto target. ParquetRelation
is supported, though, so you can probably use it as a workaround. The relevant planner
rule matches only a ParquetRelation:

case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
  InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil

Example:

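case class Rec(name: String, pv: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Implicit conversion from RDD[Rec] to SchemaRDD
import sqlContext.createSchemaRDD
// Write two small data sets out as Parquet files
val d1 = sc.parallelize(Array(("a", 10), ("b", 3))).map(e => Rec(e._1, e._2))
d1.saveAsParquetFile("p1.parquet")
val d2 = sc.parallelize(Array(("a", 10), ("b", 3), ("c", 5))).map(e => Rec(e._1, e._2))
d2.saveAsParquetFile("p2.parquet")
// Read them back so that both are backed by a ParquetRelation
// (new names, so the snippet also compiles outside the shell)
val p1 = sqlContext.parquetFile("p1.parquet")
val p2 = sqlContext.parquetFile("p2.parquet")
// Register the first Parquet file as a table and append the second one to it
p1.registerAsTable("logs")
p2.insertInto("logs")
sqlContext.sql("select count(*) from logs").collect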
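
Applied to the Person example from your mail, the same idea might look like the
sketch below. I have not run it. It assumes the Spark 1.0/1.1 SchemaRDD API, in
particular the experimental sqlContext.createParquetFile, which creates an empty
Parquet file that can be registered as a table. The path "data.parquet", the
local master setting, and the object name are placeholders of mine.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Defined at top level so Spark SQL can derive the schema by reflection
case class Person(name: String, age: Int)

object InsertIntoParquetSketch {
  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying this out on one machine
    val conf = new SparkConf().setAppName("insert-into-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Implicit conversion from RDD[Person] to SchemaRDD
    import sqlContext.createSchemaRDD

    // Start from an empty Parquet-backed table instead of an in-memory RDD,
    // so that the planner sees a ParquetRelation as the insert target
    sqlContext.createParquetFile[Person]("data.parquet").registerAsTable("data")

    // Each insertInto appends the rows of the RDD to the registered table
    sc.parallelize(Seq(Person("Hari", 22))).insertInto("data")
    sc.parallelize(Seq(Person("sagar", 22))).insertInto("data")

    sqlContext.sql("SELECT COUNT(*) FROM data").collect().foreach(println)
    sc.stop()
  }
}
```

For the streaming case, the insertInto call would go inside foreachRDD, with one
such accumulation table per stream.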

Thanks.

Zhan Zhang


On Aug 26, 2014, at 1:11 AM, praveshjain1991 <praveshjain1...@gmail.com> wrote:

> Thanks for the reply.
> 
> Ya it doesn't seem doable straight away. Someone suggested this
> 
> /For each of your streams, first create an empty RDD that you register as a
> table, obtaining an empty table. For your example, let's say you call it
> "allTeenagers".
> 
> Then, for each of your queries, use SchemaRDD's insertInto method to add the
> result to that table:
> 
> teenagers.insertInto("allTeenagers")
> 
> If you do this with both your streams, creating two separate accumulation
> tables, you can then join them using a plain old SQL query.
> /
> 
> So I was trying it but can't seem to use the insertInto method in the
> correct way. Something like:
> 
>    var p1 = Person("Hari",22)
>    val rdd1 = sc.parallelize(Array(p1))    
>    rdd1.registerAsTable("data")
> 
>    var p2 = Person("sagar", 22)
>    var rdd2 = sc.parallelize(Array(p2))
>    rdd2.insertInto("data")
> 
> is giving the error : "java.lang.AssertionError: assertion failed: No plan
> for InsertIntoTable Map(), false"
> 
> Any thoughts?
> 
> Thanks
> 
> Hi again,
> 
> On Tue, Aug 26, 2014 at 10:13 AM, Tobias Pfeiffer <tgp@> wrote:
>> 
>> On Mon, Aug 25, 2014 at 7:11 PM, praveshjain1991 <
>> praveshjain1991@> wrote:
>>> 
>>> "If you want to issue an SQL statement on streaming data, you must have
>>> both
>>> the registerAsTable() and the sql() call *within* the foreachRDD(...)
>>> block,
>>> or -- as you experienced -- the table name will be unknown"
>>> 
>>> Since this is the case then is there any way to run join over data
>>> received
>>> from two different streams?
>>> 
>> 
>> Couldn't you do dstream1.join(dstream2).foreachRDD(...)?
>> 
> 
> Ah, I guess you meant something like "SELECT * FROM dstream1 JOIN dstream2
> WHERE ..."? I don't know if that is possible. Doesn't seem easy to me; I
> don't think that's doable with the current codebase...
> 
> Tobias
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-run-SparkSQL-over-Spark-Streaming-tp12530p12812.html

