From my initial impression, it looks like I'd need to create my own `from_json`, using `jsonToStructs` as a reference, but handle `case _: BadRecordException => null` (or similar) differently, so that the non-matching string is written to a corrupt records column.
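A lighter-weight alternative to a custom `from_json` might be to keep the raw string next to the parsed struct and split on whether parsing produced null. This is only a sketch under assumptions: it relies on `from_json` returning null for a non-matching value, as described in this thread, which may not hold on every Spark version. The names `splitParsedAndCorrupt`, `parsed`, and `_corrupt_record` are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: returns (rows that parsed, raw strings that did not).
def splitParsedAndCorrupt(
    rawKafkaDataFrame: DataFrame,
    schema: StructType): (DataFrame, DataFrame) = {
  // Keep the original Kafka value alongside the parse result.
  val withParsed = rawKafkaDataFrame
    .select(col("value").cast("string").as("value"))
    .withColumn("parsed", from_json(col("value"), schema))

  // Rows where parsing yielded a struct: expand it into top-level columns.
  val parsed = withParsed
    .filter(col("parsed").isNotNull)
    .select("parsed.*")

  // Assumption: a null struct for a non-null value means a schema mismatch.
  val corrupt = withParsed
    .filter(col("parsed").isNull && col("value").isNotNull)
    .select(col("value").as("_corrupt_record"))

  (parsed, corrupt)
}
```

In a streaming job, each of the two DataFrames would need its own `writeStream` query, for example one writing to the main parquet location and one to a separate location for corrupt records.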
On Wed, Dec 26, 2018 at 1:55 PM Colin Williams <colin.williams.seat...@gmail.com> wrote:
>
> Hi,
>
> I'm trying to figure out how I can write records that don't match a
> JSON read schema via Spark Structured Streaming to an output sink /
> parquet location. Previously I did this in batch mode via the corrupt
> column feature. In this Structured Streaming job, however, I'm reading
> a string from Kafka and using from_json on the value of that string.
> If it doesn't match my schema, then from_json returns null for all
> the rows and does not populate a corrupt record column. But I want to
> somehow obtain the source Kafka string in a dataframe and write it to
> an output sink / parquet location.
>
> def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
>   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
>   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> }