Writing to MySQL from PySpark Structured Streaming

2020-10-15 Thread Krishnanand Khambadkone
Hi, I am trying to write to MySQL from a Spark Structured Streaming Kafka source, using Spark 2.4.0. I get this exception: AnalysisException: u"'write' can not be called on streaming Dataset/DataFrame
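The usual workaround for this exception is to do the JDBC write inside foreachBatch, where each micro-batch arrives as a static DataFrame. A minimal sketch, assuming an existing SparkSession, a streaming DataFrame stream_df read from Kafka, and placeholder JDBC URL, table, credentials and checkpoint path:

    # Sketch only: inside foreachBatch the batch is a static DataFrame,
    # so the regular batch .write API is allowed.
    def write_to_mysql(batch_df, batch_id):
        (batch_df.write
            .format("jdbc")
            .option("url", "jdbc:mysql://dbhost:3306/mydb")        # placeholder
            .option("dbtable", "events")                           # placeholder
            .option("user", "user")                                # placeholder
            .option("password", "password")                        # placeholder
            .option("driver", "com.mysql.cj.jdbc.Driver")
            .mode("append")
            .save())

    query = (stream_df.writeStream
        .foreachBatch(write_to_mysql)
        .option("checkpointLocation", "/tmp/chk/mysql_sink")       # placeholder
        .start())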

Re: ForeachBatch Structured Streaming

2020-10-15 Thread muru
To achieve exactly-once with foreachBatch in SS, you must have a checkpoint enabled. In case of any exceptions or failures, the Spark SS job will be restarted and the same batchId reprocessed (for any data source). To avoid duplicates, you should have an external system to store and dedupe
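A minimal sketch of that idea, not from the thread itself: record the checkpointed batch_id alongside the data so the external store (or a downstream job) can drop rows from a replayed batch. The sink path, checkpoint path and stream_df are placeholders:

    from pyspark.sql import functions as F

    # Sketch only: tag every row with the micro-batch id so the external
    # system can dedupe on (batch_id, key) after a restart.
    def write_batch(batch_df, batch_id):
        (batch_df
            .withColumn("batch_id", F.lit(batch_id))
            .write.mode("append")
            .parquet("/tmp/out/events"))                   # placeholder sink

    (stream_df.writeStream
        .foreachBatch(write_batch)
        .option("checkpointLocation", "/tmp/chk/events")   # placeholder
        .start())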

Re: How to Scale Streaming Application to Multiple Workers

2020-10-15 Thread muru
For file streaming in SS, you can try setting "maxFilesPerTrigger" per batch. foreachBatch is an action; the output is written to various sinks. Are you doing any post-transformation in foreachBatch?
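For reference, a minimal sketch of the option being suggested; the format, input path and the input_schema variable are placeholders, and an existing SparkSession is assumed:

    # Sketch only: cap how many new files each micro-batch picks up.
    stream_df = (spark.readStream
        .format("json")
        .schema(input_schema)                 # file sources need an explicit schema
        .option("maxFilesPerTrigger", 10)     # at most 10 files per micro-batch
        .load("/data/incoming/"))             # placeholder path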

Re: Scala vs Python for ETL with Spark

2020-10-15 Thread Mich Talebzadeh
Hi, I spent a few days converting one of my Spark/Scala scripts to Python. It was interesting but at times felt like trench warfare. There is a lot of handy stuff in Scala, like case classes for defining column headers etc., that doesn't seem to be available in Python (possibly my lack of in-depth
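Not from the thread, but for comparison, the usual PySpark stand-ins for a Scala case class when naming columns are an explicit StructType or pyspark.sql.Row with keyword fields; column names and types below are illustrative only, and an existing SparkSession is assumed:

    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    # Explicit schema, roughly what a Scala case class would give you.
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("value", DoubleType(), True),
    ])
    df = spark.createDataFrame([("a", 1.0), ("b", 2.0)], schema)

    # Or named rows, letting Spark infer the types.
    df2 = spark.createDataFrame([Row(name="a", value=1.0), Row(name="b", value=2.0)])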

Re: How to Scale Streaming Application to Multiple Workers

2020-10-15 Thread Mich Talebzadeh
Hi, this in general depends on how many topics you want to process at the same time and whether this is done on-premises running Spark in cluster mode. Have you looked at the Spark GUI to see if one worker (one JVM) is adequate for the task? Also, how are these small files read and processed? Is it

Re: How to Scale Streaming Application to Multiple Workers

2020-10-15 Thread Artemis User
Thanks for the input. What I am interested in is how to have multiple workers read and process the small files in parallel, with one file per worker at a time. Partitioning the data frame doesn't make sense since the data frame is small already.

Re: The equivalent of Scala mapping in Pyspark

2020-10-15 Thread Mich Talebzadeh
Hi all, I managed to sort this one out the hard way, as the available PySpark materials are not as comprehensive as the Scala ones. Frankly, sorting this out was a bit of a struggle for me. However, I managed to make it work. What the script does in a nutshell is generate rows in Spark and save
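The message is truncated here, so purely as an illustration of "generate rows in Spark and save" in PySpark (column names, row count and output path are made up, and an existing SparkSession is assumed):

    from pyspark.sql import functions as F

    # Sketch only: build a small DataFrame of generated rows and write it out.
    df = (spark.range(0, 10)                      # rows with an id column 0..9
          .withColumn("value", F.rand(seed=42))   # add a generated column
          .withColumnRenamed("id", "row_id"))

    df.write.mode("overwrite").parquet("/tmp/generated_rows")   # placeholder path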

Re: Write pyspark dataframe into kms encrypted s3 bucket

2020-10-15 Thread Hariharan
Sorry, I meant setLong only. If you know which version of hadoop jars you're using, you can check the code here to try to find out which line exactly is throwing the
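For reference, a sketch of the setLong variant being suggested; py4j passes a plain Python int through to the Java long parameter. Whether this resolves the original error isn't confirmed in the thread:

    sc = spark.sparkContext
    hadoop_conf = sc._jsc.hadoopConfiguration()

    # Configuration.setLong(String, long); a Python int maps to the Java long.
    hadoop_conf.setLong("fs.s3a.multipart.size", 104857600)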

Re: Write pyspark dataframe into kms encrypted s3 bucket

2020-10-15 Thread Devi P V
hadoop_conf.set("fs.s3a.multipart.size", 104857600L): .set only allows string values, and that literal is throwing invalid syntax. I tried the following also, but the issue is not fixed: hadoop_conf.setLong("fs.s3a.multipart.size", 104857600). Thanks
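An alternative not mentioned in this snippet: Hadoop options can also be handed to Spark as strings with the spark.hadoop. prefix when the session is built, which sidesteps the typed setters entirely. A sketch, with a placeholder application name:

    from pyspark.sql import SparkSession

    # Sketch only: pass the value as a string; Hadoop parses it itself.
    spark = (SparkSession.builder
        .appName("s3a-write")                                      # placeholder
        .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
        .getOrCreate())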

Re: Write pyspark dataframe into kms encrypted s3 bucket

2020-10-15 Thread Hariharan
fs.s3a.multipart.size needs to be a long value, not a string, so you will need to use hadoop_conf.set("fs.s3a.multipart.size", 104857600L) ~ Hariharan

Re: How to Scale Streaming Application to Multiple Workers

2020-10-15 Thread Lalwani, Jayesh
Parallelism of streaming depends on the input source. If you are getting one small file per microbatch, then Spark will read it in one worker. You can always repartition your data frame after reading it to increase the parallelism.
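A minimal sketch of the repartition suggestion; the partition count and the upstream stream_df are placeholders:

    # Sketch only: spread a single-file micro-batch across more tasks.
    repartitioned = stream_df.repartition(8)   # placeholder partition count
    # Any subsequent per-row work on `repartitioned` now runs as 8 tasks instead of 1.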

Write pyspark dataframe into kms encrypted s3 bucket

2020-10-15 Thread Devi P V
Hi All, I am trying to write a PySpark dataframe into a KMS-encrypted S3 bucket. I am using spark-3.0.1-bin-hadoop3.2. I have given all the possible configurations as shown below.

    sc = spark.sparkContext
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", "XXX")
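The snippet is cut off here. Purely as context, a typical s3a setup for SSE-KMS looks roughly like the sketch below; the keys, KMS key ARN, bucket and df are placeholders, and the property names should be checked against the Hadoop 3.2 s3a documentation:

    sc = spark.sparkContext
    hadoop_conf = sc._jsc.hadoopConfiguration()

    # Sketch only; all values are placeholders.
    hadoop_conf.set("fs.s3a.access.key", "XXX")
    hadoop_conf.set("fs.s3a.secret.key", "XXX")
    hadoop_conf.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
    hadoop_conf.set("fs.s3a.server-side-encryption.key",
                    "arn:aws:kms:us-east-1:111111111111:key/placeholder")

    df.write.mode("overwrite").parquet("s3a://my-encrypted-bucket/output/")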