Re: External Spark shuffle service for k8s

2024-04-07 Thread Enrico Minack
There is the Apache incubator project Uniffle: https://github.com/apache/incubator-uniffle It stores shuffle data on remote servers in memory, on local disk, or in HDFS. Cheers, Enrico Am 06.04.24 um 15:41 schrieb Mich Talebzadeh: I have seen some older references for shuffle service for k8s,

Re: AQE coalesce 60G shuffle data into a single partition

2024-02-24 Thread Enrico Minack
Hi Shay, maybe this is related to the small number of output rows (1,250) of the last exchange step that consumes those 60GB of shuffle data. Looks like your outer transformation is something like df.groupBy($"id").agg(collect_list($"prop_name")) Have you tried adding a repartition as an attempt

Re: ordering of rows in dataframe

2023-12-05 Thread Enrico Minack
Looks like what you want is to add a column such that ordering by it preserves the current order of the dataframe. All you need is the monotonically_increasing_id() function: spark.range(0, 10, 1, 5).withColumn("row", monotonically_increasing_id()).show() +---+---+ | id|
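A minimal sketch of the approach (the snippet above is cut off by the archive; the generated ids are increasing but not consecutive, which is enough to preserve the order):

  import org.apache.spark.sql.functions.monotonically_increasing_id

  // 10 rows spread over 5 partitions, each tagged with an increasing id
  val df = spark.range(0, 10, 1, 5)
    .withColumn("row", monotonically_increasing_id())
  df.orderBy("row").show()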

Re: [PySpark][Spark Dataframe][Observation] Why empty dataframe join doesn't let you get metrics from observation?

2023-12-05 Thread Enrico Minack
e the join also gets optimized away, but table df is still filtered for col1 = 'c', which iterates over the rows and collects the metrics for observation 1. Hope this helps to understand why there are no observed metrics for Observation("1") in your case. Enrico Am 04.12.23 um 10:45 schr

Re: [PySpark][Spark Dataframe][Observation] Why empty dataframe join doesn't let you get metrics from observation?

2023-12-04 Thread Enrico Minack
Hi Michail, observations as well as ordinary accumulators only observe / process rows that are iterated / consumed by downstream stages. If the query plan decides to skip one side of the join, that one will be removed from the final plan completely. Then, the Observation will not retrieve any
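A minimal sketch of the mechanism, with a stand-in DataFrame (assuming Spark 3.3+ for the Observation API): the metric only materializes once the observed rows are actually consumed by an action.

  import org.apache.spark.sql.Observation
  import org.apache.spark.sql.functions.{count, lit}

  val df = spark.range(100).toDF("id")
  val observation = Observation("1")
  val observed = df.observe(observation, count(lit(1)).as("rows"))

  observed.collect()        // rows are iterated, so the metric gets filled
  println(observation.get)  // Map(rows -> 100)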

Re: Apache Spark not reading UTC timestamp from MongoDB correctly

2023-06-08 Thread Enrico Minack
Sean is right: casting timestamps to strings (which is what show() does) uses the local timezone, either the Java default zone `user.timezone`, the Spark default zone `spark.sql.session.timeZone`, or the default DataFrameWriter zone `timeZone` (when writing to file). You say you are in PST,
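A minimal sketch of that behaviour (assuming Spark 3.1+ for timestamp_seconds): the same instant is rendered differently depending on the session time zone, while the stored value never changes.

  import org.apache.spark.sql.functions.{lit, timestamp_seconds}

  // 1686225600 = 2023-06-08 12:00:00 UTC
  val df = spark.range(1).select(timestamp_seconds(lit(1686225600L)).as("ts"))

  spark.conf.set("spark.sql.session.timeZone", "UTC")
  df.show()   // 2023-06-08 12:00:00

  spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
  df.show()   // 2023-06-08 05:00:00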

Re: Incremental Value dependents on another column of Data frame Spark

2023-05-24 Thread Enrico Minack
Hi, given your dataset: val df=Seq( (1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"), (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"), (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"), (10, 20230523, "M02"), (11, 20230523, "M02"), (12,

Re: Write custom JSON from DataFrame in PySpark

2023-05-04 Thread Enrico Minack
Hi, You could rearrange the DataFrame so that writing the DataFrame as-is produces your structure: df = spark.createDataFrame([(1, "a1"), (2, "a2"), (3, "a3")], "id int, datA string") +---+----+ | id|datA| +---+----+ |  1|  a1| |  2|  a2| |  3|  a3| +---+----+ df2 = df.select(df.id,

Re: Use Spark Aggregator in PySpark

2023-04-24 Thread Enrico Minack
Hi, For an aggregating UDF, use spark.udf.registerJavaUDAF(name, className). Enrico Am 23.04.23 um 23:42 schrieb Thomas Wang: Hi Spark Community, I have implemented a custom Spark Aggregator (a subclass to |org.apache.spark.sql.expressions.Aggregator|). Now I'm trying to use it in a
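For illustration, a hypothetical Scala Aggregator (not the poster's actual class) and one way, in Spark 3.x, to register it by name so it is also callable from SQL — and hence from PySpark — as an alternative to registerJavaUDAF:

  import org.apache.spark.sql.{Encoder, Encoders}
  import org.apache.spark.sql.expressions.Aggregator
  import org.apache.spark.sql.functions

  // a toy Aggregator summing longs
  object MySum extends Aggregator[Long, Long, Long] {
    def zero: Long = 0L
    def reduce(buffer: Long, value: Long): Long = buffer + value
    def merge(b1: Long, b2: Long): Long = b1 + b2
    def finish(buffer: Long): Long = buffer
    def bufferEncoder: Encoder[Long] = Encoders.scalaLong
    def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }

  spark.udf.register("my_sum", functions.udaf(MySum))
  // from PySpark: spark.sql("SELECT my_sum(some_col) FROM some_table")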

Re: How to explode array columns of a dataframe having the same length

2023-02-16 Thread Enrico Minack
You have to take each row and zip the lists; each element of the result becomes one new row. So write a method that turns   Row(List("A","B","null"), List("C","D","null"), List("E","null","null")) into   List(List("A","C","E"), List("B","D","null"), List("null","null","null")) and use
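A minimal sketch of that zipping step in Scala, on hypothetical array columns c1, c2, c3 of equal length:

  import spark.implicits._

  val df = Seq(
    (Seq("A", "B", null), Seq("C", "D", null), Seq("E", null, null))
  ).toDF("c1", "c2", "c3")

  val exploded = df.flatMap { row =>
    val c1 = row.getSeq[String](0)
    val c2 = row.getSeq[String](1)
    val c3 = row.getSeq[String](2)
    // element i of each list becomes one output row
    c1.indices.map(i => (c1(i), c2(i), c3(i)))
  }.toDF("c1", "c2", "c3")

  exploded.show()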

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Enrico Minack
oop in parallel "? btw this didn't work: for (String columnName : df.columns()) {     df= df.withColumn(columnName, collect_set(col(columnName)).as(columnName)); } Le dim. 12 févr. 2023 à 20:36, Enrico Minack a écrit : That is unfortunate, but 3.4.0 is around

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Enrico Minack
ave a single DataFrame that computes all columns in a single Spark job. But this reads all distinct values into a single partition, which has the same downside as collect, so this is as bad as using collect. Cheers, Enrico Am 12.02.23 um 18:05 schrieb sam smith: @Enrico Minack <mailto:enri

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-11 Thread Enrico Minack
You could do the entire thing in DataFrame world and write the result to disk. All you need is unpivot (to be released in Spark 3.4.0, soon). Note this is Scala but should be straightforward to translate into Java: import org.apache.spark.sql.functions.collect_set val df = Seq((1, 10, 123),
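A minimal sketch of that idea with hypothetical data (assuming Spark 3.4+ for Dataset.unpivot); the code above is truncated by the archive, so this is an illustration rather than the author's exact snippet:

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions.{col, collect_set}
  import spark.implicits._

  val df = Seq((1, 10, 123), (2, 20, 123), (2, 20, 456)).toDF("a", "b", "c")

  val distinctPerColumn = df
    .unpivot(Array.empty[Column], Array(col("a"), col("b"), col("c")), "column", "value")
    .groupBy("column")
    .agg(collect_set("value").as("distinct_values"))

  distinctPerColumn.write.parquet("distinct-values.parquet")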

SQL GROUP BY alias with dots, was: Spark SQL question

2023-02-07 Thread Enrico Minack
Hi, you are right, that is an interesting question. Looks like GROUP BY is doing something funny / magic here (spark-shell 3.3.1 and 3.5.0-SNAPSHOT): With an alias, it behaves as you have pointed out: spark.range(3).createTempView("ids_without_dots") spark.sql("SELECT * FROM

Re: The Dataset unit test is much slower than the RDD unit test (in Scala)

2022-11-01 Thread Enrico Minack
Hi Tanin, running your test with option "spark.sql.planChangeLog.level" set to "info" or "warn" (depending on your Spark log level) will show you insights into the planning (which rules are applied, how long rules take, how many iterations are done). Hoping this helps, Enrico Am 25.10.22
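A minimal sketch of enabling it (Spark 3.1+):

  // log analyzer/optimizer rule applications at WARN so they show up even
  // with a quiet log4j configuration
  spark.conf.set("spark.sql.planChangeLog.level", "warn")
  spark.range(10).selectExpr("id * 2 AS x").filter("x > 5").explain()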

Re: Reading too many files

2022-10-05 Thread Enrico Minack
Hi, Spark is fine with that many Parquet files in general: # generate 100,000 small Parquet files spark.range(0, 100000, 1, 100000).write.parquet("too-many-files.parquet") # read 100,000 Parquet files val df = spark.read.parquet("too-many-files.parquet") df.show() df.count() Reading the

Re: [Spark Internals]: Is sort order preserved after partitioned write?

2022-09-17 Thread Enrico Minack
0796737203| +---+---+-+---+ Thanks, Swetha On Fri, Sep 16, 2022 at 1:45 AM Enrico Minack wrote: Yes, you can expect each partition file to be sorted by "col1" and "col2". However, values

Re: Splittable or not?

2022-09-17 Thread Enrico Minack
If with "won't affect the performance" you mean "parquet is splittable though it uses snappy", then yes. Splittable files allow for optimal parallelization, which "won't affect performance". Spark writing data will split the data into multiple files already (here parquet files). Even if each

Re: [Spark Internals]: Is sort order preserved after partitioned write?

2022-09-15 Thread Enrico Minack
Yes, you can expect each partition file to be sorted by "col1" and "col2". However, values for "col1" will be "randomly" allocated to partition files, but all rows with the same value for "col1" will reside in the same one partition file. What kind of unexpected sort order do you observe?

Re: reading each JSON file from dataframe...

2022-07-12 Thread Enrico Minack
and the JAR library uses different dependencies. Hope this findings helps others as well. Thanks, Muthu On Mon, 11 Jul 2022 at 14:11, Enrico Minack wrote: All you need to do is implement a method readJson that reads a single file given its path. Than, you map the values of column

Re: reading each JSON file from dataframe...

2022-07-11 Thread Enrico Minack
All you need to do is implement a method readJson that reads a single file given its path. Then, you map the values of column file_path to the respective JSON content as a string. This can be done via a UDF or simply Dataset.map: case class RowWithJsonUri(entity_id: String, file_path:
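A minimal sketch of the Dataset.map variant, with a hypothetical readJson based on Hadoop's FileSystem API (so hdfs://, s3a:// and file:// paths all work):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import spark.implicits._

  case class RowWithJsonUri(entity_id: String, file_path: String)
  case class RowWithJson(entity_id: String, json: String)

  // read one file completely and return its content as a string
  def readJson(path: String): String = {
    val p  = new Path(path)
    val in = p.getFileSystem(new Configuration()).open(p)
    try scala.io.Source.fromInputStream(in, "UTF-8").mkString finally in.close()
  }

  // hypothetical input — replace with the real (entity_id, file_path) dataset
  val ds = Seq(RowWithJsonUri("id-1", "hdfs:///data/id-1.json")).toDS()

  val withJson = ds.map(r => RowWithJson(r.entity_id, readJson(r.file_path)))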

Re: Will it lead to OOM error?

2022-06-22 Thread Enrico Minack
lead to OOM error? Thanks, Sid On Wed, Jun 22, 2022 at 6:40 PM Enrico Minack wrote: The RAM and disk memory consumtion depends on what you do with the data after reading them. Your particular action will read 20 lines from the first partition and show them. So it will not use

Re: Will it lead to OOM error?

2022-06-22 Thread Enrico Minack
The RAM and disk memory consumption depends on what you do with the data after reading it. Your particular action will read 20 lines from the first partition and show them. So it will not use any RAM or disk, no matter how large the CSV is. If you do a count instead of show, it will

Re: input file size

2022-06-19 Thread Enrico Minack
Maybe a   .as[String].mapPartitions(it => if (it.hasNext) Iterator(it.next) else Iterator.empty) might be faster than the   .distinct.as[String] Enrico Am 19.06.22 um 08:59 schrieb Enrico Minack: Given you already know your input files (input_file_name), why not getting their s

Re: input file size

2022-06-19 Thread Enrico Minack
Given you already know your input files (input_file_name), why not get their size and sum it up? import java.io.File import java.net.URI import org.apache.spark.sql.functions.input_file_name ds.select(input_file_name.as("filename")) .distinct.as[String] .map(filename => new
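A sketch completing that idea for the dataset ds above, using Hadoop's FileSystem instead of java.io.File so that hdfs:// and s3a:// locations work too:

  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.functions.input_file_name
  import spark.implicits._

  // collect the distinct input file names, then sum their sizes on the driver
  val files = ds.select(input_file_name().as("filename"))
    .distinct.as[String].collect()

  val conf = spark.sparkContext.hadoopConfiguration
  val totalBytes = files.map { name =>
    val p = new Path(name)
    p.getFileSystem(conf).getFileStatus(p).getLen
  }.sum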

Re: API Problem

2022-06-13 Thread Enrico Minack
)                 finalDFStatus = finalDF.withColumn("edl_timestamp", to_timestamp(lit(F.TimeNow(.withColumn(                     "status_for_each_batch",                     lit(str(response)))                 print("Max Value:::")                 print(maxValue)

Re: API Problem

2022-06-10 Thread Enrico Minack
I am expecting the payload to be as a JSON string to be a record like below: {"A":"some_value","B":"some_value"} Where A and B are the columns in my dataset. On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack wrote: Sid, just recognized you are

Re: API Problem

2022-06-10 Thread Enrico Minack
use of a column expression. What do you expect print(payload) to be? I recommend splitting that complex command into multiple commands to find out what "an error of column not iterable" refers to. Enrico Am 10.06.22 um 13:39 schrieb Enrico Minack: Hi Sid, finalDF = finalDF.r

Re: API Problem

2022-06-10 Thread Enrico Minack
Hi Sid, finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions()) .withColumn("status_for_batch", call_to_cust_bulk_api(policyUrl, to_json(struct(*colsListToBePassed You are calling withColumn with the result of call_to_cust_bulk_api as the second argument. That result

Re: partitionBy creating lot of small files

2022-06-04 Thread Enrico Minack
You refer to df.write.partitionBy, which creates a directory for each value of "col", and in the worst case writes one file per DataFrame partition. So the number of output files is controlled by the cardinality of "col", which comes from your data and is hence out of your control, and by the number of partitions of

Re: How to convert a Dataset to a Dataset?

2022-06-04 Thread Enrico Minack
); will yield StringType as a type for column c1 similarly for c6 I want to return the true type of each column by first discarding the "+" I use Dataset after filtering the rows (removing "+") because i can re-read the new dataset using .csv() method. Any better idea to do th

Re: How to convert a Dataset to a Dataset?

2022-06-04 Thread Enrico Minack
Can you provide an example string (row) and the expected inferred schema? Enrico Am 04.06.22 um 18:36 schrieb marc nicole: How to do just that? i thought we only can inferSchema when we first read the dataset, or am i wrong? Le sam. 4 juin 2022 à 18:10, Sean Owen a écrit : It sounds

Re: PartitionBy and SortWithinPartitions

2022-06-03 Thread Enrico Minack
Nikhil, What are you trying to achieve with this in the first place? What are your goals? What is the problem with your approach? Are you concerned about the 1000 files in each written col2-partition? The write.partitionBy is something different from df.repartition or df.coalesce. The df

Re: Writing Custom Spark Readers and Writers

2022-04-06 Thread Enrico Minack
Another project implementing DataSource V2 in Scala with Python wrapper: https://github.com/G-Research/spark-dgraph-connector Cheers, Enrico Am 06.04.22 um 12:01 schrieb Cheng Pan: There are some projects based on Spark DataSource V2 that I hope will help you.

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-31 Thread Enrico Minack
How well Spark can scale up with your data (in terms of years of data) depends on two things: the operations performed on the data, and characteristics of the data, like value distributions. Failing tasks smell like you are using operations that do not scale (e.g. Cartesian product of your

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Enrico Minack
> Wrt looping: if I want to process 3 years of data, my modest cluster will never do it one go , I would expect? > I have to break it down in smaller pieces and run that in a loop (1 day is already lots of data). Well, that is exactly what Spark is made for. It splits the work up and

Re: GraphX Support

2022-03-22 Thread Enrico Minack
Right, GraphFrames is not very active and maintainers don't even have the capacity to make releases. Enrico Am 22.03.22 um 00:10 schrieb Sean Owen: GraphX is not active, though still there and does continue to build and test with each Spark release. GraphFrames kind of superseded it, but is

Re: 回复:Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame

2022-03-16 Thread Enrico Minack
If you have a list of Columns called `columns`, you can pass them to the `agg` method as:   agg(columns.head, columns.tail: _*) Enrico Am 16.03.22 um 08:02 schrieb ckgppl_...@sina.cn: Thanks, Sean. I modified the codes and have generated a list of columns. I am working on convert a list of
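A minimal sketch with hypothetical columns and aggregations:

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions.corr

  val columns: List[Column] = List(
    corr("x", "target").as("corr_x"),
    corr("y", "target").as("corr_y"))

  // expand the list into the varargs agg(...) call
  df.groupBy("group").agg(columns.head, columns.tail: _*)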

Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Enrico Minack
Sid, Your Aggregation Query selects all employees where fewer than three distinct salaries exist that are larger. So, both queries seem to do the same thing. The Windowing Query is explicit about what it does: give me the rank of salaries per department in the given order and pick the top 3 per
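A minimal sketch of such a windowing query, on a hypothetical employees(department, salary) DataFrame; dense_rank counts distinct salary values, matching the aggregation query's semantics:

  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{col, dense_rank}

  val byDept = Window.partitionBy("department").orderBy(col("salary").desc)

  val top3 = employees
    .withColumn("rank", dense_rank().over(byDept))
    .where(col("rank") <= 3)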

Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Enrico Minack
Though spark.read. refers to "built-in" data sources, there is nothing that prevents 3rd-party libraries from "extending" spark.read in Scala or Python. As users know the Spark way to read built-in data sources, it feels natural to hook 3rd-party data sources under the same scheme, to give users a
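A sketch of how a library could hook into spark.read in Scala (hypothetical, not the actual Delta Lake API):

  import org.apache.spark.sql.{DataFrame, DataFrameReader}

  // enrich DataFrameReader with a .delta(...) shortcut for format("delta")
  implicit class DeltaDataFrameReader(reader: DataFrameReader) {
    def delta(path: String): DataFrame = reader.format("delta").load(path)
  }

  // usage: spark.read.delta("/path/to/table")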

Re: Query about Spark

2020-09-07 Thread Enrico Minack
You could use Horovod to distribute your ML algorithm on a cluster, while Horovod also supports Spark clusters. Enrico Am 06.09.20 um 15:30 schrieb Ankur Das: Good Evening Sir/Madam, Hope you are doing well, I am experimenting on some ML techniques where I need to test it on a distributed

Re: regexp_extract regex for extracting the columns from string

2020-08-10 Thread Enrico Minack
You can remove the <1000> first and then turn the string into a map (interpreting the string as key-value pairs). From that map you can access each key and turn it into a separate column: Seq(("<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new packt=20 orgin=null address=null
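A minimal sketch of that approach, on a shortened version of the sample record:

  import org.apache.spark.sql.functions.{col, expr, regexp_replace}
  import spark.implicits._

  val df = Seq("<1000> date=2020-08-01 time=20:50:04 name=processing id=123")
    .toDF("raw")

  val parsed = df
    .withColumn("cleaned", regexp_replace(col("raw"), "^<\\d+> ", ""))   // drop the leading <1000>
    .withColumn("kv", expr("str_to_map(cleaned, ' ', '=')"))             // key=value pairs, space-separated
    .select(
      col("kv").getItem("date").as("date"),
      col("kv").getItem("time").as("time"),
      col("kv").getItem("name").as("name"),
      col("kv").getItem("id").as("id"))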

Re: Unablee to get to_timestamp with Timezone Information

2020-04-02 Thread Enrico Minack
Once parsed into a Timestamp, the timestamp is stored internally as UTC and printed in your local timezone (e.g. as defined by spark.sql.session.timeZone). Spark is good at hiding timezone information from you. You can get the timezone information via date_format(column, format): import
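A minimal sketch (the import above is truncated by the archive, so this is an illustration): the parsed timestamp is UTC-based, and date_format can render it together with the session time zone name.

  import org.apache.spark.sql.functions.{col, date_format, to_timestamp}
  import spark.implicits._

  val df = Seq("2020-04-02 10:00:00+04:00").toDF("s")
    .select(to_timestamp(col("s")).as("ts"))

  df.select(
    col("ts"),
    date_format(col("ts"), "yyyy-MM-dd HH:mm:ss z").as("rendered_with_zone")
  ).show(false)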

Re: Issue with UDF Int Conversion - Str to Int

2020-03-23 Thread Enrico Minack
Ayan, no need for UDFs, the SQL API provides all you need (sha1, substring, conv): https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html >>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16, 10).cast("long").alias("sha2long")).show() +--+ |  sha2long|

Re: Time-based frequency table at scale

2020-03-11 Thread Enrico Minack
An interesting puzzle indeed. What is your measure of "that scales"? Does not fail, does not spill, does not need a huge amount of memory / disk, is O(N), processes X records per second per core? Enrico Am 11.03.20 um 16:59 schrieb sakag: Hi all, We have a rather interesting use case,

Re: Spark driver thread

2020-03-06 Thread Enrico Minack
James, If you are having multithreaded code in your driver, then you should allocate multiple cores. In cluster mode you share the node with other jobs. If you allocate fewer cores than you are using in your driver, then that node gets over-allocated and you are stealing other applications'

Re: Compute the Hash of each row in new column

2020-03-02 Thread Enrico Minack
n Fri, Feb 28, 2020 at 7:28 PM Enrico Minack <mailto:m...@enrico.minack.dev>> wrote: This computes the md5 hash of a given column id of Dataset ds: ds.withColumn("id hash", md5($"id")).show(false) Test with this Dataset ds: import org.apache.spa

Re:

2020-03-02 Thread Enrico Minack
Looks like the schema of some files is unexpected. You could either run parquet-tools on each of the files and extract the schema to find the problematic files: hdfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet

Re: Compute the Hash of each row in new column

2020-02-28 Thread Enrico Minack
This computes the md5 hash of a given column id of Dataset ds: ds.withColumn("id hash", md5($"id")).show(false) Test with this Dataset ds: import org.apache.spark.sql.types._ val ds = spark.range(10).select($"id".cast(StringType)) Available are md5, sha, sha1, sha2 and hash:
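A minimal sketch of the other functions mentioned (note that hash is the non-cryptographic Murmur3 hash):

  import org.apache.spark.sql.functions.{col, hash, md5, sha1, sha2}
  import org.apache.spark.sql.types.StringType

  val ds = spark.range(10).select(col("id").cast(StringType))

  ds.select(
    md5(col("id")).as("md5"),
    sha1(col("id")).as("sha1"),
    sha2(col("id"), 256).as("sha2_256"),
    hash(col("id")).as("hash")
  ).show(false)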

Re: Convert each partition of RDD to Dataframe

2020-02-27 Thread Enrico Minack
sequentially in Driver program and transform/write to hdfs one after the other * Or the current approach mentioned in the previous mail What will be the performance implications ? Regards Manjunath *From:* Enrico Minack

Re: Convert each partition of RDD to Dataframe

2020-02-27 Thread Enrico Minack
Hi Manjunath, why not create 10 DataFrames loading the different tables in the first place? Enrico Am 27.02.20 um 14:53 schrieb Manjunath Shetty H: Hi Vinodh, Thanks for the quick response. Didn't got what you meant exactly, any reference or snippet  will be helpful. To explain the

[SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

2020-02-26 Thread Enrico Minack
I have created a jira to track this request: https://issues.apache.org/jira/browse/SPARK-30957 Enrico Am 08.02.20 um 16:56 schrieb Enrico Minack: Hi Devs, I am forwarding this from the user mailing list. I agree that the <=> version of join(Dataset[_], Seq[String]) would be useful.
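For reference, the workaround available today — a null-safe equi-join spelled out column by column with <=>, on hypothetical frames left and right:

  val joined = left.join(right,
    left("key1") <=> right("key1") && left("key2") <=> right("key2"))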

Re: Questions about count() performance with dataframes and parquet files

2020-02-17 Thread Enrico Minack
large tables ? Is caching faster than recomputing both insert/update ? Thanks Enrico Minack writes: Ashley, I want to suggest a few optimizations. The problem might go away but at least performance should improve. The freeze problems could have many reasons, the Spark UI SQL pages and stages

Re: Questions about count() performance with dataframes and parquet files

2020-02-13 Thread Enrico Minack
Ashley, I want to suggest a few optimizations. The problem might go away but at least performance should improve. The freeze problems could have many reasons, the Spark UI SQL pages and stages detail pages would be useful. You can send them privately, if you wish. 1. the repartition(1)

Re: Reading 7z file in spark

2020-01-14 Thread Enrico Minack
Hi, Spark does not support 7z natively, but you can read any file in Spark: def read(stream: PortableDataStream): Iterator[String] = { Seq(stream.getPath()).iterator } spark.sparkContext .binaryFiles("*.7z") .flatMap(file => read(file._2)) .toDF("path") .show(false) This scales with
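A sketch of one way to fill in the read method, assuming Apache Commons Compress is on the classpath (untested; each archive is decompressed fully in memory per task):

  import java.nio.charset.StandardCharsets
  import org.apache.commons.compress.archivers.sevenz.SevenZFile
  import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
  import org.apache.spark.input.PortableDataStream
  import spark.implicits._

  // emit every non-directory entry of the 7z archive as one UTF-8 string
  def read(stream: PortableDataStream): Iterator[String] = {
    val sevenZ = new SevenZFile(new SeekableInMemoryByteChannel(stream.toArray()))
    Iterator.continually(sevenZ.getNextEntry())
      .takeWhile(_ != null)
      .filterNot(_.isDirectory)
      .map { entry =>
        val bytes = new Array[Byte](entry.getSize.toInt)
        sevenZ.read(bytes)
        new String(bytes, StandardCharsets.UTF_8)
      }
  }

  spark.sparkContext
    .binaryFiles("*.7z")
    .flatMap(file => read(file._2))
    .toDF("content")
    .show(false)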

Re: [pyspark2.4+] When to choose RDD over Dataset, was: A lot of tasks failed, but job eventually completes

2020-01-06 Thread Enrico Minack
some or the other way to use windows on data frames. I always get confused as to when to fall back on RDD approach? Any use case in your experience warrant for RDD use, for better performance? Thanks, Rishi On Mon, Jan 6, 2020 at 4:18 AM Enrico Minack <mailto:m...@enrico.minack.dev>&

Re: OrderBy Year and Month is not displaying correctly

2020-01-06 Thread Enrico Minack
The distinct transformation does not preserve order; you need to distinct first, then orderBy. Enrico Am 06.01.20 um 00:39 schrieb Mich Talebzadeh: Hi, I am working out monthly outgoing etc from an account and I am using the following code import org.apache.spark.sql.expressions.Window
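A minimal sketch with hypothetical year/month columns:

  // deduplicate first, sort last — the orderBy must be the final transformation
  df.select("year", "month", "outgoing")
    .distinct()
    .orderBy("year", "month")
    .show()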

Re: [pyspark2.4+] A lot of tasks failed, but job eventually completes

2020-01-06 Thread Enrico Minack
Note that repartitioning helps to increase the number of partitions (and hence to reduce the size of partitions and required executor memory), but subsequent transformations like join will repartition data again with the configured number of partitions (spark.sql.shuffle.partitions),

Re: Identify bottleneck

2019-12-19 Thread Enrico Minack
t 9:14 pm, Enrico Minack mailto:m...@enrico.minack.dev>> wrote: How many withColumn statements do you have? Note that it is better to use a single select, rather than lots of withColumn. This also makes drops redundant. Reading 25m CSV lines and writing to Parqu

Re: Identify bottleneck

2019-12-18 Thread Enrico Minack
t it's not reasonable for maintaining purpose. I will try on a local instance and let you know. Thanks  for the help. *De: *"Enrico Minack" mailto:m...@enrico.minack.dev>> *À: *user@spark.a

Re: Identify bottleneck

2019-12-18 Thread Enrico Minack
How many withColumn statements do you have? Note that it is better to use a single select, rather than lots of withColumn. This also makes drops redundant. Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is really slow. Can you try this on a single machine, i.e. run wit
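A minimal sketch of that suggestion, with hypothetical columns:

  import org.apache.spark.sql.functions.{col, trim, upper}

  // instead of chaining many withColumn calls and dropping the originals:
  //   df.withColumn("a_clean", trim(col("a")))
  //     .withColumn("b_clean", upper(col("b")))
  //     .drop("a", "b")
  // express the whole projection as a single select:
  val cleaned = df.select(
    trim(col("a")).as("a_clean"),
    upper(col("b")).as("b_clean"))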

Re: Issue With mod function in Spark SQL

2019-12-17 Thread Enrico Minack
I think some example code would help to understand what you are doing. Am 18.12.19 um 08:12 schrieb Tzahi File: no.. there're 100M records both even and odd On Tue, Dec 17, 2019 at 8:13 PM Russell Spitzer mailto:russell.spit...@gmail.com>> wrote: Is there a chance your data is all even