Re: Filtering JSON records when there isn't an exact schema match in Spark

2023-07-03 Thread Vikas Kumar
Have you tried the DROPMALFORMED option?
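
For reference, a minimal Java sketch of that suggestion (the path is a placeholder, spark is assumed to be an existing SparkSession, and the schema matches the thread below; DROPMALFORMED should drop rows that fail to parse against the schema, such as record 4, but it does not reject rows that merely carry extra fields):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("x", DataTypes.LongType)
    .add("y", DataTypes.LongType);

// DROPMALFORMED silently discards records that cannot be converted to the schema
// (e.g. {"x": 4, "y": "4"}); records with extra fields such as "z" still get through.
Dataset<Row> ds = spark.read()
    .schema(schema)
    .option("mode", "DROPMALFORMED")
    .json("path/to/file");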

On Mon, Jul 3, 2023, 1:34 PM Shashank Rao  wrote:

> Update: Got it working by using the _corrupt_record field for the first
> case (record 4):
>
> schema = schema.add("_corrupt_record", DataTypes.StringType);
> Dataset<Row> ds = spark.read().schema(schema).option("mode",
> "PERMISSIVE").json("path");
> ds = ds.filter(functions.col("_corrupt_record").isNull());
>
> However, I haven't figured out how to ignore record 5.
>
> Any help is appreciated.
>
> On Mon, 3 Jul 2023 at 19:24, Shashank Rao  wrote:
>
>> Hi all,
>> I'm trying to read around 1,000,000 JSONL files present in S3 using
>> Spark. Once read, I need to write them to BigQuery.
>> I have a schema that may not be an exact match with all the records.
>> How can I filter records where there isn't an exact schema match:
>>
>> Eg: if my records were:
>> {"x": 1, "y": 1}
>> {"x": 2, "y": 2}
>> {"x": 3, "y": 3}
>> {"x": 4, "y": "4"}
>> {"x": 5, "y": 5, "z": 5}
>>
>> and if my schema were:
>> root
>>  |-- x: long (nullable = true)
>>  |-- y: long (nullable = true)
>>
>> I need the records 4 and 5 to be filtered out.
>> Record 4 should be filtered out since y is a string instead of long.
>> Record 5 should be filtered out since z is not part of the schema.
>>
>> I tried applying my schema on read, but it does not work as needed:
>>
>> StructType schema = new StructType().add("x",
>> DataTypes.LongType).add("y", DataTypes.LongType);
>> Dataset<Row> ds = spark.read().schema(schema).json("path/to/file");
>>
>> This gives me a dataset that has record 4 with y=null and record 5 with x
>> and y.
>>
>> Any help is appreciated.
>>
>> --
>> Thanks,
>> Shashank Rao
>>
>
>
> --
> Regards,
> Shashank Rao
>


Re: Introducing English SDK for Apache Spark - Seeking Your Feedback and Contributions

2023-07-03 Thread Gavin Ray
Wow, really neat -- thanks for sharing!

On Mon, Jul 3, 2023 at 8:12 PM Gengliang Wang  wrote:

> Dear Apache Spark community,
>
> We are delighted to announce the launch of a groundbreaking tool that aims
> to make Apache Spark more user-friendly and accessible - the English SDK.
> Powered by the application of Generative AI, the English SDK allows you to
> execute complex tasks with simple English instructions. This exciting news
> was announced recently at the Data+AI Summit and also introduced through a
> detailed blog post.
>
> Now, we need your invaluable feedback and contributions. The aim of the
> English SDK is not only to simplify and enrich your Apache Spark experience
> but also to grow with the community. We're calling upon Spark developers
> and users to explore this innovative tool, offer your insights, provide
> feedback, and contribute to its evolution.
>
> You can find more details about the SDK and usage examples on the GitHub
> repository https://github.com/databrickslabs/pyspark-ai/. If you have any
> feedback or suggestions, please feel free to open an issue directly on the
> repository. We are actively monitoring the issues and value your insights.
>
> We also welcome pull requests and are eager to see how you might extend or
> refine this tool. Let's come together to continue making Apache Spark more
> approachable and user-friendly.
>
> Thank you in advance for your attention and involvement. We look forward
> to hearing your thoughts and seeing your contributions!
>
> Best,
> Gengliang Wang
>


Re: Introducing English SDK for Apache Spark - Seeking Your Feedback and Contributions

2023-07-03 Thread Hyukjin Kwon
The demo was really amazing.

On Tue, 4 Jul 2023 at 09:17, Farshid Ashouri 
wrote:

> This is wonderful news!
>
> On Tue, 4 Jul 2023 at 01:14, Gengliang Wang  wrote:
>
>> Dear Apache Spark community,
>>
>> We are delighted to announce the launch of a groundbreaking tool that
>> aims to make Apache Spark more user-friendly and accessible - the
>> English SDK. Powered by the application of Generative AI, the English SDK
>> allows you to execute complex tasks with simple English instructions. This
>> exciting news was announced recently at the Data+AI Summit and also
>> introduced through a detailed blog post.
>>
>> Now, we need your invaluable feedback and contributions. The aim of the
>> English SDK is not only to simplify and enrich your Apache Spark experience
>> but also to grow with the community. We're calling upon Spark developers
>> and users to explore this innovative tool, offer your insights, provide
>> feedback, and contribute to its evolution.
>>
>> You can find more details about the SDK and usage examples on the GitHub
>> repository https://github.com/databrickslabs/pyspark-ai/. If you have
>> any feedback or suggestions, please feel free to open an issue directly on
>> the repository. We are actively monitoring the issues and value your
>> insights.
>>
>> We also welcome pull requests and are eager to see how you might extend
>> or refine this tool. Let's come together to continue making Apache Spark
>> more approachable and user-friendly.
>>
>> Thank you in advance for your attention and involvement. We look forward
>> to hearing your thoughts and seeing your contributions!
>>
>> Best,
>> Gengliang Wang
>>
> --
>
>
> *Farshid Ashouri*,
> Senior Vice President,
> J.P. Morgan & Chase Co.
> +44 7932 650 788
>
>


Re: Introducing English SDK for Apache Spark - Seeking Your Feedback and Contributions

2023-07-03 Thread Farshid Ashouri
This is wonderful news!

On Tue, 4 Jul 2023 at 01:14, Gengliang Wang  wrote:

> Dear Apache Spark community,
>
> We are delighted to announce the launch of a groundbreaking tool that aims
> to make Apache Spark more user-friendly and accessible - the English SDK.
> Powered by the application of Generative AI, the English SDK allows you to
> execute complex tasks with simple English instructions. This exciting news
> was announced recently at the Data+AI Summit and also introduced through a
> detailed blog post.
>
> Now, we need your invaluable feedback and contributions. The aim of the
> English SDK is not only to simplify and enrich your Apache Spark experience
> but also to grow with the community. We're calling upon Spark developers
> and users to explore this innovative tool, offer your insights, provide
> feedback, and contribute to its evolution.
>
> You can find more details about the SDK and usage examples on the GitHub
> repository https://github.com/databrickslabs/pyspark-ai/. If you have any
> feedback or suggestions, please feel free to open an issue directly on the
> repository. We are actively monitoring the issues and value your insights.
>
> We also welcome pull requests and are eager to see how you might extend or
> refine this tool. Let's come together to continue making Apache Spark more
> approachable and user-friendly.
>
> Thank you in advance for your attention and involvement. We look forward
> to hearing your thoughts and seeing your contributions!
>
> Best,
> Gengliang Wang
>
-- 


*Farshid Ashouri*,
Senior Vice President,
J.P. Morgan & Chase Co.
+44 7932 650 788


Introducing English SDK for Apache Spark - Seeking Your Feedback and Contributions

2023-07-03 Thread Gengliang Wang
Dear Apache Spark community,

We are delighted to announce the launch of a groundbreaking tool that aims
to make Apache Spark more user-friendly and accessible - the English SDK.
Powered by the application of Generative AI, the English SDK allows you to
execute complex tasks with simple English instructions. This exciting news
was announced recently at the Data+AI Summit and also introduced through a
detailed blog post.

Now, we need your invaluable feedback and contributions. The aim of the
English SDK is not only to simplify and enrich your Apache Spark experience
but also to grow with the community. We're calling upon Spark developers
and users to explore this innovative tool, offer your insights, provide
feedback, and contribute to its evolution.

You can find more details about the SDK and usage examples on the GitHub
repository https://github.com/databrickslabs/pyspark-ai/. If you have any
feedback or suggestions, please feel free to open an issue directly on the
repository. We are actively monitoring the issues and value your insights.

We also welcome pull requests and are eager to see how you might extend or
refine this tool. Let's come together to continue making Apache Spark more
approachable and user-friendly.

Thank you in advance for your attention and involvement. We look forward to
hearing your thoughts and seeing your contributions!

Best,
Gengliang Wang


Re: Filtering JSON records when there isn't an exact schema match in Spark

2023-07-03 Thread Shashank Rao
Update: Got it working by using the _corrupt_record field for the first
case (record 4):

schema = schema.add("_corrupt_record", DataTypes.StringType);
Dataset<Row> ds = spark.read().schema(schema).option("mode",
"PERMISSIVE").json("path");
ds = ds.filter(functions.col("_corrupt_record").isNull());

However, I haven't figured out how to ignore record 5.

Any help is appreciated.
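
One possible way to also catch record 5 (a sketch, not something from this thread): pre-filter the raw JSON lines so that only records whose top-level keys are within the expected set reach the schema-based parse. The Jackson classes used below ship with Spark; spark is the existing SparkSession, schema is the same schema as above (including _corrupt_record), and allowedKeys is an illustrative name.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

Set<String> allowedKeys = new HashSet<>(Arrays.asList("x", "y"));

// Read the files as plain text first so every record can be inspected as written.
Dataset<String> lines = spark.read().textFile("path/to/file");

// Drop any record that carries a key outside the schema (record 5 has "z").
// Use keys.equals(allowedKeys) instead if every key must also be present.
Dataset<String> strictLines = lines.filter((FilterFunction<String>) line -> {
    JsonNode node = new ObjectMapper().readTree(line); // per-record mapper: simple, not tuned
    Set<String> keys = new HashSet<>();
    node.fieldNames().forEachRemaining(keys::add);
    return allowedKeys.containsAll(keys);
});

// Parse the surviving lines with the schema from the snippet above; the
// _corrupt_record filter still handles type mismatches such as record 4.
Dataset<Row> ds = spark.read().schema(schema).json(strictLines);
ds = ds.filter(functions.col("_corrupt_record").isNull());

This makes an extra pass over the raw text and builds an ObjectMapper per record, so treat it as a starting point rather than a tuned solution.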

On Mon, 3 Jul 2023 at 19:24, Shashank Rao  wrote:

> Hi all,
> I'm trying to read around 1,000,000 JSONL files present in S3 using Spark.
> Once read, I need to write them to BigQuery.
> I have a schema that may not be an exact match with all the records.
> How can I filter records where there isn't an exact schema match:
>
> Eg: if my records were:
> {"x": 1, "y": 1}
> {"x": 2, "y": 2}
> {"x": 3, "y": 3}
> {"x": 4, "y": "4"}
> {"x": 5, "y": 5, "z": 5}
>
> and if my schema were:
> root
>  |-- x: long (nullable = true)
>  |-- y: long (nullable = true)
>
> I need the records 4 and 5 to be filtered out.
> Record 4 should be filtered out since y is a string instead of long.
> Record 5 should be filtered out since z is not part of the schema.
>
> I tried applying my schema on read, but it does not work as needed:
>
> StructType schema = new StructType().add("x", DataTypes.LongType).add("y",
> DataTypes.LongType);
> Dataset<Row> ds = spark.read().schema(schema).json("path/to/file");
>
> This gives me a dataset that has record 4 with y=null and record 5 with x
> and y.
>
> Any help is appreciated.
>
> --
> Thanks,
> Shashank Rao
>


-- 
Regards,
Shashank Rao


Re: [Spark SQL] Data objects from query history

2023-07-03 Thread Jack Wells
Hi Ruben,

I’m not sure if this answers your question, but if you’re interested in
exploring the underlying tables, you could always try something like the
below in a Databricks notebook:

display(spark.read.table('samples.nyctaxi.trips'))

(For vanilla Spark users, it would be
spark.read.table('samples.nyctaxi.trips').show(100, False))

Since you’re using Databricks, you can also find the data under the Data
menu, scroll down to the samples metastore then click through to trips to
find the file location, schema, and sample data.

On Jun 29, 2023 at 23:53:25, Ruben Mennes  wrote:

> Dear Apache Spark community,
>
> I hope this email finds you well. My name is Ruben, and I am an
> enthusiastic user of Apache Spark, specifically through the Databricks
> platform. I am reaching out to you today to seek your assistance and
> guidance regarding a specific use case.
>
> I have been exploring the capabilities of Spark SQL and Databricks, and I
> have encountered a challenge related to accessing the data objects used by
> queries from the query history. I am aware that Databricks provides a
> comprehensive query history that contains valuable information about
> executed queries.
>
> However, my objective is to extract the underlying data objects (tables)
> involved in each query. By doing so, I aim to analyze and understand the
> dependencies between queries and the data they operate on. This information
> will give us new insights into how data is used across our data platform.
>
> I have attempted to leverage the Spark SQL Antlr grammar, available at
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4,
> to parse the queries retrieved from the query history. Unfortunately, I
> have encountered difficulties when parsing more complex queries.
>
> As an example, I have struggled to parse queries with intricate constructs
> such as the following:
>
> SELECT
>   concat(pickup_zip, '-', dropoff_zip) as route,
>   AVG(fare_amount) as average_fare
> FROM
>   `samples`.`nyctaxi`.`trips`
> GROUP BY
>   1
> ORDER BY
>   2 DESC
> LIMIT 1000
>
> I would greatly appreciate it if you could provide me with some guidance
> on how to overcome these challenges. Specifically, I am interested in
> understanding if there are alternative approaches or existing tools that
> can help me achieve my goal of extracting the data objects used by queries
> from the Databricks query history.
>
> Additionally, if there are any resources, documentation, or examples that
> provide further clarity on this topic, I would be more than grateful to
> receive them. Any insights you can provide would be of immense help in
> advancing my understanding and enabling me to make the most of the Spark
> SQL and Databricks ecosystem.
>
> Thank you very much for your time and support. I eagerly look forward to
> hearing from you and benefiting from your expertise.
>
> Best regards,
> Ruben Mennes
>
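
On the table-extraction part of the question, one possible starting point (only a sketch, and it relies on Spark internals rather than a supported public API, so it can change between versions) is to reuse the same Catalyst parser that Spark builds from that ANTLR grammar and collect the unresolved table references from the parsed plan. The class and field names below are as they appear in Spark 3.x.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

public class TableReferences {

    // Extracts the table names referenced by a SQL string, e.g. "samples.nyctaxi.trips".
    public static List<String> tablesIn(String sql) {
        // Parse without resolving against any catalog: table references stay
        // in the plan as UnresolvedRelation leaf nodes.
        LogicalPlan plan = CatalystSqlParser$.MODULE$.parsePlan(sql);

        List<String> tables = new ArrayList<>();
        scala.collection.Iterator<LogicalPlan> leaves = plan.collectLeaves().iterator();
        while (leaves.hasNext()) {
            LogicalPlan leaf = leaves.next();
            if (leaf instanceof UnresolvedRelation) {
                // multipartIdentifier() is a Scala Seq such as [samples, nyctaxi, trips]
                tables.add(((UnresolvedRelation) leaf).multipartIdentifier().mkString("."));
            }
        }
        return tables;
    }
}

For the sample query above this should return [samples.nyctaxi.trips]; note that names introduced by WITH clauses may also appear as UnresolvedRelation nodes in the unresolved plan and would need to be filtered out separately.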


Filtering JSON records when there isn't an exact schema match in Spark

2023-07-03 Thread Shashank Rao
Hi all,
I'm trying to read around 1,000,000 JSONL files present in S3 using Spark.
Once read, I need to write them to BigQuery.
I have a schema that may not be an exact match with all the records.
How can I filter records where there isn't an exact schema match:

Eg: if my records were:
{"x": 1, "y": 1}
{"x": 2, "y": 2}
{"x": 3, "y": 3}
{"x": 4, "y": "4"}
{"x": 5, "y": 5, "z": 5}

and if my schema were:
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

I need the records 4 and 5 to be filtered out.
Record 4 should be filtered out since y is a string instead of long.
Record 5 should be filtered out since z is not part of the schema.

I tried applying my schema on read, but it does not work as needed:

StructType schema = new StructType().add("x", DataTypes.LongType).add("y",
DataTypes.LongType);
Dataset<Row> ds = spark.read().schema(schema).json("path/to/file");

This gives me a dataset that has record 4 with y=null and record 5 with x
and y.

Any help is appreciated.
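
As the follow-up earlier in this digest shows, the type-mismatch case (record 4) can be caught by adding a _corrupt_record column and reading in PERMISSIVE mode. A cleaned-up Java sketch of that approach (it still lets record 5 through; spark is an existing SparkSession):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("x", DataTypes.LongType)
    .add("y", DataTypes.LongType)
    .add("_corrupt_record", DataTypes.StringType);

// In PERMISSIVE mode, rows that do not parse against the schema (record 4, where y
// is a string) keep their raw text in _corrupt_record; well-formed rows leave it null.
// If Spark complains about queries that reference only the corrupt record column
// (e.g. bare counts), cache the parsed Dataset before filtering.
Dataset<Row> ds = spark.read()
    .schema(schema)
    .option("mode", "PERMISSIVE")
    .json("path/to/file")
    .filter(functions.col("_corrupt_record").isNull())
    .drop("_corrupt_record");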

-- 
Thanks,
Shashank Rao


CFP for the 2nd Performance Engineering track at Community over Code NA 2023

2023-07-03 Thread Brebner, Paul
Hi Apache Spark people - There are only 10 days left to submit a talk proposal
(title and abstract only) for Community over Code NA 2023. The 2nd Performance
Engineering track is on this year, so any Apache project-related performance
and scalability talks are welcome. Here's the CFP for more ideas and links,
including the CFP submission page:
https://www.linkedin.com/pulse/call-papers-2nd-performance-engineering-track-over-code-brebner/
 - Paul Brebner and Roger Abelenda


PySpark error java.lang.IllegalArgumentException

2023-07-03 Thread elango vaidyanathan
Hi all,

I am reading a parquet file like this and it gives
java.lang.IllegalArgumentException.
However, I can work with other parquet files (such as NYC taxi parquet
files) without any issue. I have copied the full error log as well. Can you
please check once and let me know how to fix this?

import pyspark

from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory",
"20g").config("spark.driver.memory", "50g").getOrCreate()

df=spark.read.parquet("/data/202301/account_cycle")

df.printSchema()  # works fine

df.count()  # works fine

df.show()  # fails with the error below

>>> df.show()

23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:

23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:

23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
struct

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in
memory (estimated size 540.6 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)

23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString
at NativeMethodAccessorImpl.java:0

23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing,
max size: 134217728 bytes, open cost is considered as scanning 4194304
bytes.

23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
NativeMethodAccessorImpl.java:0

23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
NativeMethodAccessorImpl.java:0) with 1 output partitions

23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
(showString at NativeMethodAccessorImpl.java:0)

23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()

23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()

23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
(MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
which has no missing parents

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in
memory (estimated size 38.1 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)

23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast at
DAGScheduler.scala:1478

23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
ResultStage 14 (MapPartitionsRDD[42] at showString at
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
Vector(0))

23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks
resource profile 0

23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID
48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes)
taskResourceAssignments Map()

23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)

23/07/03 18:07:20 INFO FileScanRDD: Reading File path:
file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range:
0-134217728, partition values: [empty row]

23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
48)

java.lang.IllegalArgumentException

at java.nio.Buffer.limit(Buffer.java:275)

at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)

at
org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)

at
org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)

at java.io.DataInputStream.readFully(DataInputStream.java:195)

at java.io.DataInputStream.readFully(DataInputStream.java:169)

at
org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)

at
org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)

at
org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)

at
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary.<init>(PlainValuesDictionary.java:154)

at
org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:96)

at
org.apache.parquet.column.Encoding$5.initDictionary(Encoding.java:163)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:114)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:352)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:293)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(Vectoriz