When I use reduceByKeyAndWindow with func and invFunc in PySpark, the size
of the window keeps growing: the count() of the windowed DStream goes up by
10 elements every batch instead of leveling off.
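To put numbers on it: the feeder below writes 5 unique elements per second,
so once the first full 10-second window has passed I would expect count()
to settle around 50. Instead it just keeps climbing.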

Is this a bug in the Python version of Spark, or is this expected behavior?

Here is the code that reproduces this issue.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)
ssc.checkpoint('ckpt')  # checkpointing is required when invFunc is used

# Count occurrences of each event over a 10-second window sliding every 2 seconds.
ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda count1, count2: count1 + count2,
        invFunc=lambda count1, count2: count1 - count2,
        windowDuration=10,
        slideDuration=2)

ds.pprint()
ds.count().pprint()  # keeps growing instead of leveling off at the window size

print 'Starting ssc'
ssc.start()

import itertools
import time
import random

from distutils import dir_util

def batch_write(batch_data, batch_file_path):
    '''Write one element per line to batch_file_path.'''
    with open(batch_file_path, 'w') as batch_file:
        for element in batch_data:
            batch_file.write(str(element) + "\n")

def xrange_write(batch_size=5, batch_dir='input', batch_duration=1):
    '''Every batch_duration seconds, write a file with batch_size numbers,
    forever. Start at 0 and keep incrementing. Intended for testing
    Spark Streaming code.'''

    dir_util.mkpath(batch_dir)  # make sure the input directory exists
    for i in itertools.count():
        start = batch_size * i
        stop = batch_size * (i + 1)
        batch_data = xrange(start, stop)
        file_path = batch_dir + '/' + str(i)
        batch_write(batch_data, file_path)
        time.sleep(batch_duration)

print 'Feeding data to app'
xrange_write()  # loops forever, so awaitTermination() below is never actually reached

ssc.awaitTermination()
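One thing I noticed in the docs but have not tried yet: reduceByKeyAndWindow
also accepts a filterFunc argument, described as a way to filter out expired
key-value pairs so that only pairs satisfying the function are retained. If
the growth comes from stale keys hanging around after the inverse reduce, I
would guess the usage looks roughly like this (untested sketch; the lambda is
my assumption):

ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda count1, count2: count1 + count2,
        invFunc=lambda count1, count2: count1 - count2,
        windowDuration=10,
        slideDuration=2,
        filterFunc=lambda kv: kv[1] > 0)  # assumption: drop pairs whose count fell to 0

Even so, I would not expect the window to grow without bound when filterFunc
is omitted.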
