Hi,

We've built a batch application on Spark 1.6.1. I'm looking into how to run
the same code as a streaming (DStream based) application. This is using
pyspark.

In the batch application, we have a sequence of transforms that read from
file, do dataframe operations, then write to file. I was hoping to swap out
the read from file with textFileStream, then use the dataframe operations
as is. This would mean that if we change the batch pipeline, so long as it
is a sequence of dataframe operations, the streaming version can just reuse
the code.

Looking at the sql_network_wordcount
<https://github.com/apache/spark/blob/branch-1.6/examples/src/main/python/streaming/sql_network_wordcount.py>
example, it looks like I'd have to do DStream.foreachRDD, convert the
passed in RDD into a dataframe and then do my sequence of dataframe
operations. However, that list of dataframe operations looks to be
hardcoded into the process method, is there any way to pass in a function
that takes a dataframe as input and returns a dataframe?

what i see from the example:

words.foreachRDD(process)

def process(time, rdd):
# create dataframe from RDD
# hardcoded operations on the dataframe

what i would like to do instead:
def process(time, rdd):
# create dataframe from RDD - input_df
# output_df = dataframe_pipeline_fn(input_df)

-ashwin

Reply via email to