Have you tried taking several thread dumps across executors to see if the
executors are consistently waiting for a resource?
I suspect it’s S3. S3’s list operation doesn’t scale with the number of keys
in a folder. You aren’t being throttled by S3. S3 is just slow when you have a
lot of small objects.
You can partition and bucket a Dataframe by any column. You can create a column
using an expression. You can add a partition_id column to your dataframe, and
partition/bucket by that column.
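A rough sketch of that idea (the dataframe, the customer_id key, and the bucket/partition counts are made-up assumptions, not from the original thread):

import org.apache.spark.sql.functions._

// Illustrative only: derive a partition_id column from an existing key column,
// then use it for write-time partitioning and bucketing.
val withPid = df.withColumn("partition_id", pmod(hash(col("customer_id")), lit(16)))

withPid.write
  .partitionBy("partition_id")
  .bucketBy(8, "customer_id")
  .sortBy("customer_id")
  .saveAsTable("events_bucketed")   // bucketBy requires saveAsTable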
From: David Diebold
Date: Monday, April 11, 2022 at 11:48 AM
To: "user @spark"
Subject: [EXTERNAL] Ques
What format are you writing the file to? Are you planning on your own custom
format, or are you planning to use standard formats like parquet?
Note that Spark can write numeric data in most standard formats. If you use
custom format instead, whoever consumes the data needs to parse your data. T
No, You don’t need 30 dataframes and self joins. Convert a list of columns to a
list of functions, and then pass the list of functions to the agg function
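A minimal sketch of the pattern (df, the column names, and the chosen aggregate functions are assumptions for illustration):

import org.apache.spark.sql.functions._

// Illustrative only: build the aggregate expressions from a list of column names,
// then pass them all to a single agg call instead of 30 dataframes and self joins.
val valueCols = Seq("col1", "col2", "col3")
val aggExprs = valueCols.flatMap(c => Seq(max(col(c)).as(s"max_$c"), avg(col(c)).as(s"avg_$c")))

val result = df.groupBy("group_key").agg(aggExprs.head, aggExprs.tail: _*)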
From: "ckgppl_...@sina.cn"
Reply-To: "ckgppl_...@sina.cn"
Date: Wednesday, March 16, 2022 at 8:16 AM
To: Enrico Minack , Sean Owen
Cc: use
The toDF function in scala uses a bit of Scala magic that allows you to add
methods to existing classes. Here’s a link to explanation
https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch01s11.html
In short, you can implement a class that extends the List class and add methods
to
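Here is a minimal sketch of that implicit-class pattern (the wrapper and method names are made up; this is not Spark's actual toDF implementation):

// Illustrative only: add a method to an existing type via an implicit class,
// the same mechanism Spark's implicits use to add toDF to Seq/RDD.
object ListExtensions {
  implicit class RichIntList(val xs: List[Int]) extends AnyVal {
    def secondLargest: Option[Int] = xs.sorted.takeRight(2).headOption
  }
}

import ListExtensions._
val second = List(3, 1, 7, 5).secondLargest   // Some(5)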
figure, but it will be in the ballpark.
From: Maksim Grinman
Date: Friday, February 11, 2022 at 2:21 PM
To: "Lalwani, Jayesh"
Cc: Mich Talebzadeh , Holden Karau
, Sean Owen , "user @spark"
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps
dump if your tasks are
taking a few seconds
From: Maksim Grinman
Date: Thursday, February 10, 2022 at 7:21 PM
To: "Lalwani, Jayesh"
Cc: Mich Talebzadeh , Holden Karau
, Sean Owen , "user @spark"
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps
You will need to provide more info.
Does the data contain records?
Are the records "homogeneous"; i.e., do they have the same fields?
What is the format of the data?
Are records separated by lines/separators?
Is the data sharded across multiple files?
How big is each shard?
On 2/8/22, 11:50 AM,
Probably not the answer you are looking for, but the best thing to do is to
avoid making Spark code sleep. Is there a way you can predict how big your
autoscaling group needs to be without looking at all the data? Are you using a
fixed number of Spark executors, or do you have some way of scaling
One thing to be pointed out is that you never bundle the Spark Client with your
code. You compile against a Spark version. You bundle your code (without Spark
jars) in an uber jar and deploy the uber jar into Spark. Spark is already
bundled with the jars that are required to send jobs to schedul
1. Is your join and aggregation based on the same keys?
You might want to look at the execution plan. It is possible that without
checkpointing, Spark puts join and aggregation into the same stage to eliminate
shuffling. With a checkpoint, you might have forced Spark to introduce a
shuffle.
Tasks are never dependent on each other. Stages are dependent on each other.
The Spark task manager will make sure that it plans the tasks so that they can
run independently.
Out of the 80K tasks, how many are complete when you have 7 remaining? Is it
80k - 7 ? It could be that you have data
e 25, 2021 at 12:57 PM
To: "user@spark.apache.org" , "Lalwani, Jayesh"
Subject: RE: [EXTERNAL] Recovery when two spark nodes out of 6 fail
Spark replicates the partitions among multiple nodes. If one executor fails, it
moves the processing over to another executor. However, if the data is lost, it
re-executes the processing that generated the data, and might have to go back
to the source.
In case of failure, there will be a delay in g
You can always chain aggregations by chaining multiple Structured Streaming
jobs. It’s not a showstopper.
Getting Spark on Kubernetes is important for organizations that want to pursue
a multi-cloud strategy
From: Mich Talebzadeh
Date: Wednesday, June 23, 2021 at 11:27 AM
To: "user @spark"
Cc
flatMap is supposed to return Seq, not Iterator. You are returning a class that
implements Iterator. I have a hunch that's what's causing the confusion.
flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do you intend it to
be RDD[CrawlData]? You might want to call toSeq on FairFetcher.
If the UDFs are computationally expensive, I wouldn't solve this problem with
UDFs at all. If they are working in an iterative manner, and assuming each
iteration is independent of other iterations (yes, I know that's a big
assumption), I would think about exploding your dataframe to have a row
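A rough sketch of the explode-then-aggregate idea (the array column, the per-element expression, and the grouping key are all made-up placeholders, since the original message is cut off):

import org.apache.spark.sql.functions._

// Illustrative only: explode an array column into one row per element, do the
// per-element work with built-in expressions, then aggregate back.
val exploded = df.withColumn("element", explode(col("values")))
val perElement = exploded.withColumn("score", col("element") * lit(2)) // stand-in for the real per-iteration logic
val result = perElement.groupBy("id").agg(sum("score").as("total_score"))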
1. How does spark know the data size is 5 million?
Depends on the source. Some sources (database/parquet) tell you. Some
sources (CSV, JSON) need to be guesstimated
2. Are there any books or documentation that takes one simple job and goes
deeper in terms of understanding what happens u
The specifics depend on what's going on underneath. At the 10,000 foot level,
you probably know that Spark creates a Logical execution plan when you call it.
It converts it into an execution plan when you call an action. The Execution
plan has stages that are run sequentially. Stages are broken u
You don’t need to “launch batches” every 5 minutes. You can launch batches
every 2 seconds, and aggregate on window for 5 minutes. Spark will read data
from topic every 2 seconds, and keep the data in memory for 5 minutes.
You need to make a few decisions
1. DO you want a tumbling window or a
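A rough sketch of the shape described above, 2-second micro-batches aggregated over 5-minute event-time windows (spark, the broker address, the topic name, the watermark, and the column names are assumptions for illustration):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// Illustrative only: read every 2 seconds, aggregate over 5-minute windows.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "my_topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "timestamp")

val counts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "5 minutes"))
  .count()

counts.writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()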
You don’t have to union multiple RDDs. You can read files from multiple
directories in a single read call. Spark will manage partitioning of the data
across directories.
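For example, a sketch with made-up paths (the format and options are illustrative):

// Illustrative only: one read call over several directories; no union needed.
val df = spark.read
  .option("header", "true")
  .csv("hdfs:///data/dir1", "hdfs:///data/dir2", "hdfs:///data/dir3")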
From: Kapil Garg
Date: Wednesday, May 5, 2021 at 10:45 AM
To: spark users
Subject: [EXTERNAL] How to read multiple HDFS dir
\
start()
spark.streams.awaitAnyTermination()
def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        print(f"""newtopic batchId is {batchId}""")
        dfnewtopic.show(100, False)
        spark.streams.active.forEa
What are you trying to do? Can you give us a bigger picture?
From: Mich Talebzadeh
Date: Thursday, April 22, 2021 at 11:43 AM
To: "user @spark"
Subject: RE: [EXTERNAL] Dealing with two topics in Spark Structured Streaming
There is no good answer to the question “Have I learnt enough”. You can never
learn enough. You have to constantly learn
Practically, if you want to make a career out of using technology XYZ, you only
need to learn enough XYZ to get a job doing XYZ. Once you get a job doing XYZ,
other people a
Short Answer: Yes
Long answer: You need to understand your load characteristics to size your
cluster. Most applications have 3 components to their load. A) a predictable
amount of expected load. This usually changes based on time of day, and day of
week. The main thing is that it’s predictable.
Can I cut a steak with a hammer? Sure you can, but the steak would taste awful
Do you have organizational/bureaucratic issues with using a Load Balancer?
Because that’s what you really need. Run your application on multiple nodes
with a load balancer in front. When a node crashes, the load balan
Yes, as you found, in local mode, Spark won’t serialize your objects. It will
just pass the reference to the closure. This means that it is possible to write
code that works in local mode, but doesn’t when you run distributed.
From: Sheel Pancholi
Date: Friday, February 26, 2021 at 4:24 AM
To:
AWS has 2 offerings built on top of Spark: EMR and Glue. You can, of course,
spin up your EC2 instances and deploy Spark on them. The 3 offerings allow you
to trade off between flexibility and infrastructure management. EC2 gives you
the most flexibility, because it's basically a bunch of nodes,
Presto has slightly lower latency than Spark, but I've found that it gets stuck
on some edge cases.
If you are on AWS, then the simplest solution is to use Athena. Athena is built
on Presto, has a JDBC driver, and is serverless, so you don't have to deal with
any headaches
On 2/18/21, 3:32 PM, "S
There are several step by step guides that you can find online by googling
https://spark.apache.org/docs/latest/sql-distributed-sql-engine.html
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-thrift-server.html
https://medium.com/@saipeddy/setting-up-a-thrift-server-4eb0c5
All of the major cloud vendors have some sort of Spark offering. They provide
support if you build in their cloud.
From: Синий Андрей
Date: Tuesday, January 26, 2021 at 7:52 AM
To: "user@spark.apache.org"
Subject: [EXTERNAL] Apache Spark
If you are doing aggregations, you need to watermark the data. Depending on
what aggregations you are doing, state might keep accumulating till failure.
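A minimal sketch of adding a watermark before the aggregation so old state can be dropped (the column names and thresholds are assumptions):

import org.apache.spark.sql.functions._

// Illustrative only: the watermark bounds how long Spark keeps aggregation state.
val aggregated = events
  .withWatermark("eventTime", "30 minutes")
  .groupBy(window(col("eventTime"), "10 minutes"), col("deviceId"))
  .count()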
From: Eric Beabes
Date: Thursday, January 21, 2021 at 12:19 PM
To: Sean Owen
Cc: spark-user
Subject: RE: [EXTERNAL] Only one Active task in Sp
You don’t have a column named “created”. The column name is “ceated”, without
the “r”
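For example (a sketch, assuming the dataframe is df):

// Illustrative only: withColumnRenamed silently does nothing if the source column
// doesn't exist, so the misspelled name has to be used as the source.
val fixed = df.withColumnRenamed("ceated", "created")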
From: Mich Talebzadeh
Date: Monday, January 4, 2021 at 1:06 PM
To: "user @spark"
Subject: [EXTERNAL] Spark DF does not rename the column
Yes. It is necessary to have a distributed file system because all the workers
need to read/write to the checkpoint. The distributed file system has to be
immediately consistent: When one node writes to it, the other nodes should be
able to read it immediately
The solutions/workarounds depend on
From: Amit Sharma
Reply-To: "resolve...@gmail.com"
Date: Monday, December 7, 2020 at 1:47 PM
To: "Lalwani, Jayesh" , "user@spark.apache.org"
Subject: RE: [EXTERNAL] Caching
Since DF2 is dependent on DF1, and DF3 is dependent on both DF1 and DF2,
without caching, Spark will read the CSV twice: Once to load it for DF1, and
once to load it for DF2. When you add a cache on DF1 or DF2, it reads from CSV
only once.
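A sketch of what I mean (the path, columns, and join key are made-up placeholders, and spark is an existing SparkSession):

import org.apache.spark.sql.functions._

// Illustrative only: cache DF1 so the CSV is scanned once.
val df1 = spark.read.option("header", "true").csv("s3://bucket/input.csv").cache()
val df2 = df1.filter(col("amount") > 0)
val df3 = df1.join(df2, Seq("id"))   // both branches reuse the cached df1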
You might want to look at doing a windowed query on D
incurring IO overhead on every microbatch.
From: Arti Pande
Date: Friday, November 13, 2020 at 2:19 PM
To: "Lalwani, Jayesh"
Cc: "user@spark.apache.org"
Subject: RE: [EXTERNAL] Refreshing Data in Spark Memory (DataFrames)
Is this a streaming application or a batch application?
Normally, for batch applications, you want to keep data consistent. If you have
a portfolio of mortgages that you are computing payments for and the interest
rate changes while you are computing payments, you don’t want to compute half
th
Note that Spark never guarantees ordering of columns. There’s nothing in Spark
documentation that says that the columns will be ordered a certain way. The
proposed solution relies on an implementation detail that might change in
a future version of Spark.
Ideally, you shouldn’t rely on Dataframe
Append mode will wait till watermark expires
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
This is because append mode doesn't output rows until a row is finalized.
Theoretically, data for any row can appear anytime as long as it's in the
watermark
Are you getting any output? Streaming jobs typically run forever, and keep
processing data as it comes in the input. If a streaming job is working well,
it will typically generate output at a certain cadence
From: KhajaAsmath Mohammed
Date: Tuesday, October 20, 2020 at 1:23 PM
To: "user @spark"
this might affect later use, such as
collect. The resulting GC can be explained by both caching and collect.
Lalwani, Jayesh writes:
> I have a Dataframe with around 6 billion rows, and about 20 columns.
First of all, I want to write this dataframe out to parquet. Then, out of the 20
colum
I have a Dataframe with around 6 billion rows, and about 20 columns. First of
all, I want to write this dataframe out to parquet. Then, out of the 20 columns,
I have 3 columns of interest, and I want to find how many distinct values of
the columns are there in the file. I don’t need the actual di
gestions
would be highly appreciated..
ND
On 10/16/20 2:49 PM, Lalwani, Jayesh wrote:
With a file-based source, Spark is going to make maximum use of memory before
it tries to scale to more nodes. Parallelization adds overhead. This overhead
is negligible if your data is several gigs or above.
:
Thanks for the input. What I am interested is how to have multiple
workers to read and process the small files in parallel, and certainly
one file per worker at a time. Partitioning data frame doesn't make
sense since the data frame is small already.
On 10/15/20 9:14 AM, Lalwani, Jayesh wrote:
Parallelism of streaming depends on the input source. If you are getting one
small file per microbatch, then Spark will read it in one worker. You can
always repartition your data frame after reading it to increase the parallelism.
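A sketch of the repartition-after-read idea (the source format, path, and partition count are made-up placeholders):

// Illustrative only: repartition the streaming dataframe after reading it so
// downstream work spreads across more tasks even when each file is tiny.
val input = spark.readStream
  .format("text")
  .load("hdfs:///incoming/")
  .repartition(16)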
On 10/14/20, 11:26 PM, "Artemis User" wrote:
Where are you running your Spark cluster? Can you post the command line that
you are using to run your application?
Spark is designed to process a lot of data by distributing work to a cluster of
machines. When you submit a job, it starts executor processes on the cluster.
So, what you are se
You could convert columns to rows. Something like this (assumes import
spark.implicits._ for the tuple encoder):
val cols = Seq("A", "B", "C")
df.flatMap { row =>
  cols.map(c => (row.getAs[java.sql.Timestamp]("timestamp"), row.getAs[Int](c), c))
}.toDF("timestamp", "value", "colName")
If you are using dataframes, all of your columns are of the same type. If they
We are trying to run a Spark job on Kubernetes cluster. The Spark job needs to
talk to some services external to the Kubernetes cluster through a proxy
server. We are setting the proxy by setting the extraJavaOptions like this
--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost
-D
You can use repartition method of Dataframe to change the number of partitions
https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.Dataset@repartition(numPartitions:Int):org.apache.spark.sql.Dataset[T]
On 6/22/18, 3:04 PM, "pratik4891" wrote:
It's default, I have
We are trying to add a column to a Dataframe with some data that is seeded by
some random data. We want to be able to control the seed, so multiple runs of
the same transformation generate the same output. We also want to generate
different random numbers for each partition
This is easy to do w
Actually, you can do partition-level ingest using ForEachWriter. You just have
to add each row to a list in the process method, and write to the data store in
the close method.
I know it’s awkward. I don’t know why Spark doesn’t provide a
ForEachPartitionWriter
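A rough sketch of that buffering pattern (writeBatch is a placeholder for whatever bulk-write call your data store client provides; the class name is made up):

import org.apache.spark.sql.{ForeachWriter, Row}
import scala.collection.mutable.ArrayBuffer

// Illustrative only: accumulate the partition's rows and flush them in close().
class BufferedPartitionWriter extends ForeachWriter[Row] {
  private val buffer = ArrayBuffer[Row]()

  override def open(partitionId: Long, epochId: Long): Boolean = {
    buffer.clear()
    true                      // process this partition
  }

  override def process(row: Row): Unit = buffer += row

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) {
      // writeBatch(buffer)   // hypothetical bulk write to the data store
    }
  }
}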
From: Yogesh Mahajan
Date: Thursd
Open source Spark Structured Streaming doesn’t have a JDBC sink. You can
implement your own ForEachWriter, or you can use my sink from here
https://github.com/GaalDornick/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/JdbcSink.scala
https://github.com/GaalDorni
Purna,
This behavior is by design. If you provide partitionBy, Spark removes the
columns from the data
From: purna pradeep
Date: Monday, June 4, 2018 at 8:00 PM
To: "user@spark.apache.org"
Subject: spark partitionBy with partitioned column in json output
im reading below json in spark
{"
This will not work the way you have implemented it. The code that you have here
will be called only once before the streaming query is started. Once the
streaming query starts, this code is not called
What I would do is
1. Implement a udf that calculates flourtimestamp
2. Add a column in
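A rough sketch of those two steps (the flooring logic and column names are assumptions, since the original message is cut off):

import org.apache.spark.sql.functions._

// Illustrative only: a UDF evaluated per row of the streaming dataframe, so it is
// recomputed for every micro-batch rather than once at query start.
val floorToMinute = udf((ts: java.sql.Timestamp) =>
  new java.sql.Timestamp(ts.getTime - (ts.getTime % 60000L)))

val withFloor = streamingDf.withColumn("floorTimestamp", floorToMinute(col("eventTime")))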
A few things
1. Append mode is going to output data that falls out of the watermark
2. Structured streaming isn’t time based. It reacts only when it sees input
data. If no data appears in the input it will not move the aggregation window
3. Clock time is irrelevant to structured streaming
No. There is no way to control the order except for the option that you have
already tried (repartition =1). When you are inserting in parallel from
multiple nodes, then the order of inserts cannot be guaranteed. That is because
of the very nature of doing things in parallel. The only way order
Do you have a code sample, and detailed error message/exception to show?
From: Martin Engen
Date: Tuesday, May 15, 2018 at 9:24 AM
To: "user@spark.apache.org"
Subject: Structured Streaming, Reading and Updating a variable
Hello,
I'm working with Structured Streaming, and I need a method of kee
This is a long standing bug in Spark. --jars and --files don't work in
Standalone mode
https://issues.apache.org/jira/browse/SPARK-4160
From: Marius
Date: Wednesday, May 9, 2018 at 3:51 AM
To: "user@spark.apache.org"
Subject: Spark 2.3.0 --files vs. addFile()
Hey,
i am using Spark to distribu
There is a trade off involved here. If you have a Spark application with a
complicated logical graph, you can either cache data at certain points in the
DAG, or you don’t cache data. The side effect of caching data is higher memory
usage. The side effect of not caching data is higher CPU usage a
Neither.
All women are humans. Not all humans are women. You wouldn’t say that a woman
is a subset of a human.
All DataFrames are DataSets. Not all Datasets are DataFrames. The “subset”
relationship doesn’t apply here. A DataFrame is a specialized type of DataSet
From: Michael Artz
Date: Sat
What columns do you want to filter myDataSet on? What are the corresponding
columns in paramsDataSet?
You can easily do what you want using an inner join. For example, if tempview
and paramsview both have a column, say employeeID, you can do this with the SQL
sparkSession.sql("Select * from tem
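A slightly fuller version of that query, as a sketch assuming the shared column is employeeID:

// Illustrative only: filter tempview by joining it to paramsview on the shared column.
val filtered = sparkSession.sql(
  """SELECT t.*
    |FROM tempview t
    |INNER JOIN paramsview p
    |  ON t.employeeID = p.employeeID""".stripMargin)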
Although there is such a thing as virtualization of memory done at the OS
layer, the JVM imposes its own limit that is controlled by the
spark.executor.memory and spark.driver.memory configurations. The amount of
memory allocated by JVM will be controlled by those parameters. General
guidelines sa
You could have a really large window.
From: Aakash Basu
Date: Monday, April 16, 2018 at 10:56 AM
To: "Lalwani, Jayesh"
Cc: spark receiver , Panagiotis Garefalakis
, user
Subject: Re: [Structured Streaming] More than 1 streaming in a code
If I use timestamp based windowing, then
, April 16, 2018 at 4:52 AM
To: "Lalwani, Jayesh"
Cc: spark receiver , Panagiotis Garefalakis
, user
Subject: Re: [Structured Streaming] More than 1 streaming in a code
Hey Jayesh and Others,
Is there then, any other way to come to a solution for this use-case?
Thanks,
Aakash.
On M
Is your input data partitioned? How much memory have you assigned to your
executor? Have you looked at how much time is being spent in GC in the
executor? Is Spark spilling the data into disk?
It is likely that the partition is too big. Spark tries to read the whole
partition into the memory of
Note that what you are trying to do here is join a streaming data frame with an
aggregated streaming data frame. As per the documentation, joining an
aggregated streaming data frame with another streaming data frame is not
supported
From: spark receiver
Date: Friday, April 13, 2018 at 11:49 P
need to make a call whether you want to take the
upfront cost of a shuffle, or you want to live with large number of tasks
From: Tin Vu
Date: Thursday, March 29, 2018 at 10:47 AM
To: "Lalwani, Jayesh"
Cc: "user@spark.apache.org"
Subject: Re: [SparkSQL] SparkSQL performance o
Try putting a Broadcast hint like show here
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-hint-framework.html#sql-hints
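A minimal sketch of the DataFrame-API equivalent (largeDf, smallDf, and the join key are made-up names):

import org.apache.spark.sql.functions.broadcast

// Illustrative only: force a broadcast join when the optimizer's size estimate is off.
val joined = largeDf.join(broadcast(smallDf), Seq("id"))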
From: Vitaliy Pisarev
Date: Thursday, March 29, 2018 at 8:42 AM
To: "user@spark.apache.org"
Subject: Why doesn't spark use broadcast join?
I am looking at
Without knowing too many details, I can only guess. It could be that Spark is
creating a lot of tasks even though there are fewer records. Creation and
distribution of tasks has a noticeable overhead on smaller datasets.
You might want to look at the driver logs, or the Spark Application Detail UI
Hi
I have a custom streaming sink that internally uses
org.apache.spark.sql.kafka010.KafkaSink. This was working in 2.2. When I
upgraded to 2.3, I get this exception.
Does spark-sql-Kafka010 work on Spark 2.3?
84705281f4b]] DEBUG com.capitalone.sdp.spark.source.SdpSink - Writing batch to
Kaf
There is an incompatibility in LZ4 dependencies being imported in spark 2.3.0
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 imports
org.apache.kafka:kafka-clients:0.11.0.0, which imports net.jpountz.lz4:lz4:1.3.0
OTOH, org.apache.spark:spark-core_2.11:2.3.0 imports org.lz4:lz4-java:1.4.0
Thes
Does the Resource scheduler support dynamic resource allocation? Are there any
plans to add in the future?
I noticed some weird behavior with NingWSClient 2.4.3. when used with Spark.
Try this
1. Spin up spark-shell with play-ws2.4.3 in driver class path
2. Run this code
val myConfig = new AsyncHttpClientConfigBean()
myConfig.setAcceptAnyCertificate(true)
myConfig.setFollowRedirect(true)
val on
You can repartition your dataframe into 1 partition and all the data will land
into one partition. However, doing this is perilous because you will end up
with all your data on one node, and if you have too much data you will run out
of memory. In fact, anytime you are thinking about putting da
From: Nicholas Hakobian
Date: Friday, December 29, 2017 at 8:10 PM
To: "Lalwani, Jayesh"
Cc: "user@spark.apache.org"
Subject: Re: Subqueries
This sounds like a perfect example of using windowing functions. Have you tried
something like the following:
select ACCT_ID, CR_RVKD_S
There is no way to solve this within spark.
One option you could do is break up your application into multiple application.
First application can filter and write the filtered results into a kafka queue.
Second application can read from queue and sum. Third application can read from
queue and d
I have a table, and I want to find the latest records in the table. The table
has a column called instnc_id that is incremented everyday. So, I want to find
the records that have the max instnc_id.
I am trying to do this using subqueries, but it gives me an error. For example,
when I try this
Is union of 2 Structured streaming dataframes from different sources supported
in 2.2?
We have done a union of 2 streaming dataframes that are from the same source. I
wanted to know if multiple streams are supported or going to be supported in
the future
We have a Structured Streaming application running for more than 40 hours. We
are storing checkpoints in EFS. The application is failing with a checksum
error on the checkpoint (stack trace below)
Is this because the checkpoint is corrupt? Is there a way to fix this? Is this
a bug in Spark?
Ca
Are there any plans to add mapPartitionsWithIndex in the Dataframe API? Or is
there any way to implement my own mapPartitionsWithIndex for a Dataframe?
I am implementing something which is logically similar to the randomSplit
function. In 2.1, randomSplit internally does df.mapPartitionWithInde
What is a good way to support non-homogeneous input data in Structured Streaming?
Let me explain the use case that we are trying to solve. We are reading data
from 3 topics in Kafka. All the topics have data in Avro format, with each of
them having their own schema. Now, all the 3 Avro schemas ha
Michael,
I see that 2.2 RC6 has passed a vote on Friday. Does this mean 2.2 is going to
be out soon? Do you have some sort of ETA?
From: "Lalwani, Jayesh"
Date: Friday, July 7, 2017 at 5:46 PM
To: Michael Armbrust
Cc: "user@spark.apache.org" , #MM - Heartbeat
Sub
Great! Even, val dfAllEvents =
sparkSession.table("oldEvents").union(sparkSession.table("newEvents")) doesn’t
work. Will this be addressed in 2.2?
From: Michael Armbrust
Date: Friday, July 7, 2017 at 5:42 PM
To: "Lalwani, Jayesh"
Cc: "user@spark.apache.o
From: Michael Armbrust
Date: Friday, July 7, 2017 at 2:30 PM
To: "Lalwani, Jayesh"
Cc: "user@spark.apache.org"
Subject: Re: Union of 2 streaming data frames
df.union(df2) should be supported when both DataFrames are created from a
streaming source. What error are you seeing?
On Fri, Jul 7, 201
In structured streaming, Is there a way to Union 2 streaming data frames? Are
there any plans to support Union of 2 streaming dataframes soon? I can
understand the inherent complexity in joining 2 streaming data frames. But,
Union is just concatenating 2 microbatches, innit?
The problem that w
operations instead of using Spark SQL to join the dataframes?
From: Tathagata Das
Date: Wednesday, May 3, 2017 at 6:32 PM
To: "Lalwani, Jayesh"
Cc: user
Subject: Re: Refreshing a persisted RDD
If you want to always get the latest data in files, it's best to always recreate
the DataFrame.
O