[ https://issues.apache.org/jira/browse/SPARK-10069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261011#comment-15261011 ]
Davies Liu commented on SPARK-10069:
------------------------------------

cc [~zsxwing]

> Python's ReduceByKeyAndWindow DStream Keeps Growing
> ---------------------------------------------------
>
>                 Key: SPARK-10069
>                 URL: https://issues.apache.org/jira/browse/SPARK-10069
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.4.1
>            Reporter: Asim Jalis
>
> When I use reduceByKeyAndWindow with func and invFunc (in PySpark) the size
> of the window keeps growing. I am appending the code that reproduces this
> issue. This prints out the count() of the dstream which goes up every batch
> by 10 elements.
> Is this a bug in the Python version of Scala or is this expected behavior?
> Here is the code that reproduces this issue.
> {code}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pprint import pprint
> print 'Initializing ssc'
> ssc = StreamingContext(SparkContext(), batchDuration=1)
> ssc.checkpoint('ckpt')
> ds = ssc.textFileStream('input') \
>     .map(lambda event: (event,1)) \
>     .reduceByKeyAndWindow(
>         func=lambda count1,count2: count1+count2,
>         invFunc=lambda count1,count2: count1-count2,
>         windowDuration=10,
>         slideDuration=2)
> ds.pprint()
> ds.count().pprint()
> print 'Starting ssc'
> ssc.start()
> import itertools
> import time
> import random
> from distutils import dir_util
> def batch_write(batch_data, batch_file_path):
>     with open(batch_file_path,'w') as batch_file:
>         for element in batch_data:
>             line = str(element) + "\n"
>             batch_file.write(line)
> def xrange_write(
>         batch_size = 5,
>         batch_dir = 'input',
>         batch_duration = 1):
>     '''Every batch_duration write a file with batch_size numbers,
>     forever. Start at 0 and keep incrementing. Intended for testing
>     Spark Streaming code.'''
>     dir_util.mkpath('./input')
>     for i in itertools.count():
>         min = batch_size * i
>         max = batch_size * (i + 1)
>         batch_data = xrange(min,max)
>         file_path = batch_dir + '/' + str(i)
>         batch_write(batch_data, file_path)
>         time.sleep(batch_duration)
> print 'Feeding data to app'
> xrange_write()
>
> ssc.awaitTermination()
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
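
The growth described in the report can be illustrated without Spark. The sketch below is a plain-Python model of an incrementally updated window, not Spark's implementation: the helper name incremental_window, the one-batch slide, and the two-batch window are assumptions chosen for brevity. It shows that when a window is maintained by adding new values with func and subtracting expired ones with invFunc, keys whose count drops to zero stay in the state unless something filters them out. PySpark's reduceByKeyAndWindow accepts an optional filterFunc argument that is applied to (key, value) pairs; whether passing it resolves this particular issue is not confirmed anywhere in this thread.

```python
def incremental_window(batches, window, func, inv_func, filter_func=None):
    """Yield the windowed state after each batch, updating it incrementally.

    Hypothetical helper for illustration only. `batches` is a list of
    dicts mapping key -> value; `window` is the window length in batches.
    """
    state = {}
    for i, batch in enumerate(batches):
        # Fold in the values that just entered the window.
        for key, value in batch.items():
            state[key] = func(state[key], value) if key in state else value
        # Apply the inverse function to the batch that just left the window.
        if i >= window:
            for key, value in batches[i - window].items():
                state[key] = inv_func(state[key], value)
        # Optionally drop expired pairs, in the spirit of filterFunc.
        if filter_func is not None:
            state = {k: v for k, v in state.items() if filter_func((k, v))}
        yield dict(state)


# Like the reproduction script: every batch holds 5 keys never seen before.
batches = [{str(n): 1 for n in range(5 * i, 5 * (i + 1))} for i in range(4)]

add = lambda a, b: a + b
sub = lambda a, b: a - b

sizes_unfiltered = [len(s) for s in incremental_window(batches, 2, add, sub)]
sizes_filtered = [
    len(s)
    for s in incremental_window(batches, 2, add, sub, lambda kv: kv[1] != 0)
]

print(sizes_unfiltered)
print(sizes_filtered)
```

In this model the unfiltered state gains 5 keys per batch and never shrinks, because expired keys remain with a value of 0, while the filtered state levels off at the number of keys actually inside the window.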