Hi, Tathagata,

I have tried Structured Streaming, but the line

> Dataset<Row> rowDataset = spark.read().json(jsondataset);

always throws

> Queries with streaming sources must be executed with writeStream.start()

All I need to do in this step is transform the JSON string data into a Dataset. How can I fix it? Thanks!

Regards,
Junfeng Chen

On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> It's not very surprising that doing this sort of RDD-to-DF conversion
> inside DStream.foreachRDD has weird corner cases like this. In fact, you
> are going to have additional problems with partial parquet files (when
> there are failures) in this approach. I strongly suggest that you use
> Structured Streaming, which is designed to do this sort of processing. It
> will take care of tracking the written parquet files correctly.
>
> TD
>
> On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen <darou...@gmail.com> wrote:
>
>> I wrote a program to read some JSON data from Kafka and save it to
>> parquet files on HDFS. Here is my code:
>>
>>> JavaInputDStream stream = ...
>>> JavaDStream rdd = stream.map...
>>> rdd.repartition(taskNum).foreachRDD((VoidFunction<JavaRDD<String>>)
>>>     stringjavardd -> {
>>>   Dataset<Row> df = spark.read().json(stringjavardd); // convert JSON to DF
>>>   JavaRDD<Row> rowJavaRDD = df.javaRDD().map...; // add some new fields
>>>   StructType type = df.schema()...; // construct new type for the added fields
>>>   Dataset<Row> newdf = spark.createDataFrame(rowJavaRDD, type); // create new dataframe
>>>   newdf.repartition(taskNum).write().mode(SaveMode.Append)
>>>       .partitionBy("appname").parquet(savepath); // save to parquet
>>> })
>>
>> However, if I remove the repartition call on newdf in the parquet-writing
>> stage, the program always throws a NullPointerException at the JSON
>> conversion line:
>>
>>> java.lang.NullPointerException
>>>     at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>>>     ...
>>
>> This makes no sense to me, since the parquet-writing operation should be
>> in a different stage than the JSON transformation.
>> How can I solve it? Thanks!
>>
>> Regards,
>> Junfeng Chen
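[For reference, a minimal sketch of the Structured Streaming approach TD suggests. The key point for the error above is that a streaming Dataset cannot be parsed with `spark.read().json(...)`; instead, the JSON string column is parsed with `from_json` and a user-supplied schema, and the query is started with `writeStream.start()`. The schema fields, broker address, topic name, and output/checkpoint paths below are placeholders, not taken from the thread.]

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class JsonToParquetStream {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("json-to-parquet")
                .getOrCreate();

        // Hypothetical schema of the incoming JSON records; replace with your own.
        StructType schema = new StructType()
                .add("appname", DataTypes.StringType)
                .add("payload", DataTypes.StringType);

        // Read Kafka records as a streaming Dataset (note: readStream, not read).
        Dataset<Row> raw = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
                .option("subscribe", "mytopic")                      // placeholder
                .load();

        // Parse the JSON string in the Kafka value column with from_json.
        // This is the streaming-safe replacement for spark.read().json(...).
        Dataset<Row> parsed = raw
                .selectExpr("CAST(value AS STRING) AS json")
                .select(from_json(col("json"), schema).alias("data"))
                .select("data.*");

        // Write partitioned parquet; the checkpoint is how Structured Streaming
        // tracks which parquet files have been committed, even across failures.
        StreamingQuery query = parsed.writeStream()
                .format("parquet")
                .option("path", "/data/parquet")            // placeholder
                .option("checkpointLocation", "/data/ckpt") // placeholder
                .partitionBy("appname")
                .start();

        query.awaitTermination();
    }
}
```

This sketch requires a running Kafka broker and a Spark cluster (or local mode), so it is meant as a template rather than something runnable as-is.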