There is the Apache incubator project Uniffle:
https://github.com/apache/incubator-uniffle
It stores shuffle data on remote servers in memory, on local disk and HDFS.
Cheers,
Enrico
On 06.04.24 at 15:41, Mich Talebzadeh wrote:
I have seen some older references for shuffle service for k8s,
Hi Shay,
maybe this is related to the small number of output rows (1,250) of the
last exchange step that consumes those 60 GB of shuffle data.
Looks like your outer transformation is something like
df.groupBy($"id").agg(collect_list($"prop_name"))
Have you tried adding a repartition as an attempt
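A minimal sketch of that attempt (df and the columns are from the snippet above; the partition count of 200 is a hypothetical value, tune it to your data):

import org.apache.spark.sql.functions.collect_list
import spark.implicits._

// repartition by id first, then aggregate
df.repartition(200, $"id")
  .groupBy($"id")
  .agg(collect_list($"prop_name"))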
Looks like what you want is to add a column that, when ordered by that
column, preserves the current order of the DataFrame.
All you need is the monotonically_increasing_id() function:
import org.apache.spark.sql.functions.monotonically_increasing_id

spark.range(0, 10, 1, 5).withColumn("row", monotonically_increasing_id()).show()
+---+-----------+
| id|        row|
Here the join also gets optimized away, but table df is still filtered
for col1 = 'c', which iterates over the rows and collects the metrics
for observation 1.
Hope this helps to understand why there are no observed metrics for
Observation("1") in your case.
Enrico
On 04.12.23 at 10:45,
Hi Michail,
observations as well as ordinary accumulators only observe / process
rows that are iterated / consumed by downstream stages. If the query
plan decides to skip one side of the join, that one will be removed from
the final plan completely. Then, the Observation will not retrieve any
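For reference, a minimal sketch of how such an observation is attached (data and metric are illustrative, not the code from the thread):

import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.{count, lit}
import spark.implicits._

val df = Seq("a", "b", "c").toDF("col1")
val observation = Observation("1")
val observed = df.filter($"col1" === "c").observe(observation, count(lit(1)).as("rows"))

// the metric is only filled once a downstream action actually consumes the rows
observed.collect()
println(observation.get)  // Map(rows -> 1)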
Sean is right, casting timestamps to strings (which is what show() does)
uses the local timezone, either the Java default zone `user.timezone`,
the Spark default zone `spark.sql.session.timeZone` or the default
DataFrameWriter zone `timeZone` (when writing to file).
You say you are in PST,
Hi,
given your dataset:
val df = Seq(
  (1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"),
  (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"),
  (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"),
  (10, 20230523, "M02"), (11, 20230523, "M02"), (12,
Hi,
You could rearrange the DataFrame so that writing the DataFrame as-is
produces your structure:
df = spark.createDataFrame([(1, "a1"), (2, "a2"), (3, "a3")], "id int, datA string")
+---+----+
| id|datA|
+---+----+
|  1|  a1|
|  2|  a2|
|  3|  a3|
+---+----+
df2 = df.select(df.id,
Hi,
For an aggregating UDF, use spark.udf.registerJavaUDAF(name, className).
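A minimal sketch of that registration (the class name is hypothetical and must be on the classpath; registerJavaUDAF expects a class extending UserDefinedAggregateFunction):

// register the Java/Scala aggregate function class by name
spark.udf.registerJavaUDAF("my_agg", "com.example.MyAggregateFunction")

// afterwards the function is available in SQL
spark.sql("SELECT dept, my_agg(salary) FROM employees GROUP BY dept")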
Enrico
On 23.04.23 at 23:42, Thomas Wang wrote:
Hi Spark Community,
I have implemented a custom Spark Aggregator (a subclass of
org.apache.spark.sql.expressions.Aggregator). Now I'm trying to use
it in a
You have to take each row and zip the lists, each element of the result
becomes one new row.
So write a method that turns
Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
into
List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
and use
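A minimal sketch of such a method (assuming all columns are array<string> of equal length; data and column names are illustrative):

import org.apache.spark.sql.Row
import spark.implicits._

// transpose the per-column lists of one row into one list per new row
def zipLists(row: Row): Seq[Seq[String]] =
  (0 until row.length).map(i => row.getSeq[String](i)).transpose

val df = Seq((Seq("A", "B", "null"), Seq("C", "D", "null"), Seq("E", "null", "null")))
  .toDF("c1", "c2", "c3")

// each element of the transposed result becomes one new row
df.flatMap(zipLists).show(false)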
loop in parallel"?
btw this didn't work:
for (String columnName : df.columns()) {
    df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
}
On Sun, Feb 12, 2023 at 8:36 PM, Enrico Minack wrote:
That is unfortunate, but 3.4.0 is around
have a single DataFrame that computes all columns in a single
Spark job.
But this reads all distinct values into a single partition, which has
the same downside as collect, so this is as bad as using collect.
Cheers,
Enrico
On 12.02.23 at 18:05, sam smith wrote:
@Enrico Minack
You could do the entire thing in DataFrame world and write the result to
disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
Note this is Scala but should be straightforward to translate into Java:
import org.apache.spark.sql.functions.collect_set
val df = Seq((1, 10, 123),
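A self-contained sketch of that approach, expanding the truncated snippet above (assumes Spark 3.4+ for Dataset.unpivot; data and column names are illustrative):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.collect_set
import spark.implicits._

val df = Seq((1, 10, 123), (2, 20, 124), (3, 10, 123)).toDF("a", "b", "c")

// one row per (column name, value), then one set of distinct values per column
val distinctPerColumn = df
  .unpivot(Array.empty[Column], Array($"a", $"b", $"c"), "column", "value")
  .groupBy("column")
  .agg(collect_set("value").as("distinct_values"))

distinctPerColumn.write.mode("overwrite").parquet("distinct-values.parquet")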
Hi,
you are right, that is an interesting question.
Looks like GROUP BY is doing something funny / magic here (spark-shell
3.3.1 and 3.5.0-SNAPSHOT):
With an alias, it behaves as you have pointed out:
spark.range(3).createTempView("ids_without_dots")
spark.sql("SELECT * FROM
Hi Tanin,
running your test with option "spark.sql.planChangeLog.level" set to
"info" or "warn" (depending on your Spark log level) will show you
insights into the planning (which rules are applied, how long rules
take, how many iterations are done).
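A minimal sketch of setting that option when building the session (master and app name are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("plan-change-log")
  .config("spark.sql.planChangeLog.level", "warn")  // log every rule that changes the plan
  .getOrCreate()
import spark.implicits._

spark.range(10).filter($"id" > 5).count()  // the planning of this query is now logged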
Hoping this helps,
Enrico
On 25.10.22
Hi,
Spark is fine with that many Parquet files in general:
// generate 100,000 small Parquet files
spark.range(0, 100000, 1, 100000).write.parquet("too-many-files.parquet")
// read 100,000 Parquet files
val df = spark.read.parquet("too-many-files.parquet")
df.show()
df.count()
Reading the
Thanks,
Swetha
On Fri, Sep 16, 2022 at 1:45 AM Enrico Minack
wrote:
Yes, you can expect each partition file to be sorted by "col1" and
"col2".
However, values
If with "won't affect the performance" you mean "parquet is splittable
though it uses snappy", then yes. Splittable files allow for optimal
parallelization, which "won't affect performance".
Spark writing data will split the data into multiple files already (here
parquet files). Even if each
Yes, you can expect each partition file to be sorted by "col1" and "col2".
However, values for "col1" will be "randomly" allocated to partition
files, but all rows with the same value for "col1" will reside in the
same one partition file.
What kind of unexpected sort order do you observe?
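A minimal sketch of the write pattern described above (column names from the thread, the rest is an assumption):

import spark.implicits._

// rows with the same col1 end up in the same file, sorted by col1 and col2 within it
df.repartition($"col1")
  .sortWithinPartitions($"col1", $"col2")
  .write
  .mode("overwrite")
  .parquet("sorted-by-col1-and-col2.parquet")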
and the JAR library uses
different dependencies.
Hope these findings help others as well.
Thanks,
Muthu
On Mon, 11 Jul 2022 at 14:11, Enrico Minack
wrote:
All you need to do is implement a method readJson that reads a
single file given its path. Then, you map the values of column
All you need to do is implement a method readJson that reads a single
file given its path. Then, you map the values of column file_path to the
respective JSON content as a string. This can be done via a UDF or
simply Dataset.map:
case class RowWithJsonUri(entity_id: String, file_path:
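A minimal sketch of that approach (the case class continues the truncated line above; this readJson reads local file:// paths, which is an assumption — use the Hadoop FileSystem API for HDFS or S3):

import java.net.URI
import java.nio.file.{Files, Paths}
import spark.implicits._

case class RowWithJsonUri(entity_id: String, file_path: String)
case class RowWithJson(entity_id: String, json: String)

// reads a single file given its path (local files only in this sketch)
def readJson(path: String): String =
  new String(Files.readAllBytes(Paths.get(new URI(path))), "UTF-8")

val ds = Seq(RowWithJsonUri("id-1", "file:///tmp/file1.json")).toDS()
val withJson = ds.map(row => RowWithJson(row.entity_id, readJson(row.file_path)))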
lead to OOM error?
Thanks,
Sid
On Wed, Jun 22, 2022 at 6:40 PM Enrico Minack
wrote:
The RAM and disk memory consumption depends on what you do with the
data after reading them.
Your particular action will read 20 lines from the first partition
and show them. So it will not use
The RAM and disk memory consumption depends on what you do with the data
after reading them.
Your particular action will read 20 lines from the first partition and
show them. So it will not use any RAM or disk, no matter how large the
CSV is.
If you do a count instead of show, it will
Maybe a
.as[String].mapPartitions(it => if (it.hasNext) Iterator(it.next) else
Iterator.empty)
might be faster than the
.distinct.as[String]
Enrico
Am 19.06.22 um 08:59 schrieb Enrico Minack:
Given you already know your input files (input_file_name), why not
getting their s
Given you already know your input files (input_file_name), why not
getting their size and summing this up?
import java.io.File
import java.net.URI
import org.apache.spark.sql.functions.input_file_name

ds.select(input_file_name.as("filename"))
  .distinct.as[String]
  .map(filename => new
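A minimal sketch completing this idea (assumes file:// paths so java.io.File can report the size; the final aggregation is an assumption):

import java.io.File
import java.net.URI
import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

val totalBytes = ds.select(input_file_name.as("filename"))
  .distinct.as[String]
  .map(filename => new File(new URI(filename)).length)  // size of each distinct input file
  .reduce(_ + _)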
finalDFStatus = finalDF.withColumn("edl_timestamp", to_timestamp(lit(F.TimeNow()))) \
    .withColumn("status_for_each_batch", lit(str(response)))
print("Max Value:::")
print(maxValue)
I am expecting the payload to be as a JSON string to be a record like
below:
{"A":"some_value","B":"some_value"}
Where A and B are the columns in my dataset.
On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack
wrote:
Sid,
just recognized you are
use of a column expression. What do you expect print(payload)
to be?
I recommend splitting that complex command into multiple commands to find
out what "an error of column not iterable" refers to.
Enrico
On 10.06.22 at 13:39, Enrico Minack wrote:
Hi Sid,
finalDF = finalDF.r
Hi Sid,
finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions()) \
    .withColumn("status_for_batch",
                call_to_cust_bulk_api(policyUrl, to_json(struct(*colsListToBePassed
You are calling withColumn with the result of
call_to_cust_bulk_api as the second argument. That result
You refer to df.write.partitionBy, which creates a directory for each value
of "col" and, in the worst case, writes one file per DataFrame partition.
So the number of output files is controlled by the cardinality of "col",
which is your data and hence out of your control, and by the number of
partitions of
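A minimal sketch of how the number of files per directory is usually limited (the repartition by the same column is an assumption, not code from the thread):

import spark.implicits._

// one Spark partition per value of "col" => at most one file per value directory
df.repartition($"col")
  .write
  .partitionBy("col")
  .parquet("partitioned-output.parquet")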
);
will yield StringType as the type for column c1, and similarly for c6.
I want to return the true type of each column by first discarding the "+".
I use Dataset after filtering the rows (removing "+") because
I can re-read the new dataset using the .csv() method.
Any better idea to do th
Can you provide an example string (row) and the expected inferred schema?
Enrico
On 04.06.22 at 18:36, marc nicole wrote:
How to do just that? I thought we can only infer the schema when we first
read the dataset, or am I wrong?
On Sat, Jun 4, 2022 at 6:10 PM, Sean Owen wrote:
It sounds
Nikhil,
What are you trying to achieve with this in the first place? What are
your goals? What is the problem with your approach?
Are you concerned about the 1000 files in each written col2-partition?
The write.partitionBy is something different from df.repartition or
df.coalesce.
The df
Another project implementing DataSource V2 in Scala with Python wrapper:
https://github.com/G-Research/spark-dgraph-connector
Cheers,
Enrico
On 06.04.22 at 12:01, Cheng Pan wrote:
There are some projects based on Spark DataSource V2 that I hope will help you.
How well Spark can scale up with your data (in terms of years of data)
depends on two things: the operations performed on the data, and
characteristics of the data, like value distributions.
Failing tasks smell like you are using operations that do not scale
(e.g. Cartesian product of your
> Wrt looping: if I want to process 3 years of data, my modest cluster
will never do it in one go, I would expect?
> I have to break it down in smaller pieces and run that in a loop (1
day is already lots of data).
Well, that is exactly what Spark is made for. It splits the work up and
Right, GraphFrames is not very active and maintainers don't even have
the capacity to make releases.
Enrico
On 22.03.22 at 00:10, Sean Owen wrote:
GraphX is not active, though still there and does continue to build
and test with each Spark release. GraphFrames kind of superseded it,
but is
If you have a list of Columns called `columns`, you can pass them to the
`agg` method as:
agg(columns.head, columns.tail: _*)
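A self-contained example of that pattern (the aggregate expressions are illustrative):

import org.apache.spark.sql.functions.{avg, max}
import spark.implicits._

val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "value")
val columns = List(avg($"value").as("avg_value"), max($"value").as("max_value"))

// agg takes a first column plus varargs, hence head and tail
df.groupBy($"key").agg(columns.head, columns.tail: _*).show()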
Enrico
On 16.03.22 at 08:02, ckgppl_...@sina.cn wrote:
Thanks, Sean. I modified the code and have generated a list of columns.
I am working on converting a list of
Sid,
Your Aggregation Query selects all employees where fewer than three
distinct salaries exist that are larger. So, both queries seem to do the
same.
The Windowing Query is explicit in what it does: give me the rank for
salaries per department in the given order and pick the top 3 per
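A minimal sketch of that Windowing Query in DataFrame form (data, table and column names are assumptions based on the thread):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.dense_rank
import spark.implicits._

val employees = Seq(("eng", "a", 100), ("eng", "b", 90), ("eng", "c", 80), ("eng", "d", 70))
  .toDF("department", "name", "salary")

// rank salaries per department, highest first, and keep the top 3 per department
val byDepartment = Window.partitionBy("department").orderBy($"salary".desc)

employees
  .withColumn("rank", dense_rank().over(byDepartment))
  .where($"rank" <= 3)
  .show()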
Though spark.read. refers to "built-in" data sources, there is
nothing that prevents 3rd-party libraries from "extending" spark.read in
Scala or Python. As users know the Spark-way to read built-in data
sources, it feels natural to hook 3rd party data sources under the same
scheme, to give users a
You could use Horovod to distribute your ML algorithm on a cluster,
while Horovod also supports Spark clusters.
Enrico
On 06.09.20 at 15:30, Ankur Das wrote:
Good Evening Sir/Madam,
Hope you are doing well, I am experimenting on some ML techniques
where I need to test it on a distributed
You can remove the <1000> first and then turn the string into a map
(interpret the string as key-values). From that map you can access each
key and turn it into a separate column:
Seq(("<1000> date=2020-08-01 time=20:50:04 name=processing id=123
session=new packt=20 orgin=null address=null
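A minimal sketch of that parsing (the selected keys come from the example line; the str_to_map delimiters are an assumption):

import org.apache.spark.sql.functions.{expr, regexp_replace}
import spark.implicits._

val df = Seq("<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new").toDF("line")

df.withColumn("cleaned", regexp_replace($"line", "^<\\d+> ", ""))   // drop the leading <1000>
  .withColumn("kv", expr("str_to_map(cleaned, ' ', '=')"))          // key=value pairs into a map
  .select($"kv"("date").as("date"), $"kv"("time").as("time"),
          $"kv"("name").as("name"), $"kv"("id").as("id"))
  .show(false)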
Once parsed into a Timestamp, the timestamp is stored internally as UTC
and printed in your local timezone (e.g. as defined by
spark.sql.session.timeZone). Spark is good at hiding timezone
information from you.
You can get the timezone information via date_format(column, format):
import
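A minimal sketch completing that idea (the format pattern is an assumption):

import org.apache.spark.sql.functions.{current_timestamp, date_format}

// "zzz" renders the timezone the timestamp is displayed in (the session timezone)
spark.range(1)
  .select(date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss zzz").as("ts_in_session_zone"))
  .show(false)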
Ayan,
no need for UDFs, the SQL API provides all you need (sha1, substring, conv):
https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html
>>> df.select(conv(substring(sha1(col("value_to_hash")), 33, 8), 16, 10).cast("long").alias("sha2long")).show()
+----------+
|  sha2long|
An interesting puzzle indeed.
What is your measure of "that scales"? Does not fail, does not spill,
does not need a huge amount of memory / disk, is O(N), processes X
records per second and core?
Enrico
On 11.03.20 at 16:59, sakag wrote:
Hi all,
We have a rather interesting use case,
James,
If you have multithreaded code in your driver, then you should
allocate multiple cores. In cluster mode you share the node with other
jobs. If you allocate fewer cores than you are using in your driver,
then that node gets over-allocated and you are stealing other
applications'
On Fri, Feb 28, 2020 at 7:28 PM Enrico Minack wrote:
This computes the md5 hash of a given column id of Dataset ds:
ds.withColumn("id hash", md5($"id")).show(false)
Test with this Dataset ds:
import org.apache.spa
Looks like the schema of some files is unexpected.
You could either run parquet-tools on each of the files and extract the
schema to find the problematic files:
hdfs dfs -stat "%n" hdfs://ip-172-24-89-229.blaah.com:8020/user/hadoop/origdata/part-*.parquet
This computes the md5 hash of a given column id of Dataset ds:
ds.withColumn("id hash", md5($"id")).show(false)
Test with this Dataset ds:
import org.apache.spark.sql.types._
val ds = spark.range(10).select($"id".cast(StringType))
Available are md5, sha, sha1, sha2 and hash:
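A sketch of those variants side by side on the Dataset ds above (sha is the SQL alias of sha1; sha2 additionally takes a bit length):

import org.apache.spark.sql.functions.{hash, md5, sha1, sha2}

ds.select(
  md5($"id").as("md5"),
  sha1($"id").as("sha1"),
  sha2($"id", 256).as("sha2_256"),  // bit length: 224, 256, 384 or 512
  hash($"id").as("hash")            // 32-bit Murmur3 hash, not cryptographic
).show(false)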
sequentially in Driver program and
transform/write to hdfs one after the other
* Or the current approach mentioned in the previous mail
What will be the performance implications ?
Regards
Manjunath
*From:* Enrico Minack
Hi Manjunath,
why not creating 10 DataFrames loading the different tables in the first
place?
Enrico
On 27.02.20 at 14:53, Manjunath Shetty H wrote:
Hi Vinodh,
Thanks for the quick response. I didn't get what you meant exactly; any
reference or snippet would be helpful.
To explain the
I have created a jira to track this request:
https://issues.apache.org/jira/browse/SPARK-30957
Enrico
On 08.02.20 at 16:56, Enrico Minack wrote:
Hi Devs,
I am forwarding this from the user mailing list. I agree that the <=>
version of join(Dataset[_], Seq[String]) would be useful.
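For context, a minimal sketch of the null-safe join that currently has to be spelled out explicitly (data and column name are illustrative):

val left = spark.sql("SELECT * FROM VALUES (1), (NULL) AS t(id)")
val right = spark.sql("SELECT * FROM VALUES (1), (NULL) AS t(id)")

// today: an explicit null-safe equality condition, which keeps both id columns
left.join(right, left("id") <=> right("id"), "inner").show()

// the requested API would be the null-safe analogue of
left.join(right, Seq("id"))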
large tables ? Is
caching faster than recomputing both insert/update ?
Thanks
Enrico Minack writes:
Ashley,
I want to suggest a few optimizations. The problem might go away but
at least performance should improve.
The freeze problems could have many reasons, the Spark UI SQL pages
and stages
Ashley,
I want to suggest a few optimizations. The problem might go away but at
least performance should improve.
The freeze problems could have many reasons, the Spark UI SQL pages and
stages detail pages would be useful. You can send them privately, if you
wish.
1. the repartition(1)
Hi,
Spark does not support 7z natively, but you can read any file in Spark:
import org.apache.spark.input.PortableDataStream
import spark.implicits._

def read(stream: PortableDataStream): Iterator[String] =
  Seq(stream.getPath()).iterator

spark.sparkContext
  .binaryFiles("*.7z")
  .flatMap(file => read(file._2))
  .toDF("path")
  .show(false)
This scales with
some or the other way to use windows on data
frames. I always get confused as to when to fall back on RDD approach?
Any use case in your experience warrant for RDD use, for better
performance?
Thanks,
Rishi
On Mon, Jan 6, 2020 at 4:18 AM Enrico Minack <mailto:m...@enrico.minack.dev>&
The distinct transformation does not preserve order; you need to apply
distinct first, then orderBy.
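A minimal sketch of the suggested order of operations (data and column names are assumptions):

import spark.implicits._

val df = Seq(("acc-1", "2019-12", 100.0), ("acc-1", "2019-12", 100.0), ("acc-1", "2020-01", 50.0))
  .toDF("account", "month", "outgoing")

// distinct first, then orderBy, so the ordering is not destroyed by the distinct shuffle
df.distinct().orderBy($"month").show()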
Enrico
On 06.01.20 at 00:39, Mich Talebzadeh wrote:
Hi,
I am working out monthly outgoing etc from an account and I am using
the following code
import org.apache.spark.sql.expressions.Window
Note that repartitioning helps to increase the number of partitions (and
hence to reduce the size of partitions and required executor memory),
but subsequent transformations like join will repartition data again
with the configured number of partitions
(spark.sql.shuffle.partitions),
At 9:14 pm, Enrico Minack wrote:
How many withColumn statements do you have? Note that it is
better to use a single select, rather than lots of withColumn.
This also makes drops redundant.
Reading 25m CSV lines and writing to Parqu
but it's not
reasonable for maintenance purposes.
I will try on a local instance and let you know.
Thanks for the help.
*From:* "Enrico Minack"
*To:* user@spark.a
How many withColumn statements do you have? Note that it is better to
use a single select, rather than lots of withColumn. This also makes
drops redundant.
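A minimal sketch of the difference (columns are illustrative):

import spark.implicits._

val df = Seq((1, 2), (3, 4)).toDF("a", "b")

// many withColumn calls, each adding a projection, plus a drop afterwards
val viaWithColumn = df.withColumn("a2", $"a" * 2).withColumn("b2", $"b" * 2).drop("a", "b")

// the same result as a single select, no drop needed
val viaSelect = df.select(($"a" * 2).as("a2"), ($"b" * 2).as("b2"))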
Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is
really slow. Can you try this on a single machine, i.e. run wit
I think some example code would help to understand what you are doing.
On 18.12.19 at 08:12, Tzahi File wrote:
No, there are 100M records, both even and odd.
On Tue, Dec 17, 2019 at 8:13 PM Russell Spitzer
mailto:russell.spit...@gmail.com>> wrote:
Is there a chance your data is all even