Hi,
I am writing a Spark program in Python to find the maximum temperature for
a year, given a weather dataset. The program below throws the following
error when I execute it:
TypeError: 'NoneType' object is not iterable
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
If I replace +9999 with 9999 in the extractData function, the program
executes without any error. The code with +9999 works with Hadoop
Streaming, but not with Spark pipes. How do I get around this problem?
Does it have to do with the way encoding is done within Spark?
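If it helps, here is a minimal pure-Python sketch of what I suspect is happening: extractData only returns a value on its valid branch, so invalid lines come back as None, and unpacking None as a (key, value) pair raises this same TypeError (the sample lines and the extract function below are made up for illustration):

```python
def extract(line):
    # mirrors extractData: only the valid branch returns a pair
    if line.startswith("OK"):
        return ("1950", "+0022")
    # the invalid branch falls through, so Python implicitly returns None

good = extract("OK 1950 +0022")
bad = extract("some garbled line")
print(good)  # ('1950', '+0022')
print(bad)   # None

# reduceByKey has to unpack every mapped record as (key, value);
# unpacking None raises a TypeError because None is not iterable
try:
    year, temp = bad
except TypeError as exc:
    print(exc)
```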
-------------
import re
import sys
from pyspark import SparkContext
#Create Spark Context with the master details and the application name
sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
#define the accumulator
invalidRecordsAccumulator = sc.accumulator(0)
validRecordsAccumulator = sc.accumulator(0)
logFile = "hdfs://localhost:9000/user/bigdatavm/input"
#Create an RDD from the input data in HDFS
weatherData = sc.textFile(logFile)
#function to extract the data from the line
#based on position and filter out the invalid records
def extractData(line):
    global invalidRecordsAccumulator
    global validRecordsAccumulator
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        validRecordsAccumulator += 1
        return (year, temp)
    else:
        invalidRecordsAccumulator += 1
#Transform the data to extract/filter and then find the max temperature
max_temperature_per_year = weatherData.map(extractData).reduceByKey(lambda
a,b : a if int(a) > int(b) else b)
#Save the RDD back into HDFS
max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value
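For reference, I expect the per-year reduction to behave like this plain-Python sketch on a few made-up records (no Spark involved, same comparison as the reduceByKey lambda):

```python
# made-up (year, temp) pairs, as extractData would emit for valid lines
records = [("1950", "0022"), ("1950", "0111"), ("1949", "0078")]

# apply the same comparison as the reduceByKey lambda, per year
max_temp = {}
for year, temp in records:
    if year in max_temp:
        a, b = max_temp[year], temp
        max_temp[year] = a if int(a) > int(b) else b
    else:
        max_temp[year] = temp

print(max_temp)  # {'1950': '0111', '1949': '0078'}
```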
Thanks,
Praveen