I am writing a program that reads some JSON data from Kafka and saves it as Parquet files on HDFS. Here is my code:
    JavaInputDStream stream = ...
    JavaDStream<String> rdd = stream.map(...);

    rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringJavaRDD -> {
        // Convert the JSON strings to a DataFrame
        Dataset<Row> df = spark.read().json(stringJavaRDD);
        // Add some new fields
        JavaRDD<Row> rowJavaRDD = df.javaRDD().map(...);
        // Construct a new schema for the newly added fields
        StructType type = df.schema()...;
        // Create a new DataFrame with the new schema
        Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);
        // Save to Parquet
        newdf.repartition(taskNum).write().mode(SaveMode.Append).partitionBy("appname").parquet(savepath);
    });

However, if I remove the repartition call on newdf in the Parquet-writing step, the program always throws a NullPointerException at the JSON conversion line:

    java.lang.NullPointerException
        at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
        ...

This makes no sense to me: the Parquet-writing operation should be in a different stage from the JSON transformation. How can I solve it? Thanks!

Regards,
Junfeng Chen