https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming

On Wed, Dec 26, 2018 at 2:42 PM Colin Williams
<colin.williams.seat...@gmail.com> wrote:
>
> From my initial impression it looks like I'd need to create my own
> `from_json` using `jsonToStructs` as a reference but try to handle `
> case : BadRecordException => null ` or similar to try to write the non
> matching string to a corrupt records column
>
> On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
> <colin.williams.seat...@gmail.com> wrote:
> >
> > Hi,
> >
> > I'm trying to figure out how I can write records that don't match a
> > json read schema via spark structred streaming to an output sink /
> > parquet location. Previously I did this in batch via corrupt column
> > features of batch. But in this spark structured streaming I'm reading
> > from kafka a string and using from_json on the value of that string.
> > If it doesn't match my schema then I from_json returns null for all
> > the rows, and does not populate a corrupt record column. But I want to
> > somehow obtain the source kafka string in a dataframe, and an write to
> > a output sink / parquet location.
> >
> > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: 
> > StructType) = {
> >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> >   jsonDataFrame.select(from_json(col("value"),
> > schema)).select("jsontostructs(value).*")
> > }

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to