Re: Corrupt record handling in spark structured streaming and from_json function
https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming

On Wed, Dec 26, 2018 at 2:42 PM Colin Williams wrote:
>
> From my initial impression it looks like I'd need to create my own
> `from_json` using `jsonToStructs` as a reference, but try to handle
> `case _: BadRecordException => null` or similar to write the
> non-matching string to a corrupt-records column.
>
> On Wed, Dec 26, 2018 at 1:55 PM Colin Williams wrote:
> >
> > Hi,
> >
> > I'm trying to figure out how I can write records that don't match a
> > JSON read schema via Spark Structured Streaming to an output sink /
> > parquet location. Previously I did this in batch via the corrupt
> > record column feature of the batch JSON reader. But in Spark
> > Structured Streaming I'm reading a string from Kafka and using
> > from_json on the value of that string. If it doesn't match my schema,
> > from_json returns null for all the rows and does not populate a
> > corrupt record column. But I want to somehow obtain the source Kafka
> > string in a DataFrame and write it to an output sink / parquet
> > location.
> >
> > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> >   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> > }
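A minimal sketch of the workaround that sidesteps patching `jsonToStructs`: keep the raw Kafka value next to the `from_json` output, then route rows whose parsed struct comes back null to a corrupt-records sink. The column names (raw_value, parsed) and the kafkaStream / eventSchema values are assumptions for illustration, not part of the original thread.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.StructType

    // Keep the original string so it is still available when parsing fails.
    def parseKeepingRaw(rawKafkaDataFrame: DataFrame, schema: StructType): DataFrame =
      rawKafkaDataFrame
        .select(col("value").cast("string").as("raw_value"))
        .withColumn("parsed", from_json(col("raw_value"), schema))

    // Usage (assumes `kafkaStream` is the Kafka source DataFrame and
    // `eventSchema` the expected StructType, e.g. in spark-shell):
    // val parsed  = parseKeepingRaw(kafkaStream, eventSchema)
    // val good    = parsed.filter(col("parsed").isNotNull).select("parsed.*")
    // val corrupt = parsed.filter(col("parsed").isNull).select("raw_value")
    // `good` and `corrupt` can then be written to separate sinks, e.g. via foreachBatch.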
Re: Corrupt record handling in spark structured streaming and from_json function
From my initial impression it looks like I'd need to create my own
`from_json` using `jsonToStructs` as a reference, but try to handle
`case _: BadRecordException => null` or similar to write the non-matching
string to a corrupt-records column.

On Wed, Dec 26, 2018 at 1:55 PM Colin Williams wrote:
>
> Hi,
>
> I'm trying to figure out how I can write records that don't match a
> JSON read schema via Spark Structured Streaming to an output sink /
> parquet location. Previously I did this in batch via the corrupt record
> column feature of the batch JSON reader. But in Spark Structured
> Streaming I'm reading a string from Kafka and using from_json on the
> value of that string. If it doesn't match my schema, from_json returns
> null for all the rows and does not populate a corrupt record column.
> But I want to somehow obtain the source Kafka string in a DataFrame and
> write it to an output sink / parquet location.
>
> def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
>   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
>   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> }
Corrupt record handling in spark structured streaming and from_json function
Hi,

I'm trying to figure out how I can write records that don't match a JSON
read schema via Spark Structured Streaming to an output sink / parquet
location. Previously I did this in batch via the corrupt record column
feature of the batch JSON reader. But in Spark Structured Streaming I'm
reading a string from Kafka and using from_json on the value of that
string. If it doesn't match my schema, from_json returns null for all the
rows and does not populate a corrupt record column. But I want to somehow
obtain the source Kafka string in a DataFrame and write it to an output
sink / parquet location.

def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
  val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
  jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
}
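For reference, a rough sketch of the batch-side corrupt-record handling mentioned above ("previously I did this in batch"): the JSON DataFrameReader can drop unparseable lines into a designated string column. The schema fields and input path below are illustrative assumptions.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val spark = SparkSession.builder().appName("corrupt-record-batch").getOrCreate()

    // The read schema must include the corrupt-record column as a StringType field.
    val schema = StructType(Seq(
      StructField("id", StringType),             // illustrative event field
      StructField("_corrupt_record", StringType)
    ))

    val df = spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema)
      .json("/path/to/events")                   // hypothetical input path

    // Some Spark versions require caching before querying only the corrupt column.
    val cached  = df.cache()
    val good    = cached.filter(col("_corrupt_record").isNull).drop("_corrupt_record")
    val corrupt = cached.filter(col("_corrupt_record").isNotNull).select("_corrupt_record")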
Re: How to clean up logs-dirs and local-dirs of running spark streaming in yarn cluster mode
Hi Fawze,

Thank you for the link, but that is exactly what I am doing. I think this
is related to the
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
setting: when the disk utilization exceeds it, the node is marked
unhealthy. Other than increasing the default 90%, is there anything else
I can do?

Thanks,
-Shyla

On Tue, Dec 25, 2018 at 7:26 PM Fawze Abujaber wrote:
> http://shzhangji.com/blog/2015/05/31/spark-streaming-logging-configuration/
>
> On Wed, Dec 26, 2018 at 1:05 AM shyla deshpande wrote:
>
>> Please point me to any documentation if available. Thanks
>>
>> On Tue, Dec 18, 2018 at 11:10 AM shyla deshpande <deshpandesh...@gmail.com> wrote:
>>
>>> Is there a way to do this without stopping the streaming application
>>> in yarn cluster mode?
>>>
>>> On Mon, Dec 17, 2018 at 4:42 PM shyla deshpande <deshpandesh...@gmail.com> wrote:
>>>
>>>> I get the ERROR: 1/1 local-dirs are bad: /mnt/yarn; 1/1 log-dirs are
>>>> bad: /var/log/hadoop-yarn/containers
>>>>
>>>> Is there a way to clean up these directories while the Spark
>>>> streaming application is running?
>>>>
>>>> Thanks
>
> --
> Take Care
> Fawze Abujaber
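For readers landing here from the archive: one common mitigation for the log-dirs side (and roughly what the linked blog discusses) is to point Spark's log4j at a rolling appender inside YARN's container log directory so driver and executor logs cannot grow without bound. A sketch of such a log4j.properties follows; sizes and retention counts are illustrative, not recommendations. It would typically be shipped with --files log4j.properties and activated through -Dlog4j.configuration=log4j.properties in spark.driver.extraJavaOptions / spark.executor.extraJavaOptions.

    # Rolling file appender writing into YARN's container log directory.
    # spark.yarn.app.container.log.dir is substituted by Spark at runtime.
    log4j.rootCategory=INFO, rolling
    log4j.appender.rolling=org.apache.log4j.RollingFileAppender
    log4j.appender.rolling.File=${spark.yarn.app.container.log.dir}/spark.log
    log4j.appender.rolling.MaxFileSize=64MB
    log4j.appender.rolling.MaxBackupIndex=5
    log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
    log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

Note this only bounds application logging; it does not shrink local-dirs usage under /mnt/yarn (shuffle and cache data), which is governed by the NodeManager settings such as the disk-health-checker threshold mentioned above.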
Re: Packaging kafka certificates in uber jar
Hi, thanks. This is part of the solution I found after writing the
question. The other part is that I needed to write the input stream to a
temporary file. I would prefer not to write any temporary file, but the
ssl.keystore.location property seems to expect a file path.

On Tue, Dec 25, 2018 at 5:26 AM Anastasios Zouzias wrote:
>
> Hi Colin,
>
> You can place your certificates under src/main/resources and include
> them in the uber JAR, see e.g.:
> https://stackoverflow.com/questions/40252652/access-files-in-resources-directory-in-jar-from-apache-spark-streaming-context
>
> Best,
> Anastasios
>
> On Mon, Dec 24, 2018 at 10:29 PM Colin Williams wrote:
>>
>> I've been trying to read from Kafka via a Spark streaming client. I
>> found out the Spark cluster doesn't have certificates deployed. Then I
>> tried using the same local certificates I've been testing with by
>> packing them in an uber jar and getting a File handle from the
>> classloader resource. But I'm getting a FileNotFoundException.
>> These are JKS certificates. Is anybody aware how to package
>> certificates in a jar with a Kafka client, preferably the Spark one?
>
> --
> -- Anastasios Zouzias
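A minimal sketch of the two-part fix described above, assuming the keystore is bundled in the uber JAR under src/main/resources: copy the classpath resource to a temporary file and hand that path to the Kafka options. The resource name, temp-file prefix, and option values are hypothetical.

    import java.nio.file.{Files, StandardCopyOption}

    // Extract the bundled keystore to a temp file, since Kafka's
    // ssl.keystore.location accepts a filesystem path, not a classpath URL.
    def materializeKeystore(resource: String): String = {
      val in  = getClass.getResourceAsStream(resource)  // e.g. "/kafka.client.keystore.jks" (hypothetical name)
      val tmp = Files.createTempFile("kafka-keystore", ".jks")
      tmp.toFile.deleteOnExit()
      Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING)
      in.close()
      tmp.toString
    }

    // Usage when building the Kafka source (illustrative option values):
    // val keystorePath = materializeKeystore("/kafka.client.keystore.jks")
    // spark.readStream.format("kafka")
    //   .option("kafka.security.protocol", "SSL")
    //   .option("kafka.ssl.keystore.location", keystorePath)
    //   .option("kafka.ssl.keystore.password", keystorePassword)
    //   ...

Depending on deploy mode, the keystore may also need to be available on the executor nodes (for example via --files), since the Kafka consumers are created there as well.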