I wrote a program that reads JSON data from Kafka and saves it as Parquet
files on HDFS.
Here is my code:
> JavaInputDStream stream = ...
> JavaDStream rdd = stream.map...
> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>) stringjavardd -> {
>     // convert JSON to a DataFrame
>     Dataset<Row> df = spark.read().json(stringjavardd);
>     // add some new fields
>     JavaRDD<Row> rowJavaRDD = df.javaRDD().map...
>     // construct a new type that includes the added fields
>     StructType type = df.schema()...;
>     // create the new DataFrame
>     Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type);
>     // save to Parquet
>     newdf.repartition(taskNum).write().mode(SaveMode.Append).partitionBy("appname").parquet(savepath);
> });
However, if I remove the repartition call on newdf in the Parquet-writing
step, the program always throws a NullPointerException at the line that
converts the JSON:
> java.lang.NullPointerException
> at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
> ...
This makes no sense to me, because the Parquet write should run in a
different stage from the JSON conversion.
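To show what I mean by stages, here is a small plain-Java analogy. It uses
java.util.stream, not Spark, and the class and method names are made up for
illustration. As far as I understand, Spark transformations are lazy and a
shuffle such as repartition is what separates one stage from the next, so
without it the row mapping and the write may be pipelined together. The
sketch only demonstrates that kind of lazy pipelining; it is not a diagnosis
of the NullPointerException:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class: a java.util.stream analogy, not Spark code.
class LazyPipelineDemo {

    // Returns the order in which the steps actually ran.
    static List<String> runPipeline() {
        List<String> log = new ArrayList<>();

        // Defining the transformation does not execute it.
        Stream<String> transformed = Stream.of("a", "b")
                .map(s -> {
                    log.add("transform " + s);
                    return s.toUpperCase();
                });
        log.add("pipeline defined");

        // The terminal operation stands in for the Parquet write: only now
        // does the map run, interleaved element by element with the "write".
        transformed.forEach(s -> log.add("write " + s));
        return log;
    }

    public static void main(String[] args) {
        runPipeline().forEach(System.out::println);
    }
}
```

In this analogy the "transform" and "write" entries alternate for each
element, because nothing separates the two steps. A repartition in Spark
would be the boundary that forces the first step to finish before the second
one starts.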
So how can I solve it? Thanks!
Regards,
Junfeng Chen