S3 committer for dynamic partitioning

2024-03-05 Thread Nikhil Goyal
Hi folks, We have been following this doc for writing data from a Spark job to S3. However, it fails when writing to dynamic partitions. Any suggestions on what config should be used to avoid the cost of renaming in S3?
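
One set of settings commonly used to avoid the rename-based commit on S3 is the S3A "partitioned" staging committer. A minimal sketch, assuming the spark-hadoop-cloud module is on the classpath (note: the S3A committers do not implement Spark's dynamic partition overwrite mode, so the partitioned committer with a conflict mode is the usual substitute):

    import org.apache.spark.sql.SparkSession

    // Sketch only: property names follow the S3A committer docs; adjust for your Spark/Hadoop build.
    val spark = SparkSession.builder()
      .appName("s3a-committer-example")
      // Route Spark's commit protocol to the cloud committers (needs spark-hadoop-cloud).
      .config("spark.sql.sources.commitProtocolClass",
              "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
      .config("spark.sql.parquet.output.committer.class",
              "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
      // The "partitioned" staging committer replaces/appends only the partitions a job touches.
      .config("spark.hadoop.fs.s3a.committer.name", "partitioned")
      .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
      .getOrCreate()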

Re: Architecture of Spark Connect

2023-12-14 Thread Nikhil Goyal
If multiple applications are running, would we need multiple Spark Connect servers? If so, is the user responsible for creating these servers, or are they created on the fly when the user requests a new Spark session? On Thu, Dec 14, 2023 at 10:28 AM Nikhil Goyal wrote: > Hi folks, >

Architecture of Spark Connect

2023-12-14 Thread Nikhil Goyal
Hi folks, I am trying to understand one question. Does Spark Connect create a new driver in the backend for every user, or is there a fixed number of drivers running to which requests are sent? Thanks Nikhil
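
For reference, a minimal sketch of how a client attaches to a running Spark Connect server (host and port are placeholders); the server is one long-running Spark application whose driver serves many client sessions:

    // Assumes the spark-connect-client-jvm artifact is on the classpath and a server was
    // started with ./sbin/start-connect-server.sh (default port 15002).
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .remote("sc://connect-server.example.com:15002") // hypothetical endpoint
      .getOrCreate()

    spark.range(10).count() // executed by the Connect server's driver, not a local driver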

Shuffle data on pods which get decommissioned

2023-06-20 Thread Nikhil Goyal
Hi folks, When running Spark on K8s, what would happen to shuffle data if an executor is terminated or lost? Since there is no shuffle service, does all the work done by that executor get recomputed? Thanks Nikhil
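
Without an external shuffle service, shuffle files held by a lost executor are gone and the stages that produced them are recomputed on demand. Spark 3.1+ can also migrate blocks off a pod before it goes away; a hedged sketch of the relevant settings (the fallback path is a placeholder):

    import org.apache.spark.SparkConf

    // Sketch: graceful decommissioning copies blocks to surviving executors (or to fallback
    // storage) before the pod is removed, so less work has to be recomputed.
    val conf = new SparkConf()
      .set("spark.decommission.enabled", "true")
      .set("spark.storage.decommission.enabled", "true")
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true") // migrate shuffle files
      .set("spark.storage.decommission.rddBlocks.enabled", "true")     // migrate cached blocks
      .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/") // optional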

Viewing UI for spark jobs running on K8s

2023-05-31 Thread Nikhil Goyal
Hi folks, Is there an equivalent of the YARN RM page for Spark on Kubernetes? We can port-forward the UI from the driver pod for each job, but this process is tedious given we have multiple jobs running. Is there a clever solution to exposing all driver UIs in a centralized place? Thanks Nikhil
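
One common workaround is to enable event logging in every job and point a single Spark History Server at the shared log directory, which then becomes the centralized place to browse all the jobs. A sketch of the job-side settings (the bucket path is a placeholder):

    import org.apache.spark.sql.SparkSession

    // Sketch: each driver writes its event log to shared storage; one history server started
    // with spark.history.fs.logDirectory pointed at the same path serves every job's UI.
    val spark = SparkSession.builder()
      .config("spark.eventLog.enabled", "true")
      .config("spark.eventLog.dir", "s3a://my-bucket/spark-events/") // placeholder path
      .config("spark.eventLog.rolling.enabled", "true")              // optional, for long-running jobs
      .getOrCreate()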

Re: Partition by on dataframe causing a Sort

2023-04-20 Thread Nikhil Goyal
Is it possible to use MultipleOutputs and define a custom OutputFormat, then use `saveAsHadoopFile` to achieve this? On Thu, Apr 20, 2023 at 1:29 PM Nikhil Goyal wrote: > Hi folks, > > We are writing a dataframe and doing a partitionBy() on it. > df.write.part

Partition by on dataframe causing a Sort

2023-04-20 Thread Nikhil Goyal
Hi folks, We are writing a dataframe and doing a partitionBy() on it. df.write.partitionBy('col').parquet('output') The job is running very slowly because internally, per partition, it does a sort before starting to output to the final location. This sort isn't useful in any way since # of files
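
One workaround that is often suggested (a sketch using the column name from the question, with df standing for the dataframe being written): cluster the rows by the partition column first, so each write task holds whole output partitions and the sort Spark inserts before writing has almost nothing left to do.

    import org.apache.spark.sql.functions.col

    // df: the dataframe from the thread
    df.repartition(col("col"))
      .sortWithinPartitions("col")  // satisfies the writer's required ordering up front
      .write
      .partitionBy("col")
      .parquet("output")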

Understanding executor memory behavior

2023-03-16 Thread Nikhil Goyal
Hi folks, I am trying to understand what the difference would be between running 8G 1-core executors vs 40G 5-core executors. I see that on YARN it can cause bin-packing issues, but other than that are there any pros and cons to using either? Thanks Nikhil

Increasing Spark history resources

2022-12-08 Thread Nikhil Goyal
Hi folks, We are experiencing slowness in the Spark history server, so we are trying to find which config properties we can tune to fix the issue. I found that SPARK_DAEMON_MEMORY is used to control memory; similarly, is there a config property to increase the number of threads? Thanks Nikhil

Driver takes a long time to finish once job ends

2022-11-22 Thread Nikhil Goyal
Hi folks, We are running a job on our on-prem K8s cluster but writing the output to S3. We noticed that all the executors finish in < 1h but the driver takes another 5h to finish. Logs: 22/11/22 02:08:29 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 10.42.145.11:39001 in memory (size:
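
If the time is going into the output commit on S3 (a common cause: the v1 FileOutputCommitter renames every file serially on the driver at job commit), one mitigation is the v2 commit algorithm; a hedged sketch:

    import org.apache.spark.sql.SparkSession

    // Sketch: v2 moves task output into the destination at task commit, so the driver no longer
    // performs one long serial rename pass. Note v2 is not atomic; the S3A committers discussed
    // in the thread above are the more robust fix.
    val spark = SparkSession.builder()
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .getOrCreate()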

Dynamic allocation on K8s

2022-10-25 Thread Nikhil Goyal
Hi folks, When running Spark on Kubernetes, is it possible to use dynamic allocation? Some blog posts mentioned that dynamic allocation is available; however, I am not sure how it works. Spark official docs
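
A hedged sketch of the settings usually quoted for dynamic allocation on K8s (shuffle tracking stands in for the external shuffle service that K8s lacks, so executors holding live shuffle data are simply not released):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
      .config("spark.dynamicAllocation.minExecutors", "1")  // illustrative bounds
      .config("spark.dynamicAllocation.maxExecutors", "50")
      .getOrCreate()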

partitionBy creating a lot of small files

2022-06-04 Thread Nikhil Goyal
Hi all, Is there a way to use dataframe.partitionBy("col") and control the number of output files without doing a full repartition? The thing is some partitions have more data while some have less. Doing a .repartition is a costly operation. We want to control the size of the output files. Is it
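
A sketch of one commonly used approach (column name and path are placeholders, df is the dataframe being written): shuffle by the partition column itself, so each output partition is written by a single task instead of a sliver per task, and cap file sizes with maxRecordsPerFile rather than repartitioning to a fixed count.

    import org.apache.spark.sql.functions.col

    df.repartition(col("col"))                 // one task per partition value (watch for skew)
      .write
      .option("maxRecordsPerFile", 1000000L)   // rough file-size knob; splits oversized partitions
      .partitionBy("col")
      .parquet("output")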

Re: PartitionBy and SortWithinPartitions

2022-06-03 Thread Nikhil Goyal
> Enrico > > On 03.06.22 at 16:13, Nikhil Goyal wrote: > > Hi folks, > > We are trying to do > ` > df.coalesce(1000).sortWithinPartitions("col1").write.mode('overwrite').partitionBy("col2").parquet(...) > ` > > I do see that coales

PartitionBy and SortWithinPartitions

2022-06-03 Thread Nikhil Goyal
Hi folks, We are trying to do ` df.coalesce(1000).sortWithinPartitions("col1").write.mode('overwrite').partitionBy("col2").parquet(...) ` I do see that coalesce(1000) is applied for every sub partition. But I wanted to know whether sortWithinPartitions("col1") is applied before or after partitionBy?
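
For reference, the transformation chain runs before the write, and the writer then additionally wants rows within each task clustered by the partitionBy column. A commonly used arrangement (sketch, using the names from the snippet) is to lead the sort with the partition column so that requirement is already met and the per-col1 ordering survives:

    import org.apache.spark.sql.functions.col

    // df: the dataframe from the thread
    df.repartition(1000, col("col2"))
      .sortWithinPartitions("col2", "col1")  // col2 first: grouped by output partition, then ordered by col1
      .write
      .mode("overwrite")
      .partitionBy("col2")
      .parquet("output")                     // placeholder path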

Serialization error when using scala kernel with Jupyter

2020-02-21 Thread Nikhil Goyal
Hi all, I am trying to use the almond Scala kernel to run a Spark session on Jupyter. I am using Scala version 2.12.8. I am creating the Spark session with master set to YARN. This is the code: val rdd = spark.sparkContext.parallelize(Seq(1, 2, 4)) rdd.map(x => x + 1).collect() Exception:

Understanding deploy mode config

2019-10-02 Thread Nikhil Goyal
Hi all, In a PySpark application, is the Python process the driver, or will Spark start a new driver process? If it is the same as the driver, then how does specifying "spark.submit.deployMode" as "cluster" in the Spark conf come into play? conf = SparkConf() .setMaster("yarn")

Need help with SparkSQL Query

2018-12-17 Thread Nikhil Goyal
Hi guys, I have a dataframe of type Record (id: Long, timestamp: Long, isValid: Boolean, other metrics). The schema looks like this: root |-- id: long (nullable = true) |-- timestamp: long (nullable = true) |-- isValid: boolean (nullable = true) ... I need to find the earliest valid record
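
Assuming the goal is the earliest valid record per id, a window-function sketch:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // df: the dataframe described above. Keep only valid rows, then take the first row per id
    // by ascending timestamp.
    val w = Window.partitionBy("id").orderBy(col("timestamp").asc)

    val earliestValid = df
      .filter(col("isValid"))
      .withColumn("rn", row_number().over(w))
      .filter(col("rn") === 1)
      .drop("rn")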

Re: Writing to vertica from spark

2018-10-22 Thread Nikhil Goyal
Fixed this by setting fileformat -> "parquet" On Mon, Oct 22, 2018 at 11:48 AM Nikhil Goyal wrote: > Hi guys, > > My code is failing with this error > > java.lang.Exception: S2V: FATAL ERROR for job S2V_job9197956021769393773. > Job status information is
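
A sketch of what that fix can look like in the write call. Only fileformat -> "parquet" comes from this thread; the format class and the remaining option names are assumptions about the legacy Vertica S2V connector and should be checked against its documentation:

    // df: the dataframe being written
    df.write
      .format("com.vertica.spark.datasource.DefaultSource") // assumed legacy connector class
      .option("host", "vertica.example.com")                // hypothetical connection options
      .option("db", "mydb")
      .option("user", "ngoyal")
      .option("password", "***")
      .option("table", "my_table")
      .option("hdfs_url", "hdfs://namenode:8020/tmp/vertica-staging") // intermediate staging dir
      .option("fileformat", "parquet")                      // the fix: stage parquet instead of orc
      .mode("append")
      .save()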

Writing to vertica from spark

2018-10-22 Thread Nikhil Goyal
Hi guys, My code is failing with this error java.lang.Exception: S2V: FATAL ERROR for job S2V_job9197956021769393773. Job status information is available in the Vertica table test.S2V_JOB_STATUS_USER_NGOYAL. Unable to save intermediate orc files to HDFS

Re: [External Sender] Writing dataframe to vertica

2018-10-22 Thread Nikhil Goyal
t 16, 2018 at 7:24 PM Nikhil Goyal wrote: > >> Hi guys, >> >> I am trying to write dataframe to vertica using spark. It seems like >> spark is creating a temp table under public schema. I don't have access to >> public schema hence the job is failing. Is there a

Writing dataframe to vertica

2018-10-16 Thread Nikhil Goyal
Hi guys, I am trying to write dataframe to vertica using spark. It seems like spark is creating a temp table under public schema. I don't have access to public schema hence the job is failing. Is there a way to specify another schema? Error ERROR s2v.S2VUtils: createJobStatusTable: FAILED to

Driver OOM when using writing parquet

2018-08-06 Thread Nikhil Goyal
Hi guys, I have a simple job which reads LZO thrift files and writes them in Parquet format. The driver is going out of memory. The Parquet writer does keep some metadata in memory, but that should cause the executor, not the driver, to go out of memory. No computation is being done on the driver. Any idea what could be

Zstd codec for writing dataframes

2018-06-18 Thread Nikhil Goyal
Hi guys, I was wondering if there is a way to compress output files using zstd. It seems zstd compression can be used for shuffle data; is there a way to use it for output data as well? Thanks Nikhil
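
A hedged sketch of the knobs involved (whether zstd is accepted for output depends on the Spark, Parquet, and Hadoop versions in use; shuffle-side zstd support arrived earlier than the output codecs):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.parquet.compression.codec", "zstd") // session-wide default for parquet output
      .getOrCreate()

    // Or per write:
    val df = spark.range(100).toDF("id")
    df.write.option("compression", "zstd").parquet("/tmp/zstd-output") // placeholder path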

Re: Class cast exception while using Data Frames

2018-03-27 Thread Nikhil Goyal
.0) (keyTuple, sum / count.toDouble) }.toMap }) instDF.withColumn("customMap", avgMapValueUDF(col("metricMap"), lit(1))).show On Mon, Mar 26, 2018 at 11:51 PM, Shmuel Blitz <shmuel.bl...@similarweb.com> wrote: > Hi Nikhil,

Re: Class cast exception while using Data Frames

2018-03-26 Thread Nikhil Goyal
uth...@dataroots.io> wrote: > Can you give the output of “printSchema” ? > > > On 26 Mar 2018, at 22:39, Nikhil Goyal <nownik...@gmail.com> wrote: > > Hi guys, > > I have a Map[(String, String), Double] as one of my columns. Using > > input.getAs[Map[(String, String),

Class cast exception while using Data Frames

2018-03-26 Thread Nikhil Goyal
Hi guys, I have a Map[(String, String), Double] as one of my columns. Using input.getAs[Map[(String, String), Double]](0) throws an exception: Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2 Even the schema
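
The 2018-03-27 follow-up above works around this with a UDF. A hedged sketch of that approach: Catalyst stores tuple keys as structs, so the UDF accepts the keys as Rows and rebuilds the tuples itself (the column names and the lit(1) divisor come from the follow-up snippet):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions._

    // instDF: the input dataframe from the thread, with a metricMap column of
    // type Map[(String, String), Double].
    val avgMapValueUDF = udf { (metricMap: Map[Row, Double], count: Int) =>
      metricMap.map { case (key, value) =>
        (key.getString(0), key.getString(1)) -> value / count.toDouble
      }
    }

    val result = instDF.withColumn("customMap", avgMapValueUDF(col("metricMap"), lit(1)))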

Using Thrift with Dataframe

2018-02-28 Thread Nikhil Goyal
Hi guys, I have an RDD of thrift structs. I want to convert it into a dataframe. Can someone suggest how I can do this? Thanks Nikhil
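
A hedged sketch of the usual approach when there is no encoder for the generated class: copy each thrift struct into a case class (or a Row plus an explicit schema) and convert that (field names are placeholders):

    // spark: the active SparkSession; thriftRdd: the RDD of generated thrift structs.
    case class MyRecord(id: Long, name: String) // hypothetical subset of the thrift fields

    import spark.implicits._

    val df = thriftRdd
      .map(t => MyRecord(t.getId, t.getName)) // copy the fields you need off the thrift object
      .toDF()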

Re: Job never finishing

2018-02-21 Thread Nikhil Goyal
ify-and-re-schedule-slow-running-tasks/ > > Sent from my iPhone > > On Feb 20, 2018, at 5:52 PM, Nikhil Goyal <nownik...@gmail.com> wrote: > > Hi guys, > > I have a job which gets stuck if a couple of tasks get killed due to OOM > exception. Spark doesn't kill the job an

Job never finishing

2018-02-20 Thread Nikhil Goyal
Hi guys, I have a job which gets stuck if a couple of tasks get killed due to OOM exceptions. Spark doesn't kill the job and it keeps on running for hours. Ideally I would expect Spark to kill the job or restart the killed executors, but nothing seems to be happening. Anybody got an idea about this?
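
The reply above points at re-scheduling slow tasks, i.e. speculative execution. A sketch of the relevant settings (values are illustrative):

    import org.apache.spark.SparkConf

    // Sketch: speculation re-launches abnormally slow tasks on other executors so a few stuck
    // tasks don't keep the whole job open; maxFailures bounds how long Spark retries a task.
    val conf = new SparkConf()
      .set("spark.speculation", "true")
      .set("spark.speculation.multiplier", "3")  // a task 3x slower than the median is a candidate
      .set("spark.speculation.quantile", "0.9")  // start checking once 90% of tasks have finished
      .set("spark.task.maxFailures", "4")        // abort the job after 4 failed attempts of a task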

GC issues with spark job

2018-02-18 Thread Nikhil Goyal
Hi, I have a job which is spending approx 30% of its time in GC. When I looked at the logs it seems like GC is triggered before the spill happens. I wanted to know if there is a config setting which I can use to force Spark to spill early, maybe when memory is 60-70% full. Thanks Nikhil
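
There is no documented "spill at N% full" switch that maps directly to this, but one lever commonly suggested is shrinking the unified memory region so execution spills sooner and more of the heap is left for other allocations. A sketch (values are illustrative):

    import org.apache.spark.SparkConf

    // spark.memory.fraction caps how much of the heap execution/storage may use before
    // spilling (default 0.6); storageFraction is the slice of that region protected for caching.
    val conf = new SparkConf()
      .set("spark.memory.fraction", "0.5")
      .set("spark.memory.storageFraction", "0.3")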

Question about DStreamCheckpointData

2017-01-25 Thread Nikhil Goyal
Hi, I am using DStreamCheckpointData and it seems that Spark checkpoints data even if the RDD processing fails. It seems to checkpoint the moment it creates the RDD rather than waiting for its completion. Does anybody know how to make it wait till completion? Thanks Nikhil

Streaming and Batch code sharing

2016-06-25 Thread Nikhil Goyal
Hi, Does anyone have a good example where realtime and batch are able to share the same code? (Other than this one https://github.com/databricks/reference-apps/blob/master/logs_analyzer/chapter1/reuse.md ) Thanks Nikhil
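
A minimal sketch of the pattern used in that reference app: keep the logic as a plain RDD => RDD function and call it from both paths (names are placeholders):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream

    // The shared transformation knows nothing about batch vs. streaming.
    def countByWord(lines: RDD[String]): RDD[(String, Long)] =
      lines.flatMap(_.split("\\s+")).map(word => (word, 1L)).reduceByKey(_ + _)

    // Batch path: countByWord(sc.textFile("hdfs:///logs/2016-06-25")) -- placeholder path.
    // Streaming path: apply the same function to every micro-batch via transform.
    def wireStreaming(stream: DStream[String]): DStream[(String, Long)] =
      stream.transform(countByWord _)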

Re: Protobuf class not found exception

2016-05-31 Thread Nikhil Goyal
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-find-proto-buffer-class-error-with-RDD-lt-protobuf-gt-td14529.html But has this been solved? On Tue, May 31, 2016 at 3:26 PM, Nikhil Goyal <nownik...@gmail.com> wrote: > I am getting this error when I am trying to c

Protobuf class not found exception

2016-05-31 Thread Nikhil Goyal
I am getting this error when I am trying to create an RDD of (protokey, value). When I change this to (*protokey.toString*, value) it works fine. *This is the stack trace:* java.lang.RuntimeException: Unable to find proto buffer class at
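
The thread linked in the reply above points at Java serialization of the generated protobuf classes. A commonly suggested workaround (a sketch: chill-protobuf is an extra dependency, and MyProtoKey stands in for the generated class) is to register a protobuf-aware Kryo serializer so the messages travel in their own wire format:

    import com.esotericsoftware.kryo.Kryo
    import com.twitter.chill.protobuf.ProtobufSerializer
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    class ProtoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[MyProtoKey], new ProtobufSerializer()) // MyProtoKey: your generated proto class
      }
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[ProtoRegistrator].getName)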

Re: Timed aggregation in Spark

2016-05-23 Thread Nikhil Goyal
in state. On Mon, May 23, 2016 at 1:33 PM, Ofir Kerker <ofir.ker...@gmail.com> wrote: > Yes, check out mapWithState: > > https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html > > _____ > Fr

Timed aggregation in Spark

2016-05-23 Thread Nikhil Goyal
Hi all, I want to aggregate my data for 5-10 min and then flush the aggregated data to some database like Vertica. updateStateByKey is not exactly helpful in this scenario as I can't flush all the records at once, nor can I clear the state. I wanted to know if anyone else has faced a similar
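
The reply above suggests mapWithState; a hedged sketch of the flush-on-timeout pattern (key/value types and the 10-minute window are assumptions):

    import org.apache.spark.streaming.{Minutes, State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream

    // Keep a running sum per key; when a key times out (no updates for ~10 min) emit the final
    // value so a downstream foreachRDD can flush it to Vertica, and the state is dropped.
    def trackState(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
      if (state.isTimingOut()) {
        Some((key, state.get()))                // final value for this key
      } else {
        state.update(state.getOption.getOrElse(0L) + value.getOrElse(0L))
        None                                    // nothing to emit yet
      }
    }

    def aggregate(events: DStream[(String, Long)]): DStream[(String, Long)] =
      events.mapWithState(StateSpec.function(trackState _).timeout(Minutes(10)))
        .flatMap(_.toList)                      // keep only the flushed records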