Re: Corrupt record handling in spark structured streaming and from_json function
Dear Spark user community,

I have received some insight regarding filtering separate dataframes in my Spark Structured Streaming job. However, I now wish to write each of the dataframes described in the Stack Overflow question below to a separate location using a Parquet writer. My initial impression is that this requires multiple sinks, but I'm being pressured against that. I think it might also be possible using the foreach / foreachBatch writers, but I'm not sure how those interact with the Parquet writer, or what the caveats of that approach are. Can some more advanced users or developers suggest how to go about this, particularly without using multiple streams?

On Wed, Dec 26, 2018 at 6:01 PM Colin Williams wrote:
>
> https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming
>
> On Wed, Dec 26, 2018 at 2:42 PM Colin Williams wrote:
> >
> > From my initial impression it looks like I'd need to create my own
> > `from_json` using `jsonToStructs` as a reference, and try to handle
> > `case _: BadRecordException => null` or similar, to write the
> > non-matching string to a corrupt-records column.
> >
> > On Wed, Dec 26, 2018 at 1:55 PM Colin Williams wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to figure out how I can write records that don't match a
> > > JSON read schema via Spark Structured Streaming to an output sink /
> > > Parquet location. Previously I did this in batch jobs using the
> > > corrupt-record column features of the batch JSON reader. But in this
> > > Spark Structured Streaming job I'm reading a string from Kafka and
> > > using from_json on the value of that string. If the value doesn't
> > > match my schema, then from_json returns null for all the rows and
> > > does not populate a corrupt-record column. I want to somehow obtain
> > > the source Kafka string in a dataframe and write it to an output
> > > sink / Parquet location.
> > >
> > > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> > >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> > >   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> > > }

---
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
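A workaround that avoids patching `from_json` itself is to keep the raw Kafka `value` string alongside the parsed struct, then split the dataframe on whether parsing produced a null. This is a minimal sketch under that assumption; the function and column names (`splitKafkaEvents`, `raw`, `parsed`) are illustrative, not from the original post:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Keep the original Kafka string in a "raw" column so that rows where
// from_json fails (and yields a null struct) can still be recovered.
def splitKafkaEvents(rawKafkaDataFrame: DataFrame, schema: StructType): (DataFrame, DataFrame) = {
  val withParsed = rawKafkaDataFrame
    .select(col("value").cast("string").as("raw"))
    .withColumn("parsed", from_json(col("raw"), schema))

  // Rows that matched the schema, flattened to top-level columns.
  val valid = withParsed.filter(col("parsed").isNotNull).select("parsed.*")

  // Rows that did not parse: retain the raw string for a corrupt-records sink.
  val corrupt = withParsed.filter(col("parsed").isNull).select("raw")

  (valid, corrupt)
}
```

One caveat: `from_json` returns a null struct only when the whole record is malformed JSON; a record that is valid JSON but merely missing fields parses with null fields instead, so stricter validation may be needed depending on what "corrupt" means for your data.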
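Regarding writing both dataframes to separate Parquet locations without multiple streaming queries: `foreachBatch` hands each micro-batch to a function as an ordinary batch DataFrame, so one streaming query can perform two batch Parquet writes. A hedged sketch, assuming an input dataframe with a `raw` string column and a `parsed` struct column (the paths are placeholders):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Single streaming query; each micro-batch is split and written twice.
def writeValidAndCorrupt(events: DataFrame): Unit = {
  events.writeStream
    .foreachBatch { (batch: DataFrame, batchId: Long) =>
      // Cache so the micro-batch is not recomputed for each of the two writes.
      batch.persist()

      batch.filter(col("parsed").isNotNull)
        .select("parsed.*")
        .write.mode("append").parquet("/data/events/valid")

      batch.filter(col("parsed").isNull)
        .select("raw")
        .write.mode("append").parquet("/data/events/corrupt")

      batch.unpersist()
      ()
    }
    .option("checkpointLocation", "/data/events/checkpoint")
    .start()
}
```

Note that `foreachBatch` provides at-least-once semantics for these side writes: if a batch is retried after a partial failure, the Parquet appends can be duplicated, so downstream consumers may need to deduplicate (e.g. on `batchId` or a record key).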