Re: Apache Arrow support for Apache Spark

2020-02-17 Thread Chris Teoh
1. I'd also consider how you're structuring the data before applying the join; naively doing the join could be expensive, so a bit of data preparation may be necessary to improve join performance. Try to get a baseline as well. Arrow would help improve it. 2. Try storing it back as Parquet
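
A minimal sketch of the kind of preparation meant here, assuming an existing SparkSession named spark; the paths and the join key "id" are placeholders:

    // Placeholder inputs; "id" stands in for the real join key.
    val dfA = spark.read.parquet("/data/a")
    val dfB = spark.read.parquet("/data/b")

    // Pre-partition both sides on the join key so matching keys land together,
    // then persist the result as Parquet for later reads.
    val joined = dfA.repartition(dfA("id")).join(dfB.repartition(dfB("id")), "id")
    joined.write.mode("overwrite").parquet("/data/joined")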

Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Chris Teoh
A Kafka message is immutable, so a message will get sent with > a different offset instead of the same offset. > > So Spark, when reading from Kafka, is acting as an at-least-once consumer? Why > does Spark not do checkpointing for batch reads of Kafka? > > On Mon, Feb 3, 2020 at 1:36 AM

Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Chris Teoh
Kafka can keep track of the offsets (in a separate topic based on your consumer group) you've seen but it is usually best effort and you're probably better off also keeping track of your offsets. If the producer resends a message you would have to dedupe it as you've most likely already seen it,
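
For the batch case being discussed, the offset range can be pinned explicitly when reading; a hedged sketch assuming an existing SparkSession named spark, with broker, topic, and offsets as placeholders:

    // Batch read over a fixed offset range; the JSON maps topic -> partition -> offset.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "events")
      .option("startingOffsets", """{"events":{"0":100,"1":100}}""")
      .option("endingOffsets", """{"events":{"0":200,"1":200}}""")
      .load()

    // Keep your own record of the last offset processed, e.g. max(offset) per partition.
    df.selectExpr("partition", "offset", "CAST(value AS STRING) AS value").show()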

Re: Submitting job with external dependencies to pyspark

2020-01-28 Thread Chris Teoh
Really helpful. Thanks! I actually solved my problem by > creating a venv and using the venv flags. Wondering now how to submit the > data as an archive? Any idea? > > On Mon, Jan 27, 2020, 9:25 PM Chris Teoh wrote: > >> Use --py-files >> >> See >> https

Re: Submitting job with external dependencies to pyspark

2020-01-27 Thread Chris Teoh
Use --py-files See https://spark.apache.org/docs/latest/submitting-applications.html#bundling-your-applications-dependencies I hope that helps. On Tue, 28 Jan 2020, 9:46 am Tharindu Mathew, wrote: > Hi, > > Newbie to pyspark/spark here. > > I'm trying to submit a job to pyspark with a

Re: RESTful Operations

2020-01-19 Thread Chris Teoh
Maybe something like Livy, otherwise roll your own REST API and have it start a Spark job. On Mon, 20 Jan 2020 at 06:55, wrote: > I am new to Spark. The task I want to accomplish is let client send http > requests, then spark process that request for further operations. However > searching
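
If Livy is available, submitting a batch job is a single HTTP POST; a rough sketch using only the JDK HTTP client, with the Livy host, jar path, and class name as placeholder values:

    import java.net.{HttpURLConnection, URL}
    import java.nio.charset.StandardCharsets

    // Livy batch submission: POST /batches with the job jar and main class.
    val payload = """{"file": "hdfs:///jobs/my-spark-job.jar", "className": "com.example.MyJob"}"""

    val conn = new URL("http://livy-host:8998/batches").openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(payload.getBytes(StandardCharsets.UTF_8))

    // 201 Created means Livy accepted the batch; poll /batches/{id} afterwards for its state.
    println(s"Livy responded with HTTP ${conn.getResponseCode}")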

Re: Does explode lead to more usage of memory

2020-01-19 Thread Chris Teoh
Depends on the use case; if you have to join, you're saving a join and a shuffle by having it already in an array. If you explode, at least sort within partitions to get predicate pushdown when you read the data next time. On Sun, 19 Jan 2020, 1:19 pm Jörn Franke, wrote: > Why not two
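
A sketch of the explode-then-sort idea, assuming an existing SparkSession named spark and a hypothetical dataframe with an "id" column and an "items" array column:

    import org.apache.spark.sql.functions.{col, explode}

    val df = spark.read.parquet("/data/with_arrays")   // placeholder input with an array column "items"

    // One row per array element; sorting within partitions keeps Parquet row-group
    // statistics tight so predicate pushdown can skip data on the next read.
    df.select(col("id"), explode(col("items")).as("item"))
      .sortWithinPartitions("item")
      .write.mode("overwrite").parquet("/data/exploded")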

Re: Does explode lead to more usage of memory

2020-01-18 Thread Chris Teoh
I think it does mean more memory usage but consider how big your arrays are. Think about your use case requirements and whether it makes sense to use arrays. Also it may be preferable to explode if the arrays are very large. I'd say exploding arrays will make the data more splittable, having the

Re: Spark Executor OOMs when writing Parquet

2020-01-17 Thread Chris Teoh
15000 partitions still failed. I am trying 3 partitions now. There is > still some disk spill but it is not that high. > > Thanks, > > Arwin > > ------ > *From:* Chris Teoh > *Sent:* January 17, 2020 7:32 PM > *To:* Arwin Tio > *Cc:* user @spar

Re: Out of memory HDFS Read and Write

2019-12-22 Thread Chris Teoh
Repartitioning output to more > partitions, 40 precisely (when it failed, we partitioned the output to 20 > after logic is finished but before writing to HDFS) has made the > read stage succeed. > > Not understanding how the Spark read stage can experience OOM issues. > Hoping to

Re: Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Chris Teoh
stage, when spark reads the 20 >> files each 132M, it comes out to be 40 partitions. >> >> >> >> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh wrote: >> >>> If you're using Spark SQL, that configuration setting causes a shuffle >>> if the number of yo

Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
> We keep hdfs block 128Mb > > On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh wrote: > >> spark.sql.shuffle.partitions might be a start. >> >> Is there a difference in the number of partitions when the parquet is >> read to spark.sql.shuffle.partitions? Is it much hi

Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
spark.sql.shuffle.partitions might be a start. Is there a difference in the number of partitions when the parquet is read to spark.sql.shuffle.partitions? Is it much higher than spark.sql.shuffle.partitions? On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, wrote: > Hi all, > > I have encountered a
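
For reference, a small sketch of adjusting the setting per session, assuming an active SparkSession named spark; the value and path are placeholders:

    // Default is 200; raise it so each shuffle partition is small enough to fit in memory.
    spark.conf.set("spark.sql.shuffle.partitions", "400")

    // Compare against the partitioning of the Parquet input, e.g.:
    println(spark.read.parquet("/data/input").rdd.getNumPartitions)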

Re: Identify bottleneck

2019-12-19 Thread Chris Teoh
As far as I'm aware it isn't any better. The logic all gets processed by the same engine so to confirm, compare the DAGs generated from both approaches and see if they're identical. On Fri, 20 Dec 2019, 8:56 am ayan guha, wrote: > Quick question: Why is it better to use one sql vs multiple

Re: Identify bottleneck

2019-12-18 Thread Chris Teoh
Please look at the Spark UI and confirm you are indeed getting more than 1 partition in your dataframe. Text files are usually not splittable, so you may just be doing all the work in a single partition. If that is the case, it may be worthwhile considering calling the repartition method to
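
A small sketch of that check and the repartition call, assuming an existing SparkSession named spark; the path and partition count are placeholders:

    val df = spark.read.text("/data/input.txt.gz")   // a gzipped text file arrives as a single partition
    println(s"partitions before: ${df.rdd.getNumPartitions}")

    // Redistribute across the cluster before the expensive transformations.
    val repartitioned = df.repartition(200)
    println(s"partitions after: ${repartitioned.rdd.getNumPartitions}")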

Re: Request more yarn vcores than executors

2019-12-08 Thread Chris Teoh
take on more simultaneous tasks per executor > > > That is exactly what I want to avoid. The nature of the task makes it > difficult to parallelise over many partitions. Ideally I'd have 1 executor > per task with 10+ cores assigned to each executor > > On Sun, 8 Dec 2019 at

Re: Request more yarn vcores than executors

2019-12-08 Thread Chris Teoh
I thought --executor-cores is the same as the other argument. If anything, just set --executor-cores to something greater than 1 and don't set the other one you mentioned. You'll then get a greater number of cores per executor so you can take on more simultaneous tasks per executor. On Sun, 8 Dec
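
The same idea expressed through configuration keys rather than spark-submit flags; a sketch with illustrative values only, assuming YARN where spark.executor.instances applies:

    import org.apache.spark.sql.SparkSession

    // Equivalent to --executor-cores 4: each executor runs up to four tasks
    // concurrently, sharing that executor's memory.
    val spark = SparkSession.builder()
      .appName("executor-cores-example")
      .config("spark.executor.cores", "4")
      .config("spark.executor.instances", "10")
      .getOrCreate()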

Re: OOM Error

2019-09-07 Thread Chris Teoh
> On Sat, Sep 7, 2019 at 2:56 PM Chris Teoh wrote: > >> You can try, consider processing each partition separately if your data >> is heavily skewed when you partition it. >> >> On Sat, 7 Sep 2019, 7:19 pm Ankit Khettry, >> wrote: >>>

Re: OOM Error

2019-09-07 Thread Chris Teoh
if it would help if I repartition the data by > the fields I am using in group by and window operations? > > Best Regards > Ankit Khettry > > On Sat, 7 Sep, 2019, 1:05 PM Chris Teoh, wrote: > >> Hi Ankit, >> >> Without looking at the Spark UI and the stages/DA

Re: OOM Error

2019-09-07 Thread Chris Teoh
Hi Ankit, Without looking at the Spark UI and the stages/DAG, I'm guessing you're running on default number of Spark shuffle partitions. If you're seeing a lot of shuffle spill, you likely have to increase the number of shuffle partitions to accommodate the huge shuffle size. I hope that helps
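
A sketch combining the suggestions in this thread (more shuffle partitions, plus repartitioning by the keys used in the group-by/window); an active SparkSession named spark, the paths, and all column names are assumptions:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    spark.conf.set("spark.sql.shuffle.partitions", "1000")   // illustrative value

    val df = spark.read.parquet("/data/events")              // placeholder input

    // Repartition by the grouping key so each task handles a bounded slice of the data.
    val w = Window.partitionBy("userId").orderBy(col("eventTime").desc)
    df.repartition(col("userId"))
      .withColumn("rn", row_number().over(w))
      .filter(col("rn") === 1)
      .write.mode("overwrite").parquet("/data/latest_per_user")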

Re: Control Sqoop job from Spark job

2019-09-02 Thread Chris Teoh
based on dataframe values in spark job. Certainly it can be > isolated and broken. > > On Sat, Aug 31, 2019 at 8:07 AM Chris Teoh wrote: > >> I'd say this is an uncommon approach, could you use a workflow/scheduling >> system to call Sqoop outside of Spark? Spark is usually multipro

Re: Control Sqoop job from Spark job

2019-08-30 Thread Chris Teoh
I'd say this is an uncommon approach; could you use a workflow/scheduling system to call Sqoop outside of Spark? Spark is usually multiprocess distributed, so putting this Sqoop job in the Spark code seems to imply you want to run Sqoop first, then Spark. If you're really insistent on this, call
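
If the Sqoop step really has to be driven from the Spark application, one hypothetical way is to shell out from the driver before the Spark work starts; the Sqoop arguments below are placeholders:

    import scala.sys.process._

    // Runs on the driver only and blocks until the external sqoop process exits.
    val sqoopCmd = Seq(
      "sqoop", "import",
      "--connect", "jdbc:mysql://dbhost/mydb",
      "--table", "source_table",
      "--target-dir", "/staging/source_table")
    val exitCode = sqoopCmd.!
    require(exitCode == 0, s"sqoop import failed with exit code $exitCode")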

Re: [pyspark 2.4.3] small input csv ~3.4GB gets 40K tasks created

2019-08-30 Thread Chris Teoh
Look at your DAG. Are there lots of CSV files? Does your input CSV dataframe have lots of partitions to start with? Bear in mind cross join makes the dataset much larger so expect to have more tasks. On Fri, 30 Aug 2019 at 14:11, Rishi Shah wrote: > Hi All, > > I am scratching my head against
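
A quick way to see where the task count comes from is to inspect partition counts before the cross join; a sketch assuming an existing SparkSession named spark and placeholder paths:

    val left  = spark.read.option("header", "true").csv("/data/left")
    val right = spark.read.option("header", "true").csv("/data/right")

    // A cartesian product is planned with roughly left-partitions x right-partitions tasks.
    println(s"left: ${left.rdd.getNumPartitions}, right: ${right.rdd.getNumPartitions}")

    // Coalescing the smaller side keeps the task count in check.
    val crossed = left.crossJoin(right.coalesce(1))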

Re: Reading configuration file in Spark Scala throws error

2019-08-03 Thread Chris Teoh
This seems to work:

    val printEntry = new java.util.function.Consumer[java.util.Map.Entry[String, com.typesafe.config.ConfigValue]] {
      override def accept(a: java.util.Map.Entry[String, com.typesafe.config.ConfigValue]): Unit = {
        println(a.getKey)
      }
    }
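
For completeness, a hedged sketch of how such a Consumer is typically wired up against a loaded config (the explicit Consumer is needed pre-Scala-2.12, where a lambda is not converted to the Java functional interface automatically):

    import com.typesafe.config.{Config, ConfigFactory, ConfigValue}
    import java.util.Map.Entry
    import java.util.function.Consumer

    val config: Config = ConfigFactory.load()   // reads application.conf from the classpath

    val printEntry = new Consumer[Entry[String, ConfigValue]] {
      override def accept(a: Entry[String, ConfigValue]): Unit = println(a.getKey)
    }

    // entrySet returns a java.util.Set, whose forEach expects a java.util.function.Consumer.
    config.entrySet().forEach(printEntry)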

Re: Spark 2.3 Dataframe Grouby operation throws IllegalArgumentException on Large dataset

2019-07-24 Thread Chris Teoh
This might be a hint. Maybe invalid data? Caused by: java.lang.IllegalArgumentException: Missing required char ':' at 'struct>' On Wed., 24 Jul. 2019, 2:15 pm Balakumar iyer S, wrote: > Hi Bobby Evans, > > I apologise for the delayed response , yes you are right I missed out to > paste the
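
That parser error usually points at a struct field that is missing its ":type" part; a hedged illustration of a well-formed schema string, assuming Spark 2.3+ where StructType.fromDDL is available and with made-up field names:

    import org.apache.spark.sql.types.StructType

    // Every top-level field is "name type"; nested struct fields use name:type pairs.
    // A struct written without the type (e.g. "struct<city>") is the kind of thing
    // that triggers "Missing required char ':'".
    val schema = StructType.fromDDL("id INT, address STRUCT<city: STRING, zip: STRING>")
    println(schema.treeString)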

Re: Attempting to avoid a shuffle on join

2019-07-06 Thread Chris Teoh
Dataframes have a partitionBy function too. You can avoid a shuffle if one of your datasets is small enough to broadcast. On Thu., 4 Jul. 2019, 7:34 am Mkal, wrote: > Please keep in mind i'm fairly new to spark. > I have some spark code where i load two textfiles as datasets and after > some >
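
A minimal sketch of the broadcast-join suggestion, assuming an existing SparkSession named spark; paths and the join column are placeholders:

    import org.apache.spark.sql.functions.broadcast

    val big   = spark.read.parquet("/data/big")
    val small = spark.read.parquet("/data/small")   // must comfortably fit in memory

    // The small side is shipped to every executor, so the big side joins in place
    // with no shuffle of either dataset.
    val joined = big.join(broadcast(small), "key")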

Re: Learning Spark

2019-07-05 Thread Chris Teoh
Scala is better suited to data engineering work. It also has better integration with other components like HBase, Kafka, etc. Python is great for data scientists as there are more data science libraries available in Python. On Fri., 5 Jul. 2019, 7:40 pm Vikas Garg, wrote: > Is there any

Re: Map side join without broadcast

2019-07-01 Thread Chris Teoh
so i end up with > one index in each partition. > > Then i read the words again but this time assign every partition to each > word and join it on the indices rdd by partition key. So effectively every > index will be queries > > Finally i merge the results from each index i

Re: Implementing Upsert logic Through Streaming

2019-07-01 Thread Chris Teoh
00 , 1234 > 2, 2000, 2234 > 3, 2000,3234 > 2, 2100,4234 > > Thanks > Sachit > > On Mon, 1 Jul 2019, 01:46 Chris Teoh, wrote: > >> Just thinking on this, if your needs can be addressed using batch instead >> of streaming, I think this is a viable solution. Using

Re: Implementing Upsert logic Through Streaming

2019-06-30 Thread Chris Teoh
Just thinking on this, if your needs can be addressed using batch instead of streaming, I think this is a viable solution. Using a lambda architecture approach seems like a possible solution. On Sun., 30 Jun. 2019, 9:54 am Chris Teoh, wrote: > Not sure what your needs are here. > > I

Re: Implementing Upsert logic Through Streaming

2019-06-29 Thread Chris Teoh
Not sure what your needs are here. If you can afford to wait, increase your micro batch windows to a long period of time, aggregate your data by key every micro batch and then apply those changes to the Oracle database. Since you're using text files to stream, there's no way to pre-partition your
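
A rough sketch of the "aggregate by key per micro batch, then apply to the database" idea, assuming Structured Streaming with foreachBatch (Spark 2.4+); the schema, key/timestamp columns, JDBC settings, and the idea of merging from a staging table on the Oracle side are all assumptions:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    val spark = SparkSession.builder().appName("upsert-sketch").getOrCreate()

    // Placeholder schema and landing directory for the incoming text/CSV files.
    val stream = spark.readStream
      .schema("id INT, updated_at TIMESTAMP, payload STRING")
      .csv("/landing/dir")

    // For each micro batch: keep the latest record per key, then land it in a
    // staging table; a MERGE on the Oracle side applies the actual upsert.
    val applyBatch: (DataFrame, Long) => Unit = (batch, _) => {
      val w = Window.partitionBy("id").orderBy(col("updated_at").desc)
      batch.withColumn("rn", row_number().over(w))
        .filter(col("rn") === 1).drop("rn")
        .write.mode("append")
        .format("jdbc")
        .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL")
        .option("dbtable", "STAGING_UPSERTS")
        .option("user", "app_user")
        .option("password", "change_me")
        .save()
    }

    stream.writeStream
      .option("checkpointLocation", "/checkpoints/upserts")
      .foreachBatch(applyBatch)
      .start()
      .awaitTermination()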

Re: Map side join without broadcast

2019-06-29 Thread Chris Teoh
The closest thing I can think of here is if you have both dataframes written out using buckets. Hive uses this technique for join optimisation such that both datasets of the same bucket are read by the same mapper to achieve map side joins. On Sat., 29 Jun. 2019, 9:10 pm jelmer, wrote: > I have
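
A sketch of writing both sides bucketed on the join key so a later join can skip the shuffle, assuming an existing SparkSession named spark; bucket count, paths, and table names are placeholders:

    val dfA = spark.read.parquet("/data/a")   // placeholder inputs
    val dfB = spark.read.parquet("/data/b")

    // Write both sides bucketed (and sorted) identically on the join key.
    dfA.write.bucketBy(64, "key").sortBy("key").mode("overwrite").saveAsTable("bucketed_a")
    dfB.write.bucketBy(64, "key").sortBy("key").mode("overwrite").saveAsTable("bucketed_b")

    // With matching bucketing, Spark can plan this join without shuffling either side.
    val joined = spark.table("bucketed_a").join(spark.table("bucketed_b"), "key")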

Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Chris Teoh
table. > > Is there any way to control/override this? > > Thanks, > Peter > > > On Wed, Feb 13, 2019, 13:09 Chris Teoh >> Hey there, >> >> Could you not just create a managed table using the DDL in Spark SQL and >> then write the data frame to the underl

Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Chris Teoh
Hey there, Could you not just create a managed table using the DDL in Spark SQL and then write the data frame to the underlying folder, or use Spark SQL to do an insert? Alternatively, try create table as select. IIRC Hive creates managed tables this way. I've not confirmed this works but I
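
A hedged sketch of both suggestions, assuming an existing SparkSession named spark; the database, table names, columns, and path are made up:

    val df = spark.read.parquet("/data/events")   // placeholder dataframe to persist

    // Option 1: create the managed table up front, then insert into it
    // (insertInto matches columns by position, so schemas must line up).
    spark.sql("CREATE TABLE IF NOT EXISTS mydb.events (id INT, name STRING) USING PARQUET")
    df.write.insertInto("mydb.events")

    // Option 2: create-table-as-select, which also yields a managed table.
    df.createOrReplaceTempView("events_staging")
    spark.sql("CREATE TABLE mydb.events_ctas USING PARQUET AS SELECT * FROM events_staging")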

Re: Convert RDD[Iterrable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-01 Thread Chris Teoh
    scala> rddIt.flatMap(_.toList)
    res4: org.apache.spark.rdd.RDD[MyClass] = MapPartitionsRDD[3] at flatMap at <console>:26

res4 is what you're looking for. On Sat, 1 Dec 2018 at 21:09, Chris Teoh wrote: > Do you have the full code example? > > I think this would b
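
Putting the thread together, a complete small example in the spark-shell, where sc is predefined; the class and values are made up:

    case class MyClass(value: Int)

    val rddIt: org.apache.spark.rdd.RDD[Iterable[MyClass]] =
      sc.parallelize(Seq(Seq(MyClass(1), MyClass(2)), Seq(MyClass(3))))

    // Flatten RDD[Iterable[MyClass]] down to RDD[MyClass].
    val flattened = rddIt.flatMap(_.toList)
    flattened.collect().foreach(println)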

Re: Convert RDD[Iterrable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-01 Thread Chris Teoh
Do you have the full code example? I think this would be similar to the mapPartitions code flow, something like flatMap(_.toList). I haven't yet tested this out but this is how I'd first try. On Sat, 1 Dec 2018 at 01:02, James Starks wrote: > When processing data, I create an instance

Re: No auto decompress in Spark Java textFile function?

2015-09-09 Thread Chris Teoh
It works with .gz files, I haven't tested it on bz2 files. If > it isn't decompressing by default then what you have to do is to use > sc.wholeTextFiles and then decompress each record (that being a file) with > the corresponding codec. > > Thanks > Best Regards > > On Tue, S
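
For the .tar.bz2 case specifically, plain textFile will not untar the archive even where the bzip2 codec is applied, so one hedged alternative is binaryFiles plus commons-compress on each archive; the dependency (org.apache.commons:commons-compress) and paths are assumptions, and sc is the spark-shell context:

    import java.io.BufferedInputStream
    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
    import scala.io.Source

    // Each .tar.bz2 is read whole, decompressed, and every file inside is read as text.
    val lines = sc.binaryFiles("/data/archives/*.tar.bz2").flatMap { case (_, stream) =>
      val tar = new TarArchiveInputStream(
        new BZip2CompressorInputStream(new BufferedInputStream(stream.open())))
      Iterator.continually(tar.getNextTarEntry)
        .takeWhile(_ != null)
        .filter(_.isFile)
        .flatMap(_ => Source.fromInputStream(tar).getLines())
        .toList                       // materialise before the stream goes out of scope
    }
    lines.take(5).foreach(println)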

No auto decompress in Spark Java textFile function?

2015-09-08 Thread Chris Teoh
Hi Folks, I tried using Spark v1.2 on bz2 files in Java but the behaviour is different from the same textFile API call in Python and Scala. That being said, how do I read .tar.bz2 files in Spark's Java API? Thanks in advance Chris