Re: Filtering JSON records when there isn't an exact schema match in Spark

2023-07-03 Thread Vikas Kumar
Have you tried the DROPMALFORMED option?
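
A rough, untested Scala sketch (paths and helper column names are placeholders): DROPMALFORMED should take care of record 4, whose y value doesn't fit the long type, but it keeps record 5, since extra fields are simply ignored on read. One way to reject record 5 as well is to check each raw line's key set against the schema before parsing:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = new StructType().add("x", LongType).add("y", LongType)

// DROPMALFORMED alone drops record 4 (y is a string, not a long),
// but keeps record 5, because extra fields are simply ignored on read.
val dropMalformed = spark.read
  .schema(schema)
  .option("mode", "DROPMALFORMED")
  .json("path/to/files")                       // placeholder path

// To reject both: read the raw lines, keep only lines whose key set matches
// the schema exactly (drops record 5), then parse with the schema plus a
// corrupt-record column and keep only cleanly parsed rows (drops record 4).
val withCorrupt = schema.add("_corrupt_record", StringType)
val raw = spark.read.text("path/to/files")     // placeholder path
val strict = raw
  .withColumn("keys",
    array_sort(map_keys(from_json(col("value"), MapType(StringType, StringType)))))
  .filter(col("keys") === array(lit("x"), lit("y")))
  .withColumn("rec",
    from_json(col("value"), withCorrupt,
      Map("columnNameOfCorruptRecord" -> "_corrupt_record")))
  .filter(col("rec._corrupt_record").isNull)
  .select(col("rec.x"), col("rec.y"))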

On Mon, Jul 3, 2023, 1:34 PM Shashank Rao  wrote:

> Update: Got it working by using the *_corrupt_record* field for the first
> case (record 4):
>
> schema = schema.add("_corrupt_record", DataTypes.StringType);
> Dataset<Row> ds = spark.read().schema(schema)
>     .option("mode", "PERMISSIVE").json("path");
> ds = ds.filter(functions.col("_corrupt_record").isNull());
>
> However, I haven't figured out how to ignore record 5.
>
> Any help is appreciated.
>
> On Mon, 3 Jul 2023 at 19:24, Shashank Rao  wrote:
>
>> Hi all,
>> I'm trying to read around 1,000,000 JSONL files present in S3 using
>> Spark. Once read, I need to write them to BigQuery.
>> I have a schema that may not be an exact match with all the records.
>> How can I filter out records where there isn't an exact schema match?
>>
>> Eg: if my records were:
>> {"x": 1, "y": 1}
>> {"x": 2, "y": 2}
>> {"x": 3, "y": 3}
>> {"x": 4, "y": "4"}
>> {"x": 5, "y": 5, "z": 5}
>>
>> and if my schema were:
>> root
>>  |-- x: long (nullable = true)
>>  |-- y: long (nullable = true)
>>
>> I need the records 4 and 5 to be filtered out.
>> Record 4 should be filtered out since y is a string instead of long.
>> Record 5 should be filtered out since z is not part of the schema.
>>
>> I tried applying my schema on read, but it does not work as needed:
>>
>> StructType schema = new StructType().add("x",
>> DataTypes.LongType).add("y", DataTypes.LongType);
>> Dataset<Row> ds = spark.read().schema(schema).json("path/to/file");
>>
>> This gives me a dataset that has record 4 with y=null and record 5 with x
>> and y.
>>
>> Any help is appreciated.
>>
>> --
>> Thanks,
>> Shashank Rao
>>
>
>
> --
> Regards,
> Shashank Rao
>


Update nested struct with null fields

2023-02-17 Thread Vikas Kumar
Hi,
I have a nested StructType. The struct is deeply nested and may contain
other structs, and I want to update a field at the lowest level. I tried
withField, but it doesn't work if any of the enclosing structs is null. I
would appreciate any help with this.

The example schema is:

val schema = new StructType()
  .add("key", StringType)
  .add(
    "cells",
    ArrayType(
      new StructType()
        .add("family", StringType)
        .add("qualifier", StringType)
        .add("timestamp", LongType)
        .add("nestStruct", new StructType()
          .add("id1", LongType)
          .add("id2", StringType)
          .add("id3", new StructType()
            .add("id31", LongType)
            .add("id32", StringType))))
  )

val data = Seq(
  Row(
"1235321863",
Array(
  Row("a", "b", 1L,  null)
)
  )
)


val df_test = spark
  .createDataFrame(spark.sparkContext.parallelize(data), schema)

val result = df_test.withColumn(
  "cell1",
  transform($"cells", cell => {
    // This line doesn't do anything if nestStruct is null.
    cell.withField("nestStruct.id3.id31", lit(40))
  }))
result.show(false)
result.printSchema
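
A minimal, untested sketch of one possible workaround with the schema above: withField leaves a null struct as null, so first substitute an all-null struct for the missing nestStruct via coalesce, then set the leaf field:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType}

// A nestStruct-shaped struct whose fields are all null, matching the schema above.
val emptyNest = struct(
  lit(null).cast(LongType).as("id1"),
  lit(null).cast(StringType).as("id2"),
  struct(
    lit(null).cast(LongType).as("id31"),
    lit(null).cast(StringType).as("id32")).as("id3"))

val patched = df_test.withColumn(
  "cell1",
  transform(col("cells"), cell =>
    cell.withField(
      "nestStruct",
      // If nestStruct is null, fall back to the all-null struct so withField
      // has something to write into, then set the leaf field.
      coalesce(cell.getField("nestStruct"), emptyNest)
        .withField("id3.id31", lit(40)))))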




Thanks


Re: [Spark Structured Streaming] Does Spark Structured Streaming currently support sinking to AWS Kinesis?

2023-02-16 Thread Vikas Kumar
This doesn't directly answer your question, but there are ways to do it in
Scala and PySpark. See if this helps:
https://repost.aws/questions/QUP_OJomilTO6oIgvK00VHEA/writing-data-to-kinesis-stream-from-py-spark
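
For what it's worth, a minimal Scala sketch of one common approach, writing each micro-batch with foreachBatch, since there doesn't seem to be an official Kinesis sink connector. It assumes the AWS SDK v2 Kinesis client is on the classpath; the stream name and the streaming DataFrame `events` are placeholders:

import org.apache.spark.sql.DataFrame
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest

val query = events.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Serialize each row as JSON and push it to Kinesis, one client per partition.
    batch.toJSON.foreachPartition { rows: Iterator[String] =>
      val client = KinesisClient.create()
      try {
        rows.foreach { json =>
          client.putRecord(
            PutRecordRequest.builder()
              .streamName("my-stream")                          // placeholder stream name
              .partitionKey(java.util.UUID.randomUUID().toString)
              .data(SdkBytes.fromUtf8String(json))
              .build())
        }
      } finally {
        client.close()
      }
    }
  }
  .start()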

On Thu, Feb 16, 2023, 8:27 PM hueiyuan su  wrote:

> *Component*: Spark Structured Streaming
> *Level*: Advanced
> *Scenario*: How-to
>
> 
> *Problems Description*
> I would like to writeStream data to AWS Kinesis with Spark Structured
> Streaming, but I cannot find a related connector jar that can be used. I
> want to check whether writing a stream to AWS Kinesis is fully supported.
> If you have any ideas, please let me know. I would appreciate your answer.
>
> --
> Best Regards,
>
> Mars Su
> *Phone*: 0988-661-013
> *Email*: hueiyua...@gmail.com
>


Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Vikas Kumar
I think these 4 steps should help (a rough sketch follows):

Use arrays_zip
Explode
withColumn (pull each element out of the zipped column)
Drop the zipped column
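
A rough Scala sketch of those steps, assuming a DataFrame df with the three equal-length array columns col1, col2 and col3 from the example below:

import org.apache.spark.sql.functions._

val exploded = df
  .withColumn("zipped", arrays_zip(col("col1"), col("col2"), col("col3"))) // zip element-wise into an array of structs
  .withColumn("zipped", explode(col("zipped")))                            // one row per zipped element
  .withColumn("col1", col("zipped.col1"))                                  // pull each element back out
  .withColumn("col2", col("zipped.col2"))
  .withColumn("col3", col("zipped.col3"))
  .drop("zipped")                                                          // drop the intermediate column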

Thanks



On Thu, Feb 16, 2023, 2:18 PM sam smith  wrote:

> @Enrico Minack  I used arrays_zip to merge values
> into one row, and then used toJSON() to export the data.
> @Bjørn explode_outer didn't yield the expected results.
>
> Thanks anyway.
>
> On Thu, 16 Feb 2023 at 09:06, Enrico Minack wrote:
>
>> You have to take each row and zip the lists; each element of the result
>> becomes one new row.
>>
>> So write a method that turns
>>   Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
>> into
>>   List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
>> and use flatMap with that method.
>>
>> In Scala, this would read:
>>
>> df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1),
>> row.getSeq[String](2)).zipped.toIterable }.show()
>>
>> Enrico
>>
>>
>> On 14.02.23 at 22:54, sam smith wrote:
>>
>> Hello guys,
>>
>> I have the following dataframe:
>>
>> col1             | col2             | col3
>> ["A","B","null"] | ["C","D","null"] | ["E","null","null"]
>>
>>
>> I want to explode it to the following dataframe:
>>
>> col1   | col2   | col3
>> "A"    | "C"    | "E"
>> "B"    | "D"    | "null"
>> "null" | "null" | "null"
>>
>> How can I do that (preferably in Java) using the explode() method, knowing
>> that something like the following won't yield the correct output?
>>
>> for (String colName : dataset.columns())
>>     dataset = dataset.withColumn(colName, explode(dataset.col(colName)));
>>
>>
>>
>>

