Hello, I'm trying to calculate the Pearson correlation between two DStreams using a sliding window in PySpark, but I keep getting the following error:
    Traceback (most recent call last):
      File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/examples/src/main/python/streaming/Cross-Corr.py", line 63, in <module>
        result = Statistics.corr(windowedds1,windowedds2, method="pearson")
      File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/stat/_statistics.py", line 157, in corr
      File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc
      File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 122, in callJavaFunc
      File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 87, in _py2java
      File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 555, in dumps
      File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/context.py", line 315, in __getnewargs__
    Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

The error comes from this line:

    result = Statistics.corr(windowedds1, windowedds2, method="pearson")

First, I read the lines from two text files and load them into two Kafka topics. Then I apply a window operation to each DStream and calculate the Pearson correlation between the two windows.
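For context, what I expect Statistics.corr to compute per window is just the ordinary Pearson coefficient. In plain Python (with made-up sample values, not my real stream data) that would be:

    import math

    def pearson(xs, ys):
        """Pearson correlation of two equal-length sequences of floats."""
        n = len(xs)
        mx = sum(xs) / n
        my = sum(ys) / n
        cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
        sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
        sy = math.sqrt(sum((y - my) ** 2 for y in ys))
        return cov / (sx * sy)

    # Hypothetical samples from one 10-second window of each stream
    window1 = [1.0, 2.0, 3.0, 4.0, 5.0]
    window2 = [2.0, 4.0, 6.0, 8.0, 10.0]

    print(pearson(window1, window2))  # exactly linear, so this prints 1.0

That is the value I want to produce for every pair of windows as they slide.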
Here is my code:

    from __future__ import print_function
    from future.builtins import *
    from pyspark.ml.linalg import Vectors
    from pyspark.mllib.stat import Statistics
    from pyspark.ml.stat import Correlation
    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    import time
    from collections import deque
    import sys
    from operator import add
    import numpy as np
    from itertools import chain
    import warnings
    from obspy import UTCDateTime
    from obspy.signal.cross_correlation import templates_max_similarity
    from obspy import read

    if __name__ == "__main__":
        print("hello spark")
        sc = SparkContext("local[2]", appName="CrossCorrelation")
        ssc = StreamingContext(sc, 5)
        broker, topic1, topic2 = sys.argv[1:]

        # Connect to Kafka
        kvs1 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation", {topic1: 1})
        kvs2 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation", {topic2: 1})
        lines1 = kvs1.map(lambda x1: x1[1])
        ds1 = lines1.flatMap(lambda line1: line1.strip().split("\n")).map(lambda strelem1: float(strelem1))
        lines2 = kvs2.map(lambda x2: x2[1])
        ds2 = lines2.flatMap(lambda line2: line2.strip().split("\n")).map(lambda strelem2: float(strelem2))

        # Windowing
        windowedds1 = ds1.window(10, 5)
        windowedds2 = ds2.window(10, 5)

        # Correlation
        result = Statistics.corr(windowedds1, windowedds2, method="pearson")
        if result > 0.7:
            print("ds1 and ds2 are correlated!!!")

        ssc.start()
        ssc.awaitTermination()

Does anybody know what I'm doing wrong? Thank you.

-- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/