Hi,
On Thu, Mar 12, 2015 at 12:08 AM, Huang, Jie <[email protected]> wrote:
>
> According to my understanding, your approach is to register a series of
> tables by using transformWith, right? And then, you can get a new DStream
> (i.e., SchemaDStream), which consists of lots of SchemaRDDs.
>
Yep, it's basically the following:
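import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SchemaRDD, StructType}
import org.apache.spark.streaming.dstream.DStream

// Pairs a stream of rows with a stream of schemas (one StructType per batch).
case class SchemaDStream(sqlc: SQLContext,
                         dataStream: DStream[Row],
                         schemaStream: DStream[StructType]) {

  // Re-registers every batch under the same temp table name.
  def registerStreamAsTable(name: String): Unit = {
    foreachRDD(_.registerTempTable(name))
  }

  def foreachRDD(func: SchemaRDD => Unit): Unit = {
    // Called on the driver for each batch: attach the batch's schema to its rows.
    def executeFunction(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[Unit] = {
      val schema: StructType = schemaRDD.collect.head
      val dataWithSchema: SchemaRDD = sqlc.applySchema(dataRDD, schema)
      val result = func(dataWithSchema)
      schemaRDD.map(x => result) // return RDD[Unit]
    }
    // foreachRDD registers an output operation, so executeFunction is actually
    // invoked for every batch; count() just runs the cheap resulting job.
    dataStream.transformWith(schemaStream, executeFunction _).foreachRDD(_.count())
  }
}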
In a similar way you could add a `transform(func: SchemaRDD => SchemaRDD)`
method. But as I said, I am not sure about performance; for one thing, every
batch triggers an extra `collect` on the schema RDD.
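A rough, untested sketch of such a `transform`, written against the Spark 1.2 SchemaRDD API. The helper object `SchemaDStreamTransform` and its signature are hypothetical (not part of Spark); it applies `func` to each batch and returns the new rows and the new schemas as two DStreams, which could then be wrapped in a new `SchemaDStream`. Note that it evaluates `func` (and the schema `collect`) twice per batch, once for each output stream, so `func` should be free of side effects such as registering tables.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SchemaRDD, StructType}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper, not a Spark API: per-batch SchemaRDD => SchemaRDD transformation.
object SchemaDStreamTransform {
  def transform(sqlc: SQLContext,
                dataStream: DStream[Row],
                schemaStream: DStream[StructType],
                func: SchemaRDD => SchemaRDD): (DStream[Row], DStream[StructType]) = {

    // Rebuild the batch's SchemaRDD from its rows and its single-element schema RDD,
    // then apply the user's function.
    def applyFunc(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): SchemaRDD =
      func(sqlc.applySchema(dataRDD, schemaRDD.collect.head))

    // A SchemaRDD is an RDD[Row], so the transformed batch can be returned as-is.
    def rowsOf(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[Row] =
      applyFunc(dataRDD, schemaRDD)

    // The output schema only needs the analyzed plan; ship it back as an RDD
    // with the same shape as the input schema RDD (as foreachRDD does above).
    def schemaOf(dataRDD: RDD[Row], schemaRDD: RDD[StructType]): RDD[StructType] = {
      val outSchema: StructType = applyFunc(dataRDD, schemaRDD).schema
      schemaRDD.map(_ => outSchema)
    }

    (dataStream.transformWith(schemaStream, rowsOf _),
     dataStream.transformWith(schemaStream, schemaOf _))
  }
}
```

With an existing `s: SchemaDStream`, something like `val (rows, schemas) = SchemaDStreamTransform.transform(s.sqlc, s.dataStream, s.schemaStream, f)` followed by `SchemaDStream(s.sqlc, rows, schemas)` would give the chained stream.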
Tobias