Hi,

I'm trying to figure out how to write records that don't match a
JSON read schema to an output sink / parquet location using Spark
Structured Streaming. Previously I did this in batch using the
corrupt record column feature. In this streaming job, however, I'm
reading the value from Kafka as a string and calling from_json on it.
If a string doesn't match my schema, from_json returns null for those
rows and does not populate a corrupt record column. I want to somehow
keep the source Kafka string in a DataFrame and write it to an output
sink / parquet location. My current parsing code is:

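import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Cast the Kafka value to a string, parse it with the schema, then flatten the struct.
def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType): DataFrame = {
  val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
  jsonDataFrame.select(from_json(col("value"), schema).as("parsed")).select("parsed.*")
}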
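
A minimal sketch of the kind of thing I'm after, under some assumptions: it keeps the raw string next to the parsed struct and splits on whether the struct is null. The names splitByParseResult, raw and parsed are made up for illustration. It also assumes from_json yields a null struct for input it cannot parse. Depending on the Spark version and parse mode it may instead return a struct whose fields are all null, in which case the null check would have to look at the fields.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: returns (parsed records, raw strings that failed to parse).
def splitByParseResult(rawKafkaDataFrame: DataFrame, schema: StructType): (DataFrame, DataFrame) = {
  // Keep the original Kafka string alongside the parse result.
  val withParsed = rawKafkaDataFrame
    .select(col("value").cast("string").as("raw"))
    .withColumn("parsed", from_json(col("raw"), schema))

  // Assumption: a null struct means the string could not be parsed.
  val valid = withParsed.filter(col("parsed").isNotNull).select("parsed.*")
  val invalid = withParsed
    .filter(col("parsed").isNull && col("raw").isNotNull)
    .select("raw")

  (valid, invalid)
}
```

The invalid DataFrame could then presumably be written with its own streaming query, e.g. invalid.writeStream.format("parquet") with "path" and "checkpointLocation" options set to locations of my choosing, separate from the query that writes the valid records.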
