Re: Something about Spark which has bothered me for a very long time, which I've never understood

2022-05-05 Thread Lalwani, Jayesh
Have you tried taking several thread dumps across executors to see if the executors are consistently waiting for a resource? I suspect it’s S3. S3’s list operation doesn’t scale with the number of keys in a folder. You aren’t being throttled by S3. S3 is just slow when you have a lot of small

Re: Question about bucketing and custom partitioners

2022-04-11 Thread Lalwani, Jayesh
You can partition and bucket a Dataframe by any column. You can create a column using an expression. You can add a partition_id column to your dataframe, and partition/bucket by that column. From: David Diebold Date: Monday, April 11, 2022 at 11:48 AM To: "user @spark" Subject: [EXTERNAL]
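
A minimal sketch of the idea (the column names, hash expression, and bucket count are illustrative assumptions, not from the thread):

    import org.apache.spark.sql.functions._

    // derive a partition_id column from an existing key, then bucket by it
    val withPartitionId = df.withColumn("partition_id", abs(hash(col("customer_id"))) % 100)

    withPartitionId.write
      .bucketBy(16, "partition_id")
      .sortBy("partition_id")
      .saveAsTable("bucketed_table")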

Re: Spark Write BinaryType Column as continues file to S3

2022-04-08 Thread Lalwani, Jayesh
What format are you writing the file to? Are you planning on your own custom format, or are you planning to use standard formats like parquet? Note that Spark can write numeric data in most standard formats. If you use custom format instead, whoever consumes the data needs to parse your data.

Re: Reply: Re: Reply: Re: calculate correlation between_multiple_columns_and_one_specific_column_after_groupby_the_spark_data_frame

2022-03-16 Thread Lalwani, Jayesh
No, you don’t need 30 dataframes and self joins. Convert a list of columns to a list of functions, and then pass the list of functions to the agg function From: "ckgppl_...@sina.cn" Reply-To: "ckgppl_...@sina.cn" Date: Wednesday, March 16, 2022 at 8:16 AM To: Enrico Minack , Sean Owen Cc:
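
A sketch of the suggestion, assuming the goal is the correlation of several columns with one fixed column (all names are illustrative):

    import org.apache.spark.sql.functions._

    val cols = Seq("c1", "c2", "c3")
    // build one aggregate expression per column
    val aggExprs = cols.map(c => corr(col(c), col("target")).alias(s"corr_$c"))

    // pass the whole list to a single agg call instead of 30 dataframes and self joins
    val result = df.groupBy("group_key").agg(aggExprs.head, aggExprs.tail: _*)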

Re: Question on List to DF

2022-03-16 Thread Lalwani, Jayesh
The toDF function in scala uses a bit of Scala magic that allows you to add methods to existing classes. Here’s a link to explanation https://www.oreilly.com/library/view/scala-cookbook/9781449340292/ch01s11.html In short, you can implement a class that extends the List class and add methods
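
A sketch of the underlying "magic" (an illustrative implicit class, not Spark's actual implementation; paste into the REPL or wrap in an object in compiled code):

    // an implicit class appears to add a method to an existing type,
    // which is how toDF is grafted onto Seq/List via spark.implicits._
    implicit class RichList[A](val underlying: List[A]) {
      def secondOrNone: Option[A] = underlying.drop(1).headOption
    }

    List(1, 2, 3).secondOrNone   // Some(2) -- looks like a built-in List method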

Re: Spark 3.1.2 full thread dumps

2022-03-01 Thread Lalwani, Jayesh
figure, but it will be in the ballpark. From: Maksim Grinman Date: Friday, February 11, 2022 at 2:21 PM To: "Lalwani, Jayesh" Cc: Mich Talebzadeh , Holden Karau , Sean Owen , "user @spark" Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps

Re: Spark 3.1.2 full thread dumps

2022-02-11 Thread Lalwani, Jayesh
dump if your tasks are taking a few seconds From: Maksim Grinman Date: Thursday, February 10, 2022 at 7:21 PM To: "Lalwani, Jayesh" Cc: Mich Talebzadeh , Holden Karau , Sean Owen , "user @spark" Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps

Re: Help With unstructured text file with spark scala

2022-02-08 Thread Lalwani, Jayesh
You will need to provide more info. Does the data contain records? Are the records "homogeneous", i.e., do they have the same fields? What is the format of the data? Are records separated by lines/separators? Is the data sharded across multiple files? How big is each shard? On 2/8/22, 11:50

Re: Spark 3.1.2 full thread dumps

2022-02-07 Thread Lalwani, Jayesh
Probably not the answer you are looking for, but the best thing to do is to avoid making Spark code sleep. Is there a way you can predict how big your autoscaling group needs to be without looking at all the data? Are you using a fixed number of Spark executors, or do you have some way of scaling

Re: [Spark] Does Spark support backward and forward compatibility?

2021-11-24 Thread Lalwani, Jayesh
One thing to be pointed out is that you never bundle the Spark Client with your code. You compile against a Spark version. You bundle your code (without Spark jars) in an uber jar and deploy the uber jar into Spark. Spark is already bundled with the jars that are required to send jobs to

Re: [Spark Core][Intermediate][How-to]: Measure the time for a checkpoint

2021-10-07 Thread Lalwani, Jayesh
1. Is your join and aggregation based on the same keys? You might want to look at the execution plan. It is possible that without checkpointing, Spark puts join and aggregation into the same stage to eliminate shuffling. With a checkpoint, you might have forced Spark to introduce a shuffle.

Re: Why are in 1 stage most of my executors idle: are tasks within a stage dependent of each other?

2021-09-10 Thread Lalwani, Jayesh
Tasks are never dependent on each other. Stages are dependent on each other. The Spark task manager will make sure that it plans the tasks so that they can run independently. Out of the 80K tasks, how many are complete when you have 7 remaining? Is it 80k - 7? It could be that you have data

Re: Recovery when two spark nodes out of 6 fail

2021-06-25 Thread Lalwani, Jayesh
e 25, 2021 at 12:57 PM To: "user@spark.apache.org" , "Lalwani, Jayesh" Subject: RE: [EXTERNAL] Recovery when two spark nodes out of 6 fail

Re: Recovery when two spark nodes out of 6 fail

2021-06-25 Thread Lalwani, Jayesh
Spark replicates the partitions among multiple nodes. If one executor fails, it moves the processing over to other executor. However, if the data is lost, it re-executes the processing that generated the data, and might have to go back to the source. In case of failure, there will be delay in

Re: Spark on Kubernetes scheduler variety

2021-06-24 Thread Lalwani, Jayesh
You can always chain aggregations by chaining multiple Structured Streaming jobs. It’s not a showstopper. Getting Spark on Kubernetes is important for organizations that want to pursue a multi-cloud strategy From: Mich Talebzadeh Date: Wednesday, June 23, 2021 at 11:27 AM To: "user @spark"

Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Lalwani, Jayesh
flatMap is supposed to return Seq, not Iterator. You are returning a class that implements Iterator. I have a hunch that's what's causing the confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do you intend it to be RDD[CrawlData]? You might want to call toSeq on

Re: Merge two dataframes

2021-05-17 Thread Lalwani, Jayesh
If the UDFs are computationally expensive, I wouldn't solve this problem with UDFs at all. If they are working in an iterative manner, and assuming each iteration is independent of other iterations (yes, I know that's a big assumption), I would think about exploding your dataframe to have a

Re: Understanding what happens when a job is submitted to a cluster

2021-05-13 Thread Lalwani, Jayesh
1. How does spark know the data size is 5 million? Depends on the source. Some sources (database/parquet) tell you. Some sources (CSV, JSON) need to be guesstimated 2. Are there any books or documentation that takes one simple job and goes deeper in terms of understanding what happens

Re: Understanding what happens when a job is submitted to a cluster

2021-05-13 Thread Lalwani, Jayesh
The specifics depend on what's going on underneath. At the 10,000 foot level, you probably know that Spark creates a Logical execution plan when you call it. It converts it into a execution plan when you call an action. The Execution plan has stages that are run sequentially. Stages are broken

Re: Calculate average from Spark stream

2021-05-10 Thread Lalwani, Jayesh
You don’t need to “launch batches” every 5 minutes. You can launch batches every 2 seconds, and aggregate on a window for 5 minutes. Spark will read data from the topic every 2 seconds, and keep the data in memory for 5 minutes. You need to make a few decisions 1. Do you want a tumbling window or a
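
A sketch of the described setup, assuming kafkaDf is a streaming frame with a parsed eventTime and value column (names, watermark, and sink are illustrative):

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.Trigger

    // 5-minute tumbling window over event time, recomputed every 2 seconds
    val averaged = kafkaDf
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window(col("eventTime"), "5 minutes"))
      .agg(avg(col("value")).alias("avg_value"))

    averaged.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .format("console")
      .start()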

Re: How to read multiple HDFS directories

2021-05-05 Thread Lalwani, Jayesh
You don’t have to union multiple RDDs. You can read files from multiple directories in a single read call. Spark will manage partitioning of the data across directories. From: Kapil Garg Date: Wednesday, May 5, 2021 at 10:45 AM To: spark users Subject: [EXTERNAL] How to read multiple HDFS
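
A minimal sketch (paths and format are illustrative):

    // one read call over several HDFS directories
    val paths = Seq("hdfs:///data/2021/05/03", "hdfs:///data/2021/05/04", "hdfs:///data/2021/05/05")
    val df = spark.read.parquet(paths: _*)   // csv/json work the same way via format(...).load(paths: _*)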

Re: Dealing with two topics in Spark Structured Streaming

2021-04-22 Thread Lalwani, Jayesh
eams.awaitAnyTermination() def sendToControl(dfnewtopic, batchId): if len(dfnewtopic.take(1)) > 0: print(f"""newtopic batchId is {batchId}""") dfnewtopic.show(100,False) spark.streams.active.forEach(_.stop) else: print("

Re: Dealing with two topics in Spark Structured Streaming

2021-04-22 Thread Lalwani, Jayesh
What are you trying to do? Can you give us a bigger picture? From: Mich Talebzadeh Date: Thursday, April 22, 2021 at 11:43 AM To: "user @spark" Subject: RE: [EXTERNAL] Dealing with two topics in Spark Structured Streaming

Re: Python level of knowledge for Spark and PySpark

2021-04-14 Thread Lalwani, Jayesh
There is no good answer to the question “Have I learnt enough”. You can never learn enough. You have to constantly learn. Practically, if you want to make a career out of using technology XYZ, you only need to learn enough XYZ to get a job doing XYZ. Once you get a job doing XYZ, other people

Re: The trigger interval in spark structured streaming

2021-03-26 Thread Lalwani, Jayesh
Short Answer: Yes Long answer: You need to understand your load characteristics to size your cluster. Most applications have 3 components to their load. A) a predictable amount of expected load. This usually changes based on time of day, and day of week The main thing is that it’s predictable.

Re: Using Spark as a fail-over platform for Java app

2021-03-12 Thread Lalwani, Jayesh
Can I cut a steak with a hammer? Sure you can, but the steak would taste awful Do you have organizational/bureaucratic issues with using a Load Balancer? Because that’s what you really need. Run your application on multiple nodes with a load balancer in front. When a node crashes, the load

Re: Spark closures behavior in local mode in IDEs

2021-02-26 Thread Lalwani, Jayesh
Yes, as you found, in local mode, Spark won’t serialize your objects. It will just pass the reference to the closure. This means that it is possible to write code that works in local mode, but doesn’t when you run distributed. From: Sheel Pancholi Date: Friday, February 26, 2021 at 4:24 AM To:

Re: Spark on the cloud deployments

2021-02-24 Thread Lalwani, Jayesh
AWS has 2 offerings built on top of Spark: EMR and Glue. You can, of course, spin up your EC2 instances and deploy Spark on them. The 3 offerings allow you to trade off between flexibility and infrastructure management. EC2 gives you the most flexibility, because it's basically a bunch of nodes,

Re: how to serve data over JDBC using simplest setup

2021-02-18 Thread Lalwani, Jayesh
Presto has slightly lower latency than Spark, but I've found that it gets stuck on some edge cases. If you are on AWS, then the simplest solution is to use Athena. Athena is built on Presto, has a JDBC driver, and is serverless, so you don't have to deal with any headaches On 2/18/21, 3:32 PM,

Re: how to serve data over JDBC using simplest setup

2021-02-18 Thread Lalwani, Jayesh
There are several step by step guides that you can find online by googling https://spark.apache.org/docs/latest/sql-distributed-sql-engine.html https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-thrift-server.html

Re: Apache Spark

2021-01-26 Thread Lalwani, Jayesh
All of the major cloud vendors have some sort of Spark offering. They provide support if you build in their cloud. From: Синий Андрей Date: Tuesday, January 26, 2021 at 7:52 AM To: "user@spark.apache.org" Subject: [EXTERNAL] Apache Spark

Re: Only one Active task in Spark Structured Streaming application

2021-01-21 Thread Lalwani, Jayesh
If you are doing aggregations, you need to watermark the data. Depending on what aggregations you are doing, state might keep accumulating till failure. From: Eric Beabes Date: Thursday, January 21, 2021 at 12:19 PM To: Sean Owen Cc: spark-user Subject: RE: [EXTERNAL] Only one Active task in

Re: Spark DF does not rename the column

2021-01-04 Thread Lalwani, Jayesh
You don’t have a column named “created”. The column name is “ceated”, without the “r” From: Mich Talebzadeh Date: Monday, January 4, 2021 at 1:06 PM To: "user @spark" Subject: [EXTERNAL] Spark DF does not rename the column
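
If the intent was to rename rather than recreate the column, a one-liner along these lines would do it:

    val fixed = df.withColumnRenamed("ceated", "created")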

Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread Lalwani, Jayesh
Yes. It is necessary to have a distributed file system because all the workers need to read/write to the checkpoint. The distributed file system has to be immediately consistent: When one node writes to it, the other nodes should be able to read it immediately The solutions/workarounds depend

Re: Caching

2020-12-07 Thread Lalwani, Jayesh
: Amit Sharma Reply-To: "resolve...@gmail.com" Date: Monday, December 7, 2020 at 1:47 PM To: "Lalwani, Jayesh" , "user@spark.apache.org" Subject: RE: [EXTERNAL] Caching

Re: Caching

2020-12-07 Thread Lalwani, Jayesh
Since DF2 is dependent on DF1, and DF3 is dependent on both DF1 and DF2, without caching, Spark will read the CSV twice: Once to load it for DF1, and once to load it for DF2. When you add a cache on DF1 or DF2, it reads from CSV only once. You might want to look at doing a windowed query on
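
A sketch of the caching suggestion (file path and transformations are illustrative):

    import org.apache.spark.sql.functions.col

    val df1 = spark.read.option("header", "true").csv("/path/to/file.csv").cache()
    val df2 = df1.filter(col("amount") > 0)      // derived from df1
    val df3 = df1.join(df2, Seq("id"))           // both reuse the cached df1, so the CSV is read once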

Re: Refreshing Data in Spark Memory (DataFrames)

2020-11-13 Thread Lalwani, Jayesh
will be incurring IO overhead on every microbatch. From: Arti Pande Date: Friday, November 13, 2020 at 2:19 PM To: "Lalwani, Jayesh" Cc: "user@spark.apache.org" Subject: RE: [EXTERNAL] Refreshing Data in Spark Memory (DataFrames)

Re: Refreshing Data in Spark Memory (DataFrames)

2020-11-13 Thread Lalwani, Jayesh
Is this a streaming application or a batch application? Normally, for batch applications, you want to keep data consistent. If you have a portfolio of mortgages that you are computing payments for and the interest rate changes while you are computing payments, you don’t want to compute half

Re: Spark Dataset withColumn issue

2020-11-12 Thread Lalwani, Jayesh
Note that Spark never guarantees ordering of columns. There’s nothing in Spark documentation that says that the columns will be ordered a certain way. The proposed solution relies on an implementation detail that might change in future version of Spark. Ideally, you shouldn’t rely on Dataframe

Re: [Structured Streaming] Join stream of readings with collection of previous related readings

2020-11-09 Thread Lalwani, Jayesh
Append mode will wait till the watermark expires https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes This is because append mode doesn't output rows until a row is finalized. Theoretically, data for any row can appear anytime as long as it's in the

Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-20 Thread Lalwani, Jayesh
Are you getting any output? Streaming jobs typically run forever, and keep processing data as it comes in the input. If a streaming job is working well, it will typically generate output at a certain cadence From: KhajaAsmath Mohammed Date: Tuesday, October 20, 2020 at 1:23 PM To: "user

Re: Count distinct and driver memory

2020-10-19 Thread Lalwani, Jayesh
affect later use, such as collect. The resulting GC can be explained by both caching and collect. Lalwani, Jayesh writes: > I have a Dataframe with around 6 billion rows, and about 20 columns. First of all, I want to write this dataframe out to parquet. Then, out of the 20 columns, I have 3 columns

Count distinct and driver memory

2020-10-18 Thread Lalwani, Jayesh
I have a Dataframe with around 6 billion rows, and about 20 columns. First of all, I want to write this dataframe out to parquet. Then, out of the 20 columns, I have 3 columns of interest, and I want to find how many distinct values of the columns are there in the file. I don’t need the actual
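
One common way to keep the driver light for this kind of question (an illustrative alternative, not necessarily what was used here) is an approximate distinct count:

    import org.apache.spark.sql.functions.approx_count_distinct

    df.agg(
      approx_count_distinct("col_a").alias("distinct_a"),   // column names are illustrative
      approx_count_distinct("col_b").alias("distinct_b"),
      approx_count_distinct("col_c").alias("distinct_c")
    ).show()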

Re: How to Scale Streaming Application to Multiple Workers

2020-10-16 Thread Lalwani, Jayesh
ons would be highly appreciated.. ND On 10/16/20 2:49 PM, Lalwani, Jayesh wrote: With a file based source, Spark is going to take maximum use of memory before it tries to scaling to more nodes. Parallelization adds overhead. This overhead is negligible if your data is several gigs or above. If y

Re: How to Scale Streaming Application to Multiple Workers

2020-10-16 Thread Lalwani, Jayesh
e input. What I am interested is how to have multiple workers to read and process the small files in parallel, and certainly one file per worker at a time. Partitioning data frame doesn't make sense since the data frame is small already. On 10/15/20 9:14 AM, Lalwani, Jayesh wrote: > Parallelism

Re: How to Scale Streaming Application to Multiple Workers

2020-10-15 Thread Lalwani, Jayesh
Parallelism of streaming depends on the input source. If you are getting one small file per microbatch, then Spark will read it in one worker. You can always repartition your data frame after reading it to increase the parallelism. On 10/14/20, 11:26 PM, "Artemis User" wrote:

Re: Multiple applications being spawned

2020-10-13 Thread Lalwani, Jayesh
Where are you running your Spark cluster? Can you post the command line that you are using to run your application? Spark is designed to process a lot of data by distributing work to a cluster of machines. When you submit a job, it starts executor processes on the cluster. So, what you are

Re: Distribute entire columns to executors

2020-09-24 Thread Lalwani, Jayesh
You could convert columns to rows. Something like this val cols = [“A”, “B”, “C”] df.flatMap( row => { cols.map(c => (row.getAsTimeStamp(“timestamp”), row.getAsInt(c), c) ) }).toDF(“timestamp”, “value”, “colName”) If you are using dataframes, all of your columns are of the same type. If
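
A cleaned-up sketch of that snippet (column names are illustrative, and the value columns are assumed to be Int):

    import java.sql.Timestamp
    import spark.implicits._   // assumes a SparkSession named spark is in scope

    val cols = Seq("A", "B", "C")
    val melted = df.flatMap { row =>
      val ts = row.getAs[Timestamp]("timestamp")
      cols.map(c => (ts, row.getAs[Int](c), c))
    }.toDF("timestamp", "value", "colName")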

Running Spark on Kubernetes behind a HTTP proxy

2018-07-12 Thread Lalwani, Jayesh
We are trying to run a Spark job on Kubernetes cluster. The Spark job needs to talk to some services external to the Kubernetes cluster through a proxy server. We are setting the proxy by setting the extraJavaOptions like this --conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost

Re: Increase no of tasks

2018-06-26 Thread Lalwani, Jayesh
You can use the repartition method of Dataframe to change the number of partitions https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.Dataset@repartition(numPartitions:Int):org.apache.spark.sql.Dataset[T] On 6/22/18, 3:04 PM, "pratik4891" wrote: It's default , I
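
A minimal sketch (the partition count and column are illustrative):

    import org.apache.spark.sql.functions.col

    val moreTasks = df.repartition(200)                      // more partitions => more tasks
    val byKey     = df.repartition(200, col("customer_id"))  // or repartition by a key column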

Can we get the partition Index in an UDF

2018-06-25 Thread Lalwani, Jayesh
We are trying to add a column to a Dataframe with some random data, and we want to be able to control the seed, so multiple runs of the same transformation generate the same output. We also want to generate different random numbers for each partition This is easy to do
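
One way to get a controllable, per-partition seed (a sketch going through the RDD API, since the Dataframe API has no partition-index hook; names and the base seed are illustrative):

    import scala.util.Random
    import spark.implicits._   // assumes a SparkSession named spark is in scope

    val seeded = df.rdd.mapPartitionsWithIndex { (partitionIdx, rows) =>
      val rng = new Random(42L + partitionIdx)   // same seed per partition on every run
      rows.map(row => (row.getAs[String]("id"), rng.nextDouble()))
    }.toDF("id", "random_value")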

Re: Spark 2.3.0 and Custom Sink

2018-06-21 Thread Lalwani, Jayesh
Actually, you can do partition level ingest using ForEachWriter. You just have to add each row to a list in the write method, and write to the data store in the close method. I know it’s awkward. I don’t know why Spark doesn’t provide a ForEachPartitionWriter From: Yogesh Mahajan Date:
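
A sketch of that buffering pattern (saveBatch is a hypothetical bulk-write helper, not a Spark API):

    import org.apache.spark.sql.{ForeachWriter, Row}
    import scala.collection.mutable.ArrayBuffer

    class BufferingWriter extends ForeachWriter[Row] {
      private val buffer = ArrayBuffer[Row]()

      override def open(partitionId: Long, epochId: Long): Boolean = { buffer.clear(); true }
      override def process(row: Row): Unit = buffer += row
      override def close(errorOrNull: Throwable): Unit = {
        if (errorOrNull == null) saveBatch(buffer)   // one write per partition, done in close
      }

      private def saveBatch(rows: Seq[Row]): Unit = { /* bulk insert into the target store */ }
    }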

Re: Does Spark Structured Streaming have a JDBC sink or Do I need to use ForEachWriter?

2018-06-21 Thread Lalwani, Jayesh
Open source Spark Structured Streaming doesn’t have a JDBC sink. You can implement your own ForEachWriter, or you can use my sink from here https://github.com/GaalDornick/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/JdbcSink.scala

Re: spark partitionBy with partitioned column in json output

2018-06-04 Thread Lalwani, Jayesh
Purna, This behavior is by design. If you provide partitionBy, Spark removes the columns from the data From: purna pradeep Date: Monday, June 4, 2018 at 8:00 PM To: "user@spark.apache.org" Subject: spark partitionBy with partitioned column in json output im reading below json in spark
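
A common workaround if the value still needs to appear in the output files (an illustrative sketch, not from the thread) is to write a copy of the column and partition by the copy:

    import org.apache.spark.sql.functions.col

    df.withColumn("date_partition", col("date"))   // "date" is an illustrative column
      .write
      .partitionBy("date_partition")
      .json("/output/path")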

Re: Spark structured streaming generate output path runtime

2018-06-01 Thread Lalwani, Jayesh
This will not work the way you have implemented it. The code that you have here will be called only once before the streaming query is started. Once the streaming query starts, this code is not called. What I would do is 1. Implement a udf that calculates flourtimestamp 2. Add a column in
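
A sketch of those two steps (the UDF body, column names, and paths are illustrative assumptions):

    import org.apache.spark.sql.functions.{col, udf}

    // 1. a UDF that computes the per-row path fragment
    val flourTimestamp = udf((epochMillis: Long) =>
      new java.text.SimpleDateFormat("yyyy/MM/dd/HH").format(new java.util.Date(epochMillis)))

    // 2. add it as a column and let partitionBy build the directories at write time
    streamingDf
      .withColumn("flour_ts", flourTimestamp(col("eventTimeMillis")))
      .writeStream
      .format("parquet")
      .partitionBy("flour_ts")
      .option("path", "/base/output/path")
      .option("checkpointLocation", "/base/checkpoints")
      .start()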

Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Lalwani, Jayesh
Few things 1. Append mode is going to output data that falls out of the watermark 2. Structured streaming isn’t time based. It reacts only when it sees input data. If no data appears in the input it will not move the aggregation window 3. Clock time is irrelevant to structured

Re: Data is not getting written in sorted format on target oracle table through SPARK

2018-05-30 Thread Lalwani, Jayesh
No. There is no way to control the order except for the option that you have already tried (repartition =1). When you are inserting in parallel from multiple nodes, then the order of inserts cannot be guaranteed. That is because of the very nature of doing things in parallel. The only way order

Re: Structured Streaming, Reading and Updating a variable

2018-05-15 Thread Lalwani, Jayesh
Do you have a code sample, and detailed error message/exception to show? From: Martin Engen Date: Tuesday, May 15, 2018 at 9:24 AM To: "user@spark.apache.org" Subject: Structured Streaming, Reading and Updating a variable Hello, I'm working

Re: Spark 2.3.0 --files vs. addFile()

2018-05-10 Thread Lalwani, Jayesh
This is a long-standing bug in Spark. --jars and --files don't work in Standalone mode https://issues.apache.org/jira/browse/SPARK-4160 From: Marius Date: Wednesday, May 9, 2018 at 3:51 AM To: "user@spark.apache.org" Subject: Spark 2.3.0 --files vs.

Re: smarter way to "forget" DataFrame definition and stick to its values

2018-05-02 Thread Lalwani, Jayesh
There is a trade off involved here. If you have a Spark application with a complicated logical graph, you can either cache data at certain points in the DAG, or you don’t cache data. The side effect of caching data is higher memory usage. The side effect of not caching data is higher CPU usage

Re: Dataframe vs dataset

2018-05-01 Thread Lalwani, Jayesh
Neither. All women are humans. Not all humans are women. You wouldn’t say that a woman is a subset of a human. All DataFrames are Datasets. Not all Datasets are DataFrames. The “subset” relationship doesn’t apply here. A DataFrame is a specialized type of Dataset From: Michael Artz
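
In the Scala API this relationship is literally a type alias (paraphrased from the org.apache.spark.sql package object):

    type DataFrame = Dataset[Row]   // a DataFrame is a Dataset whose element type is Row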

Re: Filter one dataset based on values from another

2018-05-01 Thread Lalwani, Jayesh
What columns do you want to filter myDataSet on? What are the corresponding columns in paramsDataSet? You can easily do what you want using an inner join. For example, if tempview and paramsview both have a column, say employeeID. You can do this with the SQL sparkSession.sql("Select * from
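
A sketch of the join-based filter (view and column names follow the example in the message):

    val filtered = spark.sql("""
      SELECT t.*
      FROM tempview t
      INNER JOIN paramsview p
        ON t.employeeID = p.employeeID
    """)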

Re: [Spark 2.x Core] .collect() size limit

2018-04-30 Thread Lalwani, Jayesh
Although there is such a thing as virtualization of memory done at the OS layer, the JVM imposes its own limit that is controlled by the spark.executor.memory and spark.driver.memory configurations. The amount of memory allocated by the JVM will be controlled by those parameters. General guidelines

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Lalwani, Jayesh
You could have a really large window. From: Aakash Basu <aakash.spark@gmail.com> Date: Monday, April 16, 2018 at 10:56 AM To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com> Cc: spark receiver <spark.recei...@gmail.com>, Panagiotis Garefalakis <

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Lalwani, Jayesh
ark@gmail.com> Date: Monday, April 16, 2018 at 4:52 AM To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com> Cc: spark receiver <spark.recei...@gmail.com>, Panagiotis Garefalakis <panga...@gmail.com>, user <user@spark.apache.org> Subject: Re: [Structured St

Re: Spark parse fixed length file [Java]

2018-04-15 Thread Lalwani, Jayesh
Is your input data partitioned? How much memory have you assigned to your executor? Have you looked at how much time is being spent in GC in the executor? Is Spark spilling the data into disk? It is likely that the partition is too big. Spark tries to read the whole partition into the memory

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-15 Thread Lalwani, Jayesh
Note that what you are trying to do here is join a streaming data frame with an aggregated streaming data frame. As per the documentation, joining an aggregated streaming data frame with another streaming data frame is not supported From: spark receiver Date:

Re: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when compared to Drill or Presto

2018-03-29 Thread Lalwani, Jayesh
will need to make a call whether you want to take the upfront cost of a shuffle, or you want to live with a large number of tasks From: Tin Vu <tvu...@ucr.edu> Date: Thursday, March 29, 2018 at 10:47 AM To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com> Cc: "u

Re: Why doesn't spark use broadcast join?

2018-03-29 Thread Lalwani, Jayesh
Try putting a Broadcast hint like shown here https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-hint-framework.html#sql-hints From: Vitaliy Pisarev Date: Thursday, March 29, 2018 at 8:42 AM To: "user@spark.apache.org" Subject:
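
The two usual forms, sketched (table and column names are illustrative):

    import org.apache.spark.sql.functions.broadcast

    // DataFrame API: wrap the small side in broadcast()
    val joined = largeDf.join(broadcast(smallDf), Seq("join_key"))

    // SQL hint form
    spark.sql("SELECT /*+ BROADCAST(s) */ * FROM large l JOIN small s ON l.join_key = s.join_key")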

Re: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when compared to Drill or Presto

2018-03-29 Thread Lalwani, Jayesh
Without knowing too many details, I can only guess. It could be that Spark is creating a lot of tasks even though there are less records. Creation and distribution of tasks has a noticeable overhead on smaller datasets. You might want to look at the driver logs, or the Spark Application Detail

[Structured Streaming] Kafka Sink in Spark 2.3

2018-03-29 Thread Lalwani, Jayesh
Hi I have a custom streaming sink that internally uses org.apache.spark.sql.kafka010.KafkaSink. This was working in 2.2. When I upgraded to 2.3, I get this exception. Does spark-sql-kafka-0-10 work on Spark 2.3? 84705281f4b]] DEBUG com.capitalone.sdp.spark.source.SdpSink - Writing batch to

Incompatibility in LZ4 dependencies

2018-03-08 Thread Lalwani, Jayesh
There is an incompatibility in LZ4 dependencies being imported in spark 2.3.0 org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 imports org.apache.kafka:kafka-clients:0.11.0.0, which imports net.jpountz.lz4:lz4:1.3.0 OTOH, org.apache.spark:spark-core_2.11:2.3.0 imports org.lz4:lz4-java:1.4.0

Question on Spark-kubernetes integration

2018-03-02 Thread Lalwani, Jayesh
Does the Resource scheduler support dynamic resource allocation? Are there any plans to add in the future?

Unclosed NingWSClient holds up a Spark application

2018-01-02 Thread Lalwani, Jayesh
I noticed some weird behavior with NingWSClient 2.4.3 when used with Spark. Try this 1. Spin up spark-shell with play-ws 2.4.3 in the driver class path 2. Run this code val myConfig = new AsyncHttpClientConfigBean() myConfig.setAcceptAnyCertificate(true) myConfig.setFollowRedirect(true) val

Re: Converting binary files

2018-01-02 Thread Lalwani, Jayesh
You can repartition your dataframe into 1 partition and all the data will land in one partition. However, doing this is perilous because you will end up with all your data on one node, and if you have too much data you will run out of memory. In fact, anytime you are thinking about putting
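
A minimal sketch of the single-partition write (path and format are illustrative; coalesce(1) is a cheaper alternative to repartition(1) when only the write needs one partition):

    df.repartition(1)
      .write
      .mode("overwrite")
      .parquet("/output/single-partition")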

Re: Subqueries

2017-12-30 Thread Lalwani, Jayesh
: Nicholas Hakobian <nicholas.hakob...@rallyhealth.com> Date: Friday, December 29, 2017 at 8:10 PM To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com> Cc: "user@spark.apache.org" <user@spark.apache.org> Subject: Re: Subqueries This sounds like a perfect example

Re: [Structured Streaming] Reuse computation result

2017-12-29 Thread Lalwani, Jayesh
There is no way to solve this within Spark. One option is to break up your application into multiple applications. The first application can filter and write the filtered results into a Kafka queue. The second application can read from the queue and sum. The third application can read from the queue and

Subqueries

2017-12-29 Thread Lalwani, Jayesh
I have a table, and I want to find the latest records in the table. The table has a column called instnc_id that is incremented everyday. So, I want to find the records that have the max instnc_id. I am trying to do this using subqueries, but it gives me an error. For example, when I try this
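
One way to express "latest records" without a correlated subquery (an illustrative rewrite, not necessarily what the thread settled on; the table name is assumed):

    val latest = spark.sql("""
      SELECT t.*
      FROM my_table t
      JOIN (SELECT MAX(instnc_id) AS max_id FROM my_table) m
        ON t.instnc_id = m.max_id
    """)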

Union of streaming dataframes

2017-11-17 Thread Lalwani, Jayesh
Is union of 2 Structured streaming dataframes from different sources supported in 2.2? We have done a union of 2 streaming dataframes that are from the same source. I wanted to know if multiple streams are supported or going to be supported in the future

Checksum error during checkpointing

2017-10-12 Thread Lalwani, Jayesh
We have a Structured Streaming application running for more than 40 hours. We are storing checkpoints in EFS. The application is failing with a checksum error on the checkpoint (stack trace below) Is this because the checkpoint is corrupt? Is there a way to fix this? Is this a bug in Spark?

mapPartitioningWithIndex in Dataframe

2017-08-03 Thread Lalwani, Jayesh
Are there any plans to add mapPartitionsWithIndex to the Dataframe API? Or is there any way to implement my own mapPartitionsWithIndex for a Dataframe? I am implementing something which is logically similar to the randomSplit function. In 2.1, randomSplit internally does

Supporting columns with heterogenous data

2017-07-21 Thread Lalwani, Jayesh
What is a good way to support non-homogeneous input data in structured streaming? Let me explain the use case that we are trying to solve. We are reading data from 3 topics in Kafka. All the topics have data in Avro format, with each of them having their own schema. Now, all the 3 Avro schemas

Re: Union of 2 streaming data frames

2017-07-10 Thread Lalwani, Jayesh
Michael, I see that 2.2 RC6 has passed a vote on Friday. Does this mean 2.2 is going to be out soon? Do you have some sort of ETA? From: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com> Date: Friday, July 7, 2017 at 5:46 PM To: Michael Armbrust <mich...@databr

Re: Union of 2 streaming data frames

2017-07-07 Thread Lalwani, Jayesh
Great! Even, val dfAllEvents = sparkSession.table("oldEvents").union(sparkSession.table("newEvents")) doesn’t work. Will this be addressed in 2.2? From: Michael Armbrust <mich...@databricks.com> Date: Friday, July 7, 2017 at 5:42 PM To: "Lalwani, Jayesh"

Re: Union of 2 streaming data frames

2017-07-07 Thread Lalwani, Jayesh
rust <mich...@databricks.com> Date: Friday, July 7, 2017 at 2:30 PM To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com> Cc: "user@spark.apache.org" <user@spark.apache.org> Subject: Re: Union of 2 streaming data frames df.union(df2) should be supported when both

Union of 2 streaming data frames

2017-07-07 Thread Lalwani, Jayesh
In structured streaming, is there a way to union 2 streaming data frames? Are there any plans to support union of 2 streaming dataframes soon? I can understand the inherent complexity in joining 2 streaming data frames. But union is just concatenating 2 microbatches, isn't it? The problem that

Re: Refreshing a persisted RDD

2017-05-03 Thread Lalwani, Jayesh
operations instead of using Spark SQL to join the dataframes? From: Tathagata Das <tathagata.das1...@gmail.com> Date: Wednesday, May 3, 2017 at 6:32 PM To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com> Cc: user <user@spark.apache.org> Subject: Re: Refreshing a persisted RD