DStream.transform() does exactly this: it applies an arbitrary RDD-to-RDD
function to every batch of the stream.
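
A minimal sketch of how that could look, assuming your pipeline is a function
that takes a DataFrame and returns a DataFrame. The names make_rdd_transform,
dataframe_pipeline_fn and to_dataframe are made up for illustration and are
not part of Spark. The Spark-specific wiring is left in comments so the helper
itself has no dependencies:

```python
def make_rdd_transform(dataframe_pipeline_fn, to_dataframe):
    """Wrap a DataFrame -> DataFrame function as an RDD -> RDD function.

    dataframe_pipeline_fn: the reusable batch pipeline (hypothetical name).
    to_dataframe: callable that turns an RDD into a DataFrame, for example
                  sqlContext.createDataFrame on an RDD of Row objects.
    """
    def rdd_transform(rdd):
        # Skip empty batches: schema inference cannot work on an empty RDD.
        if rdd.isEmpty():
            return rdd
        input_df = to_dataframe(rdd)
        output_df = dataframe_pipeline_fn(input_df)
        # transform() expects an RDD back, so unwrap the DataFrame.
        return output_df.rdd
    return rdd_transform


# Assumed wiring in a Spark 1.6 streaming job (not executed here):
#
#   from pyspark.sql import Row
#   lines = ssc.textFileStream(input_dir)            # input_dir is a placeholder
#   rows = lines.map(lambda line: Row(value=line))
#   result = rows.transform(
#       make_rdd_transform(my_batch_pipeline, sqlContext.createDataFrame))
```

The same wrapper idea should work with foreachRDD if you want to write the
output inside the function instead of returning it. If you enable
checkpointing, you will probably want to look up the SQLContext lazily inside
the function, the way the sql_network_wordcount example does, rather than
closing over it.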

On Tue, Aug 8, 2017 at 7:55 PM, Ashwin Raju <ther...@gmail.com> wrote:

> Hi,
>
> We've built a batch application on Spark 1.6.1. I'm looking into how to
> run the same code as a streaming (DStream based) application. This is using
> pyspark.
>
> In the batch application, we have a sequence of transforms that read from
> file, do dataframe operations, then write to file. I was hoping to swap out
> the read from file with textFileStream, then use the dataframe operations
> as is. This would mean that if we change the batch pipeline, so long as it
> is a sequence of dataframe operations, the streaming version can just reuse
> the code.
>
> Looking at the sql_network_wordcount
> <https://github.com/apache/spark/blob/branch-1.6/examples/src/main/python/streaming/sql_network_wordcount.py>
> example, it looks like I'd have to do DStream.foreachRDD, convert the
> passed in RDD into a dataframe and then do my sequence of dataframe
> operations. However, that list of dataframe operations looks to be
> hardcoded into the process method. Is there any way to pass in a function
> that takes a dataframe as input and returns a dataframe?
>
> What I see from the example:
>
> words.foreachRDD(process)
>
> def process(time, rdd):
>     # create dataframe from RDD
>     # hardcoded operations on the dataframe
>
> What I would like to do instead:
>
> def process(time, rdd):
>     # create dataframe from RDD - input_df
>     # output_df = dataframe_pipeline_fn(input_df)
>
> -ashwin
>
