Re: Filtering JSON records when there isn't an exact schema match in Spark
Have you tried the DROPMALFORMED mode option?

On Mon, Jul 3, 2023, 1:34 PM Shashank Rao wrote:
> Update: Got it working by using the _corrupt_record field for the first
> case (record 4):
>
>     schema = schema.add("_corrupt_record", DataTypes.StringType);
>     Dataset<Row> ds = spark.read().schema(schema)
>         .option("mode", "PERMISSIVE").json("path");
>     ds = ds.filter(functions.col("_corrupt_record").isNull());
>
> However, I haven't figured out how to ignore record 5.
>
> Any help is appreciated.
>
> On Mon, 3 Jul 2023 at 19:24, Shashank Rao wrote:
>> Hi all,
>> I'm trying to read around 1,000,000 JSONL files present in S3 using
>> Spark. Once read, I need to write them to BigQuery.
>> I have a schema that may not be an exact match with all the records.
>> How can I filter out the records that aren't an exact schema match?
>>
>> For example, if my records were:
>>
>>     {"x": 1, "y": 1}
>>     {"x": 2, "y": 2}
>>     {"x": 3, "y": 3}
>>     {"x": 4, "y": "4"}
>>     {"x": 5, "y": 5, "z": 5}
>>
>> and my schema were:
>>
>>     root
>>      |-- x: long (nullable = true)
>>      |-- y: long (nullable = true)
>>
>> I need records 4 and 5 to be filtered out.
>> Record 4 should be filtered out since y is a string instead of a long.
>> Record 5 should be filtered out since z is not part of the schema.
>>
>> I tried applying my schema on read, but it does not work as needed:
>>
>>     StructType schema = new StructType()
>>         .add("x", DataTypes.LongType)
>>         .add("y", DataTypes.LongType);
>>     Dataset<Row> ds = spark.read().schema(schema).json("path/to/file");
>>
>> This gives me a dataset where record 4 has y = null and record 5 keeps
>> only x and y.
>>
>> Any help is appreciated.
>>
>> --
>> Thanks,
>> Shashank Rao
>
> --
> Regards,
> Shashank Rao
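[Editor's note] For record 5, one possible approach (a sketch, not from the thread; Scala for brevity, assuming Spark 3.x, the example fields x/y, and a hypothetical path) is to pre-filter the raw lines by their key set before applying the strict schema:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import spark.implicits._

    // Step 1: parse each raw line as a string-to-string map to recover its
    // original key set, and keep only lines whose keys are exactly {x, y}.
    // This drops record 5, whose extra field z PERMISSIVE mode would
    // otherwise silently discard.
    val raw = spark.read.text("path/to/files")   // hypothetical path
    val exactKeys = raw.filter(
      array_sort(map_keys(from_json($"value", MapType(StringType, StringType))))
        === array(lit("x"), lit("y")))

    // Step 2: parse the surviving lines with the strict schema;
    // _corrupt_record flags type mismatches such as record 4.
    val schema = new StructType()
      .add("x", LongType)
      .add("y", LongType)
      .add("_corrupt_record", StringType)

    val parsed = spark.read
      .schema(schema)
      .json(exactKeys.select($"value").as[String])
      .cache()   // some Spark versions disallow filtering on the corrupt
                 // column of freshly read JSON without caching first
      .filter($"_corrupt_record".isNull)
      .drop("_corrupt_record")

This parses every line twice in exchange for exact key matching, which may matter at the scale of a million files.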
Update nested struct with null fields
Hi, I have a nested StructType. The StructType is deeply nested and may comprise other structs. I want to update this struct at the lowest level. I tried withField, but it doesn't work if any of the top-level structs is null. I would appreciate any help with this. The example schema is:

    val schema = new StructType()
      .add("key", StringType)
      .add("cells", ArrayType(
        new StructType()
          .add("family", StringType)
          .add("qualifier", StringType)
          .add("timestamp", LongType)
          .add("nestStruct", new StructType()
            .add("id1", LongType)
            .add("id2", StringType)
            .add("id3", new StructType()
              .add("id31", LongType)
              .add("id32", StringType)))))

    val data = Seq(
      Row("1235321863", Array(Row("a", "b", 1L, null))))

    val df_test = spark
      .createDataFrame(spark.sparkContext.parallelize(data), schema)

    val result = df_test.withColumn(
      "cell1",
      transform($"cells", cell => {
        // This line doesn't do anything if nestStruct is null.
        cell.withField("nestStruct.id3.id31", lit(40))
      }))

    result.show(false)
    result.printSchema

Thanks
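[Editor's note] One possible workaround (an untested sketch, assuming the schema above): withField cannot update a field through a null parent, so rebuild the parent struct explicitly when it is null, and only update in place when it exists:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import spark.implicits._

    val result = df_test.withColumn(
      "cell1",
      transform($"cells", cell =>
        cell.withField(
          "nestStruct",
          when(
            cell.getField("nestStruct").isNull,
            // nestStruct is null: build it from scratch with the leaf set,
            // matching the schema's field names and types exactly so both
            // when/otherwise branches have the same struct type.
            struct(
              lit(null).cast(LongType).as("id1"),
              lit(null).cast(StringType).as("id2"),
              struct(
                lit(40L).as("id31"),
                lit(null).cast(StringType).as("id32")).as("id3")))
          .otherwise(
            // nestStruct exists: withField can update the leaf in place.
            cell.getField("nestStruct").withField("id3.id31", lit(40L))))))

If intermediate levels (here id3) can also be null independently, the same when/otherwise treatment has to be repeated at each nullable level.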
Re: [Spark Structured Streaming] Does Spark Structured Streaming currently support sinking to AWS Kinesis?
This doesn't directly answer your question, but there are ways to do it in Scala and PySpark. See if this helps:
https://repost.aws/questions/QUP_OJomilTO6oIgvK00VHEA/writing-data-to-kinesis-stream-from-py-spark

On Thu, Feb 16, 2023, 8:27 PM hueiyuan su wrote:
> Component: Spark Structured Streaming
> Level: Advanced
> Scenario: How-to
>
> Problem Description
> I would like to writeStream data to AWS Kinesis with Spark Structured
> Streaming, but I cannot find a related connector jar that can be used.
> I want to check whether writing a stream to AWS Kinesis is fully
> supported. If you have any ideas, please let me know. I would
> appreciate your answer.
>
> --
> Best Regards,
>
> Mars Su
> Phone: 0988-661-013
> Email: hueiyua...@gmail.com
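[Editor's note] There is no built-in Kinesis sink in Structured Streaming; the pattern behind the link above is foreachBatch plus the AWS SDK. A minimal sketch (untested, assuming AWS SDK v2 on the classpath and a streaming DataFrame df; stream name, partition key, and checkpoint path are placeholders):

    import org.apache.spark.sql.DataFrame
    import software.amazon.awssdk.core.SdkBytes
    import software.amazon.awssdk.services.kinesis.KinesisClient
    import software.amazon.awssdk.services.kinesis.model.PutRecordRequest

    // Write each micro-batch to Kinesis via the AWS SDK, one JSON record
    // per Kinesis record.
    val writeBatch: (DataFrame, Long) => Unit = (batch, _) =>
      batch.toJSON.foreachPartition { rows: Iterator[String] =>
        val client = KinesisClient.create()      // one client per partition
        rows.foreach { json =>
          client.putRecord(PutRecordRequest.builder()
            .streamName("my-stream")              // placeholder
            .partitionKey(json.hashCode.toString) // placeholder key choice
            .data(SdkBytes.fromUtf8String(json))
            .build())
        }
        client.close()
      }

    df.writeStream
      .foreachBatch(writeBatch)
      .option("checkpointLocation", "/tmp/checkpoints") // placeholder
      .start()

A production version would buffer records and use putRecords for batching, plus retries for throttling; this sketch sends one record per call.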
Re: How to explode array columns of a dataframe having the same length
I think these four steps should help:

  1. Use zip.
  2. Explode.
  3. withColumn (get each element of the array).
  4. Drop the array column.

Thanks

On Thu, Feb 16, 2023, 2:18 PM sam smith wrote:
> @Enrico Minack I used arrays_zip to merge values into one row, and then
> used toJSON() to export the data.
> @Bjørn explode_outer didn't yield the expected results.
>
> Thanks anyway.
>
> On Thu, Feb 16, 2023 at 09:06, Enrico Minack wrote:
>> You have to take each row and zip the lists; each element of the result
>> becomes one new row.
>>
>> So write a method that turns
>>   Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
>> into
>>   List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
>> and use flatMap with that method.
>>
>> In Scala, this would read:
>>
>>   df.flatMap { row =>
>>     (row.getSeq[String](0), row.getSeq[String](1), row.getSeq[String](2))
>>       .zipped.toIterable
>>   }.show()
>>
>> Enrico
>>
>> On 14.02.23 at 22:54, sam smith wrote:
>>
>> Hello guys,
>>
>> I have the following dataframe:
>>
>>   col1                col2                col3
>>   ["A","B","null"]    ["C","D","null"]    ["E","null","null"]
>>
>> I want to explode it to the following dataframe:
>>
>>   col1     col2     col3
>>   "A"      "C"      "E"
>>   "B"      "D"      "null"
>>   "null"   "null"   "null"
>>
>> How can I do that (preferably in Java) using the explode() method,
>> knowing that something like the following won't yield the correct
>> output:
>>
>>   for (String colName : dataset.columns())
>>     dataset = dataset.withColumn(colName, explode(dataset.col(colName)));
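[Editor's note] For the record, a compact DataFrame-only version of the zip-and-explode approach sketched in the steps above (in Scala; sam asked for Java, but the same functions exist in the Java API):

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // arrays_zip pairs up the i-th elements of the three arrays into an
    // array of structs; explode then turns each struct into its own row,
    // and the final select flattens the struct back into three columns.
    val exploded = df
      .select(explode(arrays_zip($"col1", $"col2", $"col3")).as("z"))
      .select($"z.col1", $"z.col2", $"z.col3")

This assumes all three arrays have the same length, per the thread's title; arrays_zip pads shorter arrays with nulls otherwise.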