Re: use java in Grouped Map pandas udf to avoid serDe

2020-10-06 Thread Lian Jiang
Why is the perf improvement so small after enabling arrow? Could anything else be missing? Thanks. On Sun, Oct 4, 2020 at 10:36 AM Lian Jiang wrote: > Please ignore this question. > https://kontext.tech/column/spark/370/improve-pyspark-performance-using-pandas-udf-with-apache-arrow > shows p

Re: use java in Grouped Map pandas udf to avoid serDe

2020-10-04 Thread Lian Jiang
I missed enabling spark.sql.execution.arrow.enabled. Thanks. Regards. On Sun, Oct 4, 2020 at 10:22 AM Lian Jiang wrote: > Hi, > > I am using pyspark Grouped Map pandas UDF ( > https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html). > Functionality wise it works
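
For reference, a minimal sketch of the Arrow settings as of Spark 2.4 (the names gain a "pyspark" infix in 3.0). Note these flags mainly govern toPandas()/createDataFrame() conversion and the silent fallback path; pandas UDFs use Arrow regardless:

    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    # Fail loudly instead of silently falling back to the slow non-Arrow path,
    # which makes it obvious whether Arrow is actually in effect:
    spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")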

use java in Grouped Map pandas udf to avoid serDe

2020-10-04 Thread Lian Jiang
Hi, I am using pyspark Grouped Map pandas UDF ( https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html). Functionality wise it works great. However, serDe causes a significant perf hit. To optimize this UDF, can I do either of the below: 1. use a java UDF to completely replace the python
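
For context, a minimal Grouped Map pandas UDF in the Spark 2.4 style; the column names and the z-score transform are made up for illustration:

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf("symbol string, price double, zscore double", PandasUDFType.GROUPED_MAP)
    def normalize(pdf):
        # pdf is a pandas DataFrame holding one whole group
        pdf["zscore"] = (pdf["price"] - pdf["price"].mean()) / pdf["price"].std()
        return pdf

    result = df.groupBy("symbol").apply(normalize)

The serDe cost the question refers to is the Arrow serialization between the JVM and the Python worker on both sides of apply(); a Java/Scala UDF avoids that round trip entirely but gives up pandas.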

Spark interrupts S3 request backoff

2020-04-12 Thread Lian Jiang
Hi, My Spark job failed when reading parquet files from S3 due to 503 slow down. According to https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html, I can use backoff to mitigate this issue. However, Spark seems to interrupt the backoff sleep (see "sleep interrupted"). Is
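
One option on the Hadoop side, assuming the job reads through the s3a connector on a reasonably recent Hadoop (3.x): let the filesystem layer do the retrying below Spark, so task interruption matters less. The values below are illustrative only:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
        .config("spark.hadoop.fs.s3a.attempts.maximum", "20")   # overall attempt cap
        .config("spark.hadoop.fs.s3a.retry.limit", "20")        # retries for throttled requests
        .config("spark.hadoop.fs.s3a.retry.interval", "1s")     # sleep between retries
        .getOrCreate())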

Re: pandas_udf is very slow

2020-04-05 Thread Lian Jiang
think you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your > result the same? > > From: Lian Jiang > Date: Sunday, April 5, 2020 at 3:28 AM > To: user > Subject: pandas_udf is very slow > > Hi, > > I am using pandas udf in pyspark 2.4.3 on EMR 5.21.
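
A sketch of the GROUPED_AGG variant suggested here, in case it matches the intent; a grouped aggregate returns one scalar per group instead of a whole DataFrame (column names are hypothetical):

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
    def mean_price(prices):
        # prices is a pandas Series holding one group's values
        return prices.mean()

    df.groupBy("symbol").agg(mean_price(df["price"]))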

pandas_udf is very slow

2020-04-05 Thread Lian Jiang
Hi, I am using pandas udf in pyspark 2.4.3 on EMR 5.21.0. pandas udf is favored over non pandas udf per https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf. My data has about 250M records and the pandas udf code is like: def

Re: pandas_udf throws "Unsupported class file major version 56"

2019-10-07 Thread Lian Jiang
I figured it out. Thanks. On Mon, Oct 7, 2019 at 9:55 AM Lian Jiang wrote: > Hi, > > from pyspark.sql.functions import pandas_udf, PandasUDFType > import pyspark > from pyspark.sql import SparkSession > spark = SparkSession.builder.getOrCreate() > > df = spark.createDataFra

pandas_udf throws "Unsupported class file major version 56"

2019-10-07 Thread Lian Jiang
Hi, from pyspark.sql.functions import pandas_udf, PandasUDFType import pyspark from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.createDataFrame( [(1, True, 1.0, 'aa'), (1, False, 2.0, 'aa'), (2, True, 3.0, 'aa'), (2, True, 5.0, 'aa'), (2, True, 10.0,
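
Class file major version 56 corresponds to Java 12, which Spark 2.x does not support (it targets Java 8). The usual fix is pointing PySpark at a Java 8 JVM before the session is created; a sketch with a hypothetical path:

    import os
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk"   # hypothetical Java 8 location

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()   # the gateway now launches with Java 8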

Re: writing into oracle database is very slow

2019-04-19 Thread Lian Jiang
file and using >> oracle external table to do the insert. >> >> hope this could help. >> >> Dalin >> >>> On Thu, Apr 18, 2019 at 11:43 AM Jörn Franke wrote: >>> What is the size of the data? How much time does it need on HDFS and how >>

writing into oracle database is very slow

2019-04-06 Thread Lian Jiang
Hi, My spark job writes into oracle db using: df.coalesce(10).write.format("jdbc").option("url", url) .option("driver", driver).option("user", user) .option("batchsize", 2000) .option("password", password).option("dbtable", tableName).mode("append").save() It is much slower than writing
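
A sketch of the usual JDBC write tuning knobs, reusing the variables from the post; all are standard options of the jdbc data source and the values are illustrative, not recommendations:

    (df.repartition(32)                        # more concurrent JDBC connections than coalesce(10)
       .write.format("jdbc")
       .option("url", url)
       .option("driver", driver)
       .option("user", user)
       .option("password", password)
       .option("dbtable", tableName)
       .option("batchsize", 10000)             # fewer round trips per partition
       .option("isolationLevel", "NONE")       # skip transaction isolation if the target allows it
       .mode("append")
       .save())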

spark generates corrupted parquet files

2019-03-29 Thread Lian Jiang
Hi, Occasionally, spark generates some parquet files having only 4 bytes. The content is "PAR1". ETL spark jobs cannot handle such corrupted files and ignore the whole partition containing such poison-pill files, causing a large data loss. Spark also generates 0-byte parquet files but they can be
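
On the reading side, one mitigation is to skip unreadable files rather than fail or drop the whole partition. This does not explain why the truncated files appear (often tasks killed mid-write or speculative attempts), but it limits the blast radius; a sketch:

    # Skip files whose parquet footer cannot be read instead of failing the job
    # (config available since Spark 2.1):
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")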

Re: writing a small csv to HDFS is super slow

2019-03-26 Thread Lian Jiang
sible that you share your solution (in case the project is > open-sourced already) with us and then we can have a look at it? > > Many thanks in advance. > > Best regards, > [1]. https://github.com/databricks/spark-csv > > On Tue, Mar 26, 2019 at 1:09 AM Lian Jiang wrote:

Re: writing a small csv to HDFS is super slow

2019-03-25 Thread Lian Jiang
coalesce instead? > > Kathleen > > On Fri, Mar 22, 2019 at 2:43 PM Lian Jiang wrote: > >> Hi, >> >> Writing a csv to HDFS takes about 1 hour: >> >> >> df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header=
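
A sketch of the two usual alternatives for a ~150 KB result: write it from the driver, or use the coalesce(1) suggested here with the built-in csv writer, which avoids the extra shuffle that repartition(1) adds (if the upstream computation itself is the slow part, neither helps):

    # Tiny output: collect to the driver and write locally (path is hypothetical)
    df.toPandas().to_csv("/tmp/out.csv", index=False)

    # Or stay on HDFS without the repartition shuffle; Spark 2+ ships a csv
    # writer, so the com.databricks.spark.csv package is not needed
    (df.coalesce(1)
       .write.option("header", "true")
       .mode("overwrite")
       .csv("hdfs:///tmp/out"))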

writing a small csv to HDFS is super slow

2019-03-22 Thread Lian Jiang
Hi, Writing a csv to HDFS takes about 1 hour: df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv) The generated csv file is only about 150kb. The job uses 3 containers (13 cores, 23g mem). Other people have similar issues but I don't

read json and write into parquet in executors

2019-03-11 Thread Lian Jiang
Hi, In my spark batch job: step 1, the driver assigns a partition of the json file path list to each executor; step 2, each executor fetches its assigned json files from S3 and saves them into HDFS; step 3, the driver reads these json files into a data frame and saves it as parquet. To improve performance by
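
For comparison, the built-in json reader already distributes the fetch and parse across executors when handed the full path list, which may make the manual S3-to-HDFS copy step unnecessary; a sketch assuming the files are reachable through s3a:// URIs (paths are hypothetical):

    paths = ["s3a://my-bucket/a.json", "s3a://my-bucket/b.json"]
    df = spark.read.json(paths)                       # executors read S3 directly
    df.write.mode("overwrite").parquet("hdfs:///data/out")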

Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Lian Jiang
management by setting the following >>> configuration in the SparkSession before starting the streaming query. >>> >>> spark.conf.set( >>> "spark.sql.streaming.stateStore.providerClass", >>> "com.databricks.sql.streaming.state.RocksDBStat

use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Lian Jiang
Hi, I have a very simple SSS pipeline which does: val query = df .dropDuplicates(Array("Id", "receivedAt")) .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt"))) .writeStream .format("parquet") .partitionBy("availabilityDomain", timePartitionCol)
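
One common refinement for this pipeline, shown in PySpark for consistency with the other sketches here: add a watermark on the event-time column so the dropDuplicates state does not grow without bound. This assumes receivedAt is a timestamp; the 24-hour horizon and paths are made up:

    query = (df
        .withWatermark("receivedAt", "24 hours")      # bounds the dedup state
        .dropDuplicates(["Id", "receivedAt"])
        .writeStream
        .format("parquet")
        .partitionBy("availabilityDomain")
        .option("path", "hdfs:///data/out")
        .option("checkpointLocation", "hdfs:///chk/dedup")
        .start())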

Re: spark structured streaming crash due to decompressing gzip file failure

2019-03-07 Thread Lian Jiang
> > For me this worked to ignore reading empty/partially uploaded gzip files > in s3 bucket. > > Akshay Bhardwaj > +91-97111-33849 > > > On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang wrote: > >> Hi, >> >> I have a structured streaming job which li

spark structured streaming crash due to decompressing gzip file failure

2019-03-06 Thread Lian Jiang
Hi, I have a structured streaming job which listens to a hdfs folder containing jsonl.gz files. The job crashed due to error: java.io.IOException: incorrect header check at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method) at

spark structured streaming handles pre-existing files

2019-02-14 Thread Lian Jiang
Hi, We have a spark structured streaming job monitoring a folder and converting jsonl files into parquet. However, if some jsonl files already exist before the first run of the spark streaming job (no checkpoint yet), these files will not be processed by the spark job when it runs.

structured streaming handling validation and json flattening

2019-02-09 Thread Lian Jiang
Hi, We have a structured streaming job that converts json into parquet. We want to validate the json records. If a json record is not valid, we want to log a message and refuse to write it into the parquet. Also the json has nested jsons and we want to flatten the nested jsons into other
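
A sketch of one way to do both in the same query: keep a corrupt-record column for validation, filter on it (the rejected rows could be routed to a second sink for logging), and flatten the nested struct with column expressions. The schema, paths, and field names are hypothetical:

    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType, StringType

    schema = (StructType()
        .add("field1", StringType())
        .add("nested", StructType().add("a", StringType()))
        .add("_corrupt_record", StringType()))

    raw = (spark.readStream
        .schema(schema)
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", "_corrupt_record")
        .json("hdfs:///incoming/jsonl"))

    valid = (raw.filter(col("_corrupt_record").isNull())          # refuse bad records
        .select("field1", col("nested.a").alias("nested_a")))     # flatten the nested json

    query = (valid.writeStream.format("parquet")
        .option("path", "hdfs:///data/out")
        .option("checkpointLocation", "hdfs:///chk/flatten")
        .start())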

spark in jupyter cannot find a class in a jar

2018-11-15 Thread Lian Jiang
I am using spark in Jupyter as below: import findspark findspark.init() from pyspark import SQLContext, SparkContext sqlCtx = SQLContext(sc) df = sqlCtx.read.parquet("oci://mybucket@mytenant/myfile.parquet") The error is: Py4JJavaError: An error occurred while calling o198.parquet. :
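
This usually comes down to the connector jar that implements the oci:// filesystem not being on the driver and executor classpath; a sketch of passing it through spark.jars before the session exists (the jar path is hypothetical):

    import findspark
    findspark.init()
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
        .config("spark.jars", "/opt/jars/oci-hdfs-connector-full.jar")   # hypothetical path
        .getOrCreate())

    df = spark.read.parquet("oci://mybucket@mytenant/myfile.parquet")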

Re: Spark Structured Streaming handles compressed files

2018-11-02 Thread Lian Jiang
Any clue? Thanks. On Wed, Oct 31, 2018 at 8:29 PM Lian Jiang wrote: > We have jsonl files each of which is compressed as gz file. Is it possible > to make SSS to handle such files? Appreciate any help! >

Spark Structured Streaming handles compressed files

2018-10-31 Thread Lian Jiang
We have jsonl files each of which is compressed as gz file. Is it possible to make SSS to handle such files? Appreciate any help!
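
For what it's worth, the file source decompresses .gz transparently through Hadoop's codec support, so no special handling should be needed beyond the explicit schema that streaming file sources require; a sketch with a hypothetical schema and path:

    from pyspark.sql.types import StructType, StringType

    schema = StructType().add("field1", StringType())    # streaming file sources need a schema

    stream = (spark.readStream
        .schema(schema)
        .json("hdfs:///incoming"))                       # picks up *.jsonl.gz files as well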

Re: spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
I solved this issue by using spark 2.3.1 jars copied from the HDP3.0 cluster. Thanks. On Thu, Aug 30, 2018 at 10:18 AM Lian Jiang wrote: > Per https://spark.apache.org/docs/latest/building-spark.html, spark 2.3.1 > is built with hadoop 2.6.X by default. This is why I see my fat jar >

Re: spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
2.3.1 built with hadoop 2.7. Where can I get spark 2.3.1 built with hadoop 3? Does spark 2.3.1 support hadoop 3? Appreciate your help. On Thu, Aug 30, 2018 at 8:59 AM Lian Jiang wrote: > Hi, > > I am using HDP3.0 which uses HADOOP3.1.0 and Spark 2.3.1. My spark > streaming jobs

spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
Hi, I am using HDP3.0 which uses HADOOP3.1.0 and Spark 2.3.1. My spark streaming jobs running fine in HDP2.6.4 (HADOOP2.7.3, spark 2.2.0) fails in HDP3: java.lang.IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface

load hbase data using spark

2018-06-18 Thread Lian Jiang
Hi, I am considering tools to load hbase data using spark. One choice is https://github.com/Huawei-Spark/Spark-SQL-on-HBase. However, this seems to be out-of-date (e.g. "This version of 1.0.0 requires Spark 1.4.0."). Which tool should I use for this purpose? Thanks for any hint.

Re: schema change for structured spark streaming using jsonl files

2018-04-24 Thread Lian Jiang
Thanks for any help! On Mon, Apr 23, 2018 at 11:46 AM, Lian Jiang <jiangok2...@gmail.com> wrote: > Hi, > > I am using structured spark streaming which reads jsonl files and writes > into parquet files. I am wondering what the process is if the jsonl files' schema > changes.

schema change for structured spark streaming using jsonl files

2018-04-23 Thread Lian Jiang
Hi, I am using structured spark streaming which reads jsonl files and writes into parquet files. I am wondering what the process is if the jsonl files' schema changes. Suppose jsonl files are generated in \jsonl folder and the old schema is { "field1": String}. My proposal is: 1. write the jsonl files
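
On the parquet side, one relevant knob when files written under the old and new json schemas end up in the same output: mergeSchema reconciles compatible additions at read time. A sketch with a hypothetical path:

    df = (spark.read
        .option("mergeSchema", "true")    # union of the schemas across parquet files
        .parquet("hdfs:///data/out"))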

spark hbase connector

2018-04-17 Thread Lian Jiang
Hi, My spark jobs need to talk to hbase and I am not sure which spark hbase connector is recommended: https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/ https://phoenix.apache.org/phoenix_spark.html Or there is any other better solutions. Appreciate any guidance.

avoid duplicate records when appending new data to a parquet

2018-04-13 Thread Lian Jiang
I have a parquet with an id field that is the hash of the composite key fields. Is it possible to maintain the uniqueness of the id field when appending new data which may duplicate existing records in the parquet? Thanks!
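
Plain parquet has no upsert, so a common workaround is an anti-join against the ids already present before appending; a sketch (new_df and the path are hypothetical):

    existing_ids = spark.read.parquet("hdfs:///data/out").select("id")
    to_append = (new_df.dropDuplicates(["id"])
        .join(existing_ids, on="id", how="left_anti"))   # keep only unseen ids
    to_append.write.mode("append").parquet("hdfs:///data/out")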

Re: retention policy for spark structured streaming dataset

2018-03-14 Thread Lian Jiang
( by day ) ? That will make it easier to drop > data older than x days outside streaming job. > > Sunil Parmar > > On Wed, Mar 14, 2018 at 11:36 AM, Lian Jiang <jiangok2...@gmail.com> > wrote: > >> I have a spark structured streaming job which dump data into a parquet >>
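
A sketch of the partition-by-day layout suggested here, so retention becomes deleting old partition directories out of band instead of rewriting the parquet; the timestamp column and paths are hypothetical:

    from pyspark.sql.functions import to_date, col

    query = (df
        .withColumn("dt", to_date(col("eventTime")))     # daily partition key
        .writeStream
        .format("parquet")
        .partitionBy("dt")
        .option("path", "hdfs:///data/out")
        .option("checkpointLocation", "hdfs:///chk/retention")
        .start())
    # A scheduled job can then drop dt=... directories older than ~90 days
    # without stopping the streaming query.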

retention policy for spark structured streaming dataset

2018-03-14 Thread Lian Jiang
I have a spark structured streaming job which dumps data into a parquet file. To keep the parquet file from growing infinitely, I want to discard data older than 3 months. Does spark streaming support this? Or do I need to stop the streaming job, trim the parquet file and restart the streaming job? Thanks for any

Re: dependencies conflict in oozie spark action for spark 2

2018-03-07 Thread Lian Jiang
ster$.main(ApplicationMaster.scala:766) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) Any idea? On Tue, Mar 6, 2018 at 4:17 PM, Lian Jiang <jiangok2...@gmail.com> wrote: > I am using HDP 2.6.4 and have followed https://docs.hortonworks.com/ >

dependencies conflict in oozie spark action for spark 2

2018-03-06 Thread Lian Jiang
I am using HDP 2.6.4 and have followed https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_spark-component-guide/content/ch_oozie-spark-action.html to make oozie use spark2. After this, I found there are still a bunch of issues: 1. oozie and spark try to add the same jars multiple time

Re: Can spark handle this scenario?

2018-02-26 Thread Lian Jiang
Thanks Vijay. After changing the programming model (create a context class for the workers), it finally worked for me. Cheers. On Fri, Feb 23, 2018 at 5:42 PM, vijay.bvp wrote: > when HTTP connection is opened you are opening a connection between > specific > machine (with
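
A PySpark rendering of the per-partition pattern described in this thread (the original was Scala): the connection is created inside the partition function, so nothing non-serializable is captured in the closure. The endpoint, column name, and use of requests are assumptions:

    import requests   # must be installed on the workers

    def fetch_partition(rows):
        session = requests.Session()          # created per partition, never serialized
        for row in rows:
            resp = session.get("https://example.com/quote/" + row.symbol)   # hypothetical API
            yield (row.symbol, resp.status_code)

    result = symbols_df.rdd.mapPartitions(fetch_partition).toDF(["symbol", "status"])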

Re: Can spark handle this scenario?

2018-02-22 Thread Lian Jiang
Hi Vijay, Should HTTPConnection() (or any other object created per partition) be serializable so that your code works? If so, the usage seems to be limited. Sometimes, the error caused by a non-serializable object can be very misleading (e.g. "Return statements aren't allowed in Spark closures")

Re: Return statements aren't allowed in Spark closures

2018-02-21 Thread Lian Jiang
> > Get Outlook for Android <https://aka.ms/ghei36> > > -- > *From:* Lian Jiang <jiangok2...@gmail.com> > *Sent:* Wednesday, February 21, 2018 4:16:08 PM > *To:* user > *Subject:* Return statements aren't allowed in Spark closures

Return statements aren't allowed in Spark closures

2018-02-21 Thread Lian Jiang
I can run below code in spark-shell using yarn client mode. val csv = spark.read.option("header", "true").csv("my.csv") def queryYahoo(row: Row) : Int = { return 10; } csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => { queryYahoo(r) })} However, the same code failed when run using

Re: Can spark handle this scenario?

2018-02-20 Thread Lian Jiang
Thanks Vijay! This is very clear. On Tue, Feb 20, 2018 at 12:47 AM, vijay.bvp wrote: > I am assuming pullSymbolFromYahoo functions opens a connection to yahoo API > with some token passed, in the code provided so far if you have 2000 > symbols, it will make 2000 new

Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
> > Best regards, > Anastasios > > > > > On Sat, Feb 17, 2018 at 6:13 PM, Lian Jiang <jiangok2...@gmail.com> wrote: > >> *Snehasish,* >> >> I got this in spark-shell 2.11.8: >> >> case class My(name:Strin

Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
ng. So download it first and store it on a > distributed file system. Schedule to download newest information every day/ > hour etc. you can store it using a query optimized format such as ORC or > Parquet. Then you can run queries over it. > > On 17. Feb 2018, at 01:10, Lian Jiang <jia

Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
7, 2018 at 1:05 PM, Holden Karau <holden.ka...@gmail.com> > wrote: > >> I'm not sure what you mean by it could be hard to serialize complex >> operations? >> >> Regardless I think the question is do you want to parallelize this on >> multiple machines or just

Re: Can spark handle this scenario?

2018-02-16 Thread Lian Jiang
t, Feb 17, 2018 at 12:40 PM, Irving Duran <irving.du...@gmail.com> >> wrote: >> >>> Do you only want to use Scala? Because otherwise, I think with pyspark >>> and pandas read table you should be able to accomplish what you want to >>> accomplish. >>&

Can spark handle this scenario?

2018-02-16 Thread Lian Jiang
Hi, I have a use case: I want to download S stock data from Yahoo API in parallel using Spark. I have got all stock symbols as a Dataset. Then I used the below code to call the Yahoo API for each symbol: case class Symbol(symbol: String, sector: String) case class Tick(symbol: String, sector:

Re: saveAsTable does not respect spark.sql.warehouse.dir

2018-02-11 Thread Lian Jiang
s default database of hive. You might not have > access to it that's causing problems. > > > > Thanks > Prashanth Thipparthi > > > > > > On 11 Feb 2018 10:45 pm, "Lian Jiang" <jiangok2...@gmail.com> wrote: > > I started spark-shell with
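
Two things usually matter here: spark.sql.warehouse.dir is only honored when set before the first SparkSession is created, and an explicit path option on saveAsTable wins regardless of the warehouse setting. A sketch with hypothetical locations:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
        .config("spark.sql.warehouse.dir", "hdfs:///user/me/warehouse")
        .enableHiveSupport()
        .getOrCreate())

    # Or pin the location per table (creates an external table at that path):
    df.write.option("path", "hdfs:///user/me/tables/my_table").saveAsTable("my_table")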

Re: Spark cannot find tables in Oracle database

2018-02-11 Thread Lian Jiang
table names. I remember once that the >> table names had to be in small letters, but that was in MYSQL. >> >> >> Regards, >> Gourav >> >> On Sun, Feb 11, 2018 at 2:26 AM, Lian Jiang <jiangok2...@gmail.com> >> wrote: >> >>> Hi, >&

Spark cannot find tables in Oracle database

2018-02-10 Thread Lian Jiang
Hi, I am following https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases to query oracle database 12.1 from spark shell 2.11.8. val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST =
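
As the reply above notes, identifier case is a frequent culprit: Oracle stores unquoted identifiers in upper case, so the dbtable value should normally match that (or be schema-qualified). A sketch reusing the connection settings from the post, with a hypothetical schema and table name:

    jdbc_df = (spark.read.format("jdbc")
        .option("url", url)                          # same Oracle thin URL as above
        .option("driver", "oracle.jdbc.OracleDriver")
        .option("user", user)
        .option("password", password)
        .option("dbtable", "MYSCHEMA.MYTABLE")       # upper case, schema-qualified
        .load())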