I have written a program that reads some JSON data from Kafka and is
supposed to save it to Parquet files on HDFS.
Here is my code:

> JavaInputDStream stream = ...
> JavaDStream<String> rdd = stream.map...
> rdd.repartition(taskNum).foreachRDD(stringJavaRdd -> {
>     Dataset<Row> df = spark.read().json(stringJavaRdd); // convert json to df
>     JavaRDD<Row> rowJavaRdd = df.javaRDD().map...; // add some new fields
>     StructType type = df.schema()...; // construct new type for the added fields
>     Dataset<Row> newDf = spark.createDataFrame(rowJavaRdd, type); // create new dataframe
>     newDf.repartition(taskNum).write().mode(SaveMode.Append).partitionBy("appname").parquet(savePath); // save to parquet
> });
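
In case a fuller picture helps, here is a self-contained sketch of the job. The Kafka parameters, topic name, taskNum, savePath, and the appname column logic are placeholders standing in for my real job, not the actual values:

> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.RowFactory;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka010.ConsumerStrategies;
> import org.apache.spark.streaming.kafka010.KafkaUtils;
> import org.apache.spark.streaming.kafka010.LocationStrategies;
>
> public class KafkaJsonToParquet {
>     public static void main(String[] args) throws InterruptedException {
>         SparkSession spark = SparkSession.builder().appName("KafkaJsonToParquet").getOrCreate();
>         JavaStreamingContext jssc = new JavaStreamingContext(
>                 new JavaSparkContext(spark.sparkContext()), Durations.seconds(60));
>
>         Map<String, Object> kafkaParams = new HashMap<>();
>         kafkaParams.put("bootstrap.servers", "broker:9092");           // placeholder
>         kafkaParams.put("key.deserializer", StringDeserializer.class);
>         kafkaParams.put("value.deserializer", StringDeserializer.class);
>         kafkaParams.put("group.id", "parquet-writer");                 // placeholder
>
>         JavaInputDStream<ConsumerRecord<String, String>> stream =
>                 KafkaUtils.createDirectStream(jssc,
>                         LocationStrategies.PreferConsistent(),
>                         ConsumerStrategies.<String, String>Subscribe(
>                                 Collections.singletonList("mytopic"), kafkaParams));
>
>         final int taskNum = 10;                         // placeholder
>         final String savePath = "hdfs:///data/parquet"; // placeholder
>
>         stream.map(ConsumerRecord::value)
>               .repartition(taskNum)
>               .foreachRDD(stringJavaRdd -> {
>                   Dataset<Row> df = spark.read().json(stringJavaRdd); // convert json to df
>
>                   // Append one extra column to every row; the real job derives several.
>                   JavaRDD<Row> rowJavaRdd = df.javaRDD().map(row -> {
>                       Object[] fields = new Object[row.length() + 1];
>                       for (int i = 0; i < row.length(); i++) {
>                           fields[i] = row.get(i);
>                       }
>                       fields[row.length()] = "myapp"; // placeholder appname value
>                       return RowFactory.create(fields);
>                   });
>                   StructType type = df.schema().add("appname", DataTypes.StringType);
>
>                   Dataset<Row> newDf = spark.createDataFrame(rowJavaRdd, type);
>                   newDf.repartition(taskNum)
>                        .write().mode(SaveMode.Append)
>                        .partitionBy("appname")
>                        .parquet(savePath);
>               });
>
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }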



However, if I remove the repartition call on newDf in the Parquet-writing
stage, the program always throws a NullPointerException at the
JSON-conversion line:

> java.lang.NullPointerException
>     at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
> ...
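
To be concrete, the only difference between the working run and the failing run is the repartition before the write:

> // works:
> newDf.repartition(taskNum).write().mode(SaveMode.Append).partitionBy("appname").parquet(savePath);
>
> // always fails with the NullPointerException above, thrown at the spark.read().json(...) line:
> newDf.write().mode(SaveMode.Append).partitionBy("appname").parquet(savePath);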


This seems to make no sense, since the Parquet-writing operation should be
in a different stage from the JSON-transforming operation.
How can I solve it? Thanks!

Regards,
Junfeng Chen
