Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Raghavendra Ganesh
Hi,
What is the purpose for which you want to use repartition()? Is it to reduce
the number of files in Delta?
Also note that there is an alternative option of using coalesce() instead
of repartition().
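
For reference, a minimal sketch of the difference between the two (Scala
syntax for brevity; the PySpark calls have the same names, and the paths,
numbers, and batch read are only illustrative, not your streaming pipeline):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("coalesce-vs-repartition").getOrCreate()
val df = spark.read.parquet("/data/events")   // illustrative batch source

// coalesce(N) only merges existing partitions (no shuffle), so it is cheaper,
// but it cannot increase the partition count or fix skew.
df.coalesce(15).write.parquet("/data/out-coalesced")

// repartition(N) does a full shuffle: more expensive, but it evens out skew
// and gives up to N output files per write.
df.repartition(15).write.parquet("/data/out-repartitioned")
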
--
Raghavendra


On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
> ).alias("msg")
> )
> .withColumn("uuid", col("msg.uuid"))
> # etc other columns...
>
> # generated column for daily partitioning in Delta Lake
> .withColumn(CREATED_DATE,
> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
> .drop("msg")
> )
>
> query = (
> table
> .repartition(10).writeStream
> .queryName(APP_NAME)
> .outputMode("append")
> .format("delta")
> .partitionBy(CREATED_DATE)
> .option("checkpointLocation", os.environ["CHECKPOINT"])
> .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Incremental Value dependents on another column of Data frame Spark

2023-05-23 Thread Raghavendra Ganesh
Given that you are already stating the above can be imagined as a partition, I
can think of a mapPartitions iterator.

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.IntegerType

  val inputSchema = inputDf.schema
  val outputRdd = inputDf.rdd.mapPartitions(rows => new SomeClass(rows))
  val outputDf = sparkSession.createDataFrame(outputRdd,
    inputSchema.add("counter", IntegerType))

class SomeClass(rows: Iterator[Row]) extends Iterator[Row] {
  var counter: Int = 0
  override def hasNext: Boolean = rows.hasNext

  override def next(): Row = {
val row = rows.next()
val rowType:String = row.getAs[String]("Type")
if(rowType == "M01")
  counter = 0
else
  counter += 1
Row.fromSeq(row.toSeq ++ Seq(counter))
  }
}

--
Raghavendra


On Tue, May 23, 2023 at 11:44 PM Nipuna Shantha 
wrote:

> Hi all,
>
> This is the sample set of data that I used for this task
>
> [image: image.png]
>
> My expected output is as below
>
> [image: image.png]
>
> My scenario is: if Type is M01 the count should be 0, and if Type is M02 it
> should be incremented from 1 or 0 until the sequence of M02 rows is finished.
> Imagine this as a partition, so the row order cannot be jumbled. Can you
> suggest a method for this scenario? Also, for your information, this dataset
> is really large; it has around 1 records and I am using Spark with Scala.
>
> Thank You,
> Best Regards
>
>
>


Re: Spark Aggregator with ARRAY input and ARRAY output

2023-04-23 Thread Raghavendra Ganesh
For simple array types, setting the encoder to ExpressionEncoder() should work.
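
A minimal Scala sketch of that (untested; the aggregator in this thread is
Java, where Encoders.kryo() would be another option for a java.util.List
buffer):

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// Sketch only: a Seq[Long] buffer/output with encoders derived by ExpressionEncoder.
abstract class ElementWiseAggSketch extends Aggregator[Seq[Boolean], Seq[Long], Seq[Long]] {
  override def bufferEncoder: Encoder[Seq[Long]] = ExpressionEncoder[Seq[Long]]()
  override def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder[Seq[Long]]()
}
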
--
Raghavendra


On Sun, Apr 23, 2023 at 9:20 PM Thomas Wang  wrote:

> Hi Spark Community,
>
> I'm trying to implement a custom Spark Aggregator (a subclass of
> org.apache.spark.sql.expressions.Aggregator). Correct me if I'm wrong,
> but I'm assuming I will be able to use it as an aggregation function like
> SUM.
>
> What I'm trying to do is that I have a column of ARRAY<BOOLEAN> and I
> would like to GROUP BY another column and perform an element-wise SUM if the
> boolean flag is set to True. The result of such aggregation should return
> ARRAY<LONG>.
>
> Here is my implementation so far:
>
> package mypackage.udf;
>
> import org.apache.spark.sql.Encoder;
> import org.apache.spark.sql.expressions.Aggregator;
>
> import java.util.ArrayList;
> import java.util.List;
>
> public class ElementWiseAgg extends Aggregator<List<Boolean>, List<Long>,
> List<Long>> {
>
> @Override
> public List<Long> zero() {
> return new ArrayList<>();
> }
>
> @Override
> public List<Long> reduce(List<Long> b, List<Boolean> a) {
> if (a == null) return b;
> int diff = a.size() - b.size();
> for (int i = 0; i < diff; i++) {
> b.add(0L);
> }
> for (int i = 0; i < a.size(); i++) {
> if (a.get(i)) b.set(i, b.get(i) + 1);
> }
> return b;
> }
>
> @Override
> public List<Long> merge(List<Long> b1, List<Long> b2) {
> List<Long> longer;
> List<Long> shorter;
> if (b1.size() > b2.size()) {
> longer = b1;
> shorter = b2;
> } else {
> longer = b2;
> shorter = b1;
> }
> for (int i = 0; i < shorter.size(); i++) {
> longer.set(i, longer.get(i) + shorter.get(i));
> }
> return longer;
> }
>
> @Override
> public List<Long> finish(List<Long> reduction) {
> return reduction;
> }
>
> @Override
> public Encoder<List<Long>> bufferEncoder() {
> return null;
> }
>
> @Override
> public Encoder<List<Long>> outputEncoder() {
> return null;
> }
> }
>
> The part I'm not quite sure about is how to override bufferEncoder and
> outputEncoder. The default Encoders class does not provide an encoder for
> List.
>
> Can someone point me in the right direction? Thanks!
>
>
> Thomas
>
>
>


Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Raghavendra Ganesh
You can group by country and use the mapPartitions method, in which you
iterate over all rows keeping two variables for maxPopulationSoFar and the
corresponding city. Then return the city with the max population.
As others have suggested, it may also be possible to use bucketing; it would
give a more friendly SQL-ish way of doing it, but not the best performance,
as it needs to order/sort.
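
A minimal sketch of the first suggestion (keep the running max per group),
written with Dataset groupByKey/reduceGroups; Scala for brevity, and the City
class, column names, and path are illustrative:

import org.apache.spark.sql.SparkSession

case class City(name: String, country: String, population: Long)

val spark = SparkSession.builder().appName("largest-city-per-country").getOrCreate()
import spark.implicits._

val cities = spark.read.parquet("/data/cities").as[City]   // illustrative source

// For each country, keep only the row with the maximum population.
val largestPerCountry = cities
  .groupByKey(_.country)
  .reduceGroups((a, b) => if (a.population >= b.population) a else b)
  .map(_._2)   // drop the grouping key, keep the winning row

largestPerCountry.show()
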
--
Raghavendra


On Mon, Dec 19, 2022 at 8:57 PM Oliver Ruebenacker <
oliv...@broadinstitute.org> wrote:

>
>  Hello,
>
>   How can I retain from each group only the row for which one value is the
> maximum of the group? For example, imagine a DataFrame containing all major
> cities in the world, with three columns: (1) City name (2) Country (3)
> population. How would I get a DataFrame that only contains the largest city
> in each country? Thanks!
>
>  Best, Oliver
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: how to add a column for percent

2022-05-23 Thread Raghavendra Ganesh
withColumn takes a Column as the second argument, not a string.
If you want formatting before show(), you can use the round() function.
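
A minimal sketch of that, assuming the df2 from the quoted query below
(multiplying by 100 and rounding to 2 decimals are illustrative choices):

import org.apache.spark.sql.functions.{col, round}

val total = df2.count()   // df2 as in the quoted question

df2.groupBy("_c1").count()
  .withColumn("percent", round(col("count") / total * 100, 2))
  .show()
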
--
Raghavendra


On Mon, May 23, 2022 at 11:35 AM wilson  wrote:

> hello
>
> how to add a column for percent for the current row of counted data?
>
> scala>
> df2.groupBy("_c1").count.withColumn("percent",f"${col(count)/df2.count}%.2f").show
>
> :30: error: type mismatch;
>
>
> This doesn't work.
>
> so please help. thanks.
>
>
>
>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Raghavendra Ganesh
What is optimal depends on the context of the problem.
Is the intent here to find the best solution for top n values with a group
by?

Both solutions look sub-optimal to me. A window function would be
expensive, as it needs an order by (which a top n solution shouldn't need).
It would be best to just group by department and use an aggregate function
which stores the top n values in a heap.
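
A hedged sketch of that kind of aggregate (Scala, and functions.udaf needs
Spark 3.x; "department" and "salary" match the queries below, while TopN,
top3Salaries, and employeeDf are illustrative names):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}
import scala.collection.mutable

// Keeps the n largest values per group in a bounded min-heap, so no per-group sort is needed.
class TopN(n: Int) extends Aggregator[Double, mutable.PriorityQueue[Double], Seq[Double]] {
  override def zero: mutable.PriorityQueue[Double] =
    mutable.PriorityQueue.empty(Ordering[Double].reverse)   // reversed ordering makes it a min-heap

  override def reduce(heap: mutable.PriorityQueue[Double], value: Double): mutable.PriorityQueue[Double] = {
    heap.enqueue(value)
    if (heap.size > n) heap.dequeue()   // evict the smallest kept value
    heap
  }

  override def merge(h1: mutable.PriorityQueue[Double], h2: mutable.PriorityQueue[Double]): mutable.PriorityQueue[Double] = {
    h2.foreach(v => reduce(h1, v))
    h1
  }

  override def finish(heap: mutable.PriorityQueue[Double]): Seq[Double] =
    heap.toSeq.sorted(Ordering[Double].reverse)   // largest first

  override def bufferEncoder: Encoder[mutable.PriorityQueue[Double]] =
    Encoders.kryo[mutable.PriorityQueue[Double]]

  override def outputEncoder: Encoder[Seq[Double]] = ExpressionEncoder[Seq[Double]]()
}

// Usage: top 3 salaries per department, no ordering of the whole group required.
val top3Salaries = udaf(new TopN(3))
employeeDf.groupBy("department").agg(top3Salaries(col("salary").cast("double")).as("top_3_salaries"))
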
--
Raghavendra


On Mon, Feb 28, 2022 at 12:01 AM Sid  wrote:

> My bad.
>
> Aggregation Query:
>
> # Write your MySQL query statement below
>
>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
> ORDER by E.DepartmentId, E.Salary DESC
>
> Time Taken: 1212 ms
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time Taken: 790 ms
>
> Thanks,
> Sid
>
>
> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>
>> Those two queries are identical?
>>
>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I am aware that if windowing functions are used, then at first it loads
>>> the entire dataset into one window,scans and then performs the other
>>> mentioned operations for that particular window which could be slower when
>>> dealing with trillions / billions of records.
>>>
>>> I did a POC where I used an example to find the max 3 highest salary for
>>> an employee per department. So, I wrote a below queries and compared the
>>> time for it:
>>>
>>> Windowing Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 790 ms
>>>
>>> Aggregation Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 1212 ms
>>>
>>> But as per my understanding, the aggregation should have run faster. So,
>>> my whole point is if the dataset is huge I should force some kind of map
>>> reduce jobs like we have an option called df.groupby().reduceByGroups()
>>>
>>> So I think the aggregation query is taking more time since the dataset
>>> size here is smaller and as we all know that map reduce works faster when
>>> there is a huge volume of data. Haven't tested it yet on big data but
>>> needed some expert guidance over here.
>>>
>>> Please correct me if I am wrong.
>>>
>>> TIA,
>>> Sid
>>>
>>>
>>>
>>>


Re: how to classify column

2022-02-11 Thread Raghavendra Ganesh
You could use expr() function to achieve the same.

.withColumn("newColumn",expr(s"case when score>3 then 'good' else 'bad'
end"))
--
Raghavendra


On Fri, Feb 11, 2022 at 5:59 PM frakass  wrote:

> Hello
>
> I have a column whose value (Int type as score) is from 0 to 5.
> I want to query that, when the score > 3, classified as "good". else
> classified as "bad".
> How do I implement that? A UDF like something as this?
>
> scala> implicit class Foo(i:Int) {
>   |   def classAs(f:Int=>String) = f(i)
>   | }
> class Foo
>
> scala> 4.classAs { x => if (x > 3) "good" else "bad" }
> val res13: String = good
>
> scala> 2.classAs { x => if (x > 3) "good" else "bad" }
> val res14: String = bad
>
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Merge two dataframes

2021-05-12 Thread Raghavendra Ganesh
You can add an extra id column and perform an inner join.

val df1_with_id = df1.withColumn("id", monotonically_increasing_id())

val df2_with_id = df2.withColumn("id", monotonically_increasing_id())

df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()

+---------+---------+
|amount_6m|amount_9m|
+---------+---------+
|      100|      500|
|      200|      600|
|      300|      700|
|      400|      800|
|      500|      900|
+---------+---------+


--
Raghavendra


On Wed, May 12, 2021 at 6:20 PM kushagra deep 
wrote:

> Hi All,
>
> I have two dataframes
>
> df1
>
> amount_6m
>  100
>  200
>  300
>  400
>  500
>
> And a second data df2 below
>
>  amount_9m
>   500
>   600
>   700
>   800
>   900
>
> The number of rows is same in both dataframes.
>
> Can I merge the two dataframes to achieve below df
>
> df3
>
> amount_6m | amount_9m
>       100 |       500
>       200 |       600
>       300 |       700
>       400 |       800
>       500 |       900
>
> Thanks in advance
>
> Reg,
> Kushagra Deep
>
>


Re: How to Spawn Child Thread or Sub-jobs in a Spark Session

2020-12-04 Thread Raghavendra Ganesh
There should not be any need to explicitly make the DF-2, DF-3 computation
parallel. Spark generates execution plans and it can decide what can run in
parallel (ideally you should see them running in parallel in the Spark UI).

You need to cache DF-1 if possible (either in memory or on disk), otherwise
the computation of DF-2 and DF-3 might trigger the DF-1 computation in
duplicate.
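
A minimal sketch of the caching point (the paths, the kind column, and the
filter conditions are purely illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("shared-upstream-df").getOrCreate()
import spark.implicits._

val df1 = spark.read.parquet("/data/proc1-input").persist(StorageLevel.MEMORY_AND_DISK)  // stand-in for the Proc-1 result

val df2 = df1.filter($"kind" === "a")   // stand-in for Proc-2
val df3 = df1.filter($"kind" === "b")   // stand-in for Proc-3

df2.write.parquet("/data/out-df2")      // first action: computes df1 once and populates the cache
df3.write.parquet("/data/out-df3")      // second action: reuses the cached df1 instead of recomputing it

df1.unpersist()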

--
Raghavendra


On Sat, Dec 5, 2020 at 12:31 AM Artemis User  wrote:

> We have a Spark job that produces a result data frame, say DF-1 at the
> end of the pipeline (i.e. Proc-1).  From DF-1, we need to create two or
> more dataf rames, say DF-2 and DF-3 via additional SQL or ML processes,
> i.e. Proc-2 and Proc-3.  Ideally, we would like to perform Proc-2 and
> Proc-3 in parallel, since Proc-2 and Proc-3 can be executed
> independently, with DF-1 made immutable and DF-2 and DF-3 are
> mutual-exclusive.
>
> Does Spark has some built-in APIs to support spawning sub-jobs in a
> single session?  If multi-threading is needed, what are the common best
> practices in this case?
>
> Thanks in advance for your help!
>
> -- ND
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Count distinct and driver memory

2020-10-19 Thread Raghavendra Ganesh
Spark provides multiple options for caching (including disk). Have you
tried caching to disk?
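
A minimal sketch, assuming df is the large DataFrame described in the message
below (the output path is illustrative):

import org.apache.spark.sql.functions.countDistinct
import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.DISK_ONLY)                 // cached partitions stay off the JVM heap

df.write.parquet("/data/out")                      // first action materializes the disk cache
df.select(countDistinct("a", "b", "c")).show()     // second action reads the cached data back from disk
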
--
Raghavendra


On Mon, Oct 19, 2020 at 11:41 PM Lalwani, Jayesh
 wrote:

> I was caching it because I didn't want to re-execute the DAG when I ran
> the count query. If you have a spark application with multiple actions,
> Spark reexecutes the entire DAG for each action unless there is a cache in
> between. I was trying to avoid reloading 1/2 a terabyte of data.  Also,
> cache should use up executor memory, not driver memory.
>
> As it turns out cache was the problem. I didn't expect cache to take
> Executor memory and spill over to disk. I don't know why it's taking driver
> memory. The input data has millions of partitions which results in millions
> of tasks. Perhaps the high memory usage is a side effect of caching the
> results of lots of tasks.
>
> On 10/19/20, 1:27 PM, "Nicolas Paris"  wrote:
>
> CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
> > Before I write the data frame to parquet, I do df.cache. After writing
> > the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
>
> If you write the df to parquet, why would you also cache it? Caching by
> default loads into memory. This might affect later use, such as collect.
> The resulting GC can be explained by both caching and collect.
>
>
> Lalwani, Jayesh  writes:
>
> > I have a Dataframe with around 6 billion rows, and about 20 columns.
> > First of all, I want to write this dataframe out to parquet. Then, out of
> > the 20 columns, I have 3 columns of interest, and I want to find how many
> > distinct values of those columns are there in the file. I don’t need the
> > actual distinct values. I just need the count. I know that there are around
> > 10-16 million distinct values.
> >
> > Before I write the data frame to parquet, I do df.cache. After
> > writing the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
> >
> > When I run this, I see that the memory usage on my driver steadily
> > increases until it starts getting future time outs. I guess it’s spending
> > time in GC. Does countDistinct cause this behavior? Does Spark try to get
> > all 10 million distinct values into the driver? Is countDistinct not
> > recommended for data frames with a large number of distinct values?
> >
> > What’s the solution? Should I use approx_count_distinct?
>
>
> --
> nicolas paris
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Spark events log behavior in interactive vs batch job

2020-08-01 Thread Sriram Ganesh
Hi,

I am working on writing spark events and application logs in the blob
storage. I am using a similar path for writing spark events and application
logs in blob storage.
For example: *spark.eventLog.dir =
wasb://@/logs* and *application log dir =
wasb://@/logs/app/*.

Since I'm using blob storage I need to create a root directory with a
placeholder file otherwise writing action fails. But I'm not creating a
placeholder file.

Now, in the case of a batch job, where the Spark driver runs in cluster mode,
it works, because my application logging takes care of creating the folders.
Whereas the interactive job, which runs in client mode, fails since I'm not
creating a placeholder file.

I would like to understand how Spark emits events in the case of an
interactive vs. a batch job. I feel the timing of the event emission is
causing this issue.

Can someone help to understand this better?

-- 
*Sriram G*
*Tech*


Re: Monitor executor and task memory getting used

2019-10-24 Thread Sriram Ganesh
I was wrong here.

I am using a Spark standalone cluster and I am not using YARN or Mesos. Is it
possible to track Spark execution memory?

On Mon, Oct 21, 2019 at 5:42 PM Sriram Ganesh  wrote:

> I looked into this. But I found it is possible like this
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala#L229
>
> Line no 230. This is for executors.
>
> Just wanna cross verify is that right?
>
>
>
> On Mon, 21 Oct 2019, 17:24 Alonso Isidoro Roman, 
> wrote:
>
>> Take a look in this thread
>> <https://stackoverflow.com/questions/48768188/spark-execution-memory-monitoring#_=_>
>>
>> El lun., 21 oct. 2019 a las 13:45, Sriram Ganesh ()
>> escribió:
>>
>>> Hi,
>>>
>>> I wanna monitor how much memory executor and task used for a given job.
>>> Is there any direct method available for it which can be used to track this
>>> metric?
>>>
>>> --
>>> *Sriram G*
>>> *Tech*
>>>
>>>
>>
>> --
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> <https://about.me/alonso.isidoro.roman?promo=email_sig_source=email_sig_medium=email_sig_campaign=external_links>
>>
>

-- 
*Sriram G*
*Tech*


Re: Monitor executor and task memory getting used

2019-10-21 Thread Sriram Ganesh
I looked into this. But I found it is possible like this
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala#L229

Line no 230. This is for executors.

Just wanna cross verify is that right?



On Mon, 21 Oct 2019, 17:24 Alonso Isidoro Roman,  wrote:

> Take a look in this thread
> <https://stackoverflow.com/questions/48768188/spark-execution-memory-monitoring#_=_>
>
> El lun., 21 oct. 2019 a las 13:45, Sriram Ganesh ()
> escribió:
>
>> Hi,
>>
>> I wanna monitor how much memory executor and task used for a given job.
>> Is there any direct method available for it which can be used to track this
>> metric?
>>
>> --
>> *Sriram G*
>> *Tech*
>>
>>
>
> --
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> <https://about.me/alonso.isidoro.roman?promo=email_sig_source=email_sig_medium=email_sig_campaign=external_links>
>


Monitor executor and task memory getting used

2019-10-21 Thread Sriram Ganesh
Hi,

I wanna monitor how much memory executor and task used for a given job. Is
there any direct method available for it which can be used to track this
metric?

-- 
*Sriram G*
*Tech*


Re: unsubscribe

2017-02-23 Thread Ganesh

Thank you for cat facts.

"A group of cats is called a clowder"

MEEOW


To unsubscribe please enter your credit card details followed by your pin.

CAT-FACTS



On 24/02/17 00:04, Donam Kim wrote:

catunsub

2017-02-23 20:28 GMT+11:00 Ganesh Krishnan <m...@ganeshkrishnan.com>:


Thank you for subscribing to "cat facts"

Did you know that a cat's whiskers is used to determine if it can
wiggle through a hole?


To unsubscribe reply with keyword "catunsub"

Thank you

On Feb 23, 2017 8:25 PM, "Donam Kim" <sst...@gmail.com> wrote:

unsubscribe






Re: unsubscribe

2017-02-23 Thread Ganesh Krishnan
Thank you for subscribing to "cat facts"

Did you know that a cat's whiskers is used to determine if it can wiggle
through a hole?


To unsubscribe reply with keyword "catunsub"

Thank you

On Feb 23, 2017 8:25 PM, "Donam Kim"  wrote:

> unsubscribe
>


Non-linear (curved?) regression line

2017-01-19 Thread Ganesh


Has anyone worked on non-linear/curved regression lines with Apache 
Spark? This seems to be such a trivial issue but I have given up after 
experimenting for nearly two weeks.

The plot line is as below and the raw data in the table at the end.
 I just can't get Spark ML to give decent predictions with 
LinearRegression or any family in  GeneralizedLinearRegression.


I need to predict 'sales per day' given SalesRank. As the chart shows,
it's some kind of exponential function: the lower the rank, the exponentially
higher the sales.


Things I have tried:
Polynomial by taking square of features
Changing family for GLR
Changing regression parameters
Sacrificing a goat to the Apache gods.

How do I go about solving this? Do I have to resort to neural networks?




Features   Label
1          4358
5          4283
10         4193
15         4104
20         4017
50         3532
100        2851
150        2302
200        1858
250        1499
500        989
1000       553
2000       367
3500       221
5000       139
6000       126
7500       108
9000       92
10000      83
50000      12
75000      5





VectorUDT and ml.Vector

2016-11-07 Thread Ganesh
I am trying to run an SVD on a dataframe. I have used ml TF-IDF, which
has created a dataframe.
Now, for Singular Value Decomposition, I am trying to use RowMatrix, which
takes an RDD of mllib.Vector, so I have to convert this dataframe, which
contains what I assumed was ml.Vector.


However the conversion

val convertedTermDocMatrix =
  MLUtils.convertMatrixColumnsFromML(termDocMatrix, "features")


fails with

java.lang.IllegalArgumentException: requirement failed: Column features 
must be new Matrix type to be converted to old type but got 
org.apache.spark.ml.linalg.VectorUDT



So the question is: How do I perform SVD on a DataFrame? I assume all of
the functionality of mllib has not been ported to ml.
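
The rough shape I am aiming for is something like the below (my best guess
only; note it uses the vector-column variant of MLUtils rather than the matrix
one, and k = 100 is just a placeholder):

import org.apache.spark.mllib.linalg.{Vector => OldVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.util.MLUtils

// Convert the ml.linalg vector column back to the old mllib type, then build a RowMatrix.
val converted = MLUtils.convertVectorColumnsFromML(termDocMatrix, "features")
val rows = converted.select("features").rdd.map(_.getAs[OldVector]("features"))
val svd = new RowMatrix(rows).computeSVD(100, computeU = false)   // k = 100 is a placeholder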



I tried to convert my entire project to use RDDs, but computeSVD on
RowMatrix is throwing out-of-memory errors, and anyway I would like to
stick with DataFrames.


Our text corpus is around 55 GB of text data.



Ganesh



Re: Spark Job not failing

2016-09-19 Thread sai ganesh
yes.


Regards,
Sai

On Mon, Sep 19, 2016 at 12:29 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> As I understand it, you are inserting into an RDBMS from Spark and the insert
> is failing on the RDBMS due to a duplicate primary key, but this is not
> acknowledged by Spark? Is this correct?
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 19 September 2016 at 20:19, tosaigan...@gmail.com <
> tosaigan...@gmail.com> wrote:
>
>>
>> Hi,
>>
>> I have a primary key on the SQL table. I am trying to insert a Dataframe
>> into the table using insertIntoJDBC.
>>
>> I can see failure instances in the logs, but the Spark job still completes
>> successfully. Do you know how we can handle this in code to make it fail?
>>
>>
>>
>> 16/09/19 18:52:51 INFO TaskSetManager: Starting task 0.99 in stage 82.0
>> (TID
>> 5032, 10.0.0.24, partition 0,PROCESS_LOCAL, 11300 bytes)
>> 16/09/19 18:52:52 INFO TaskSetManager: Lost task 0.99 in stage 82.0 (TID
>> 5032) on executor 10.0.0.24: java.sql.BatchUpdateException (Violation of
>> PRIMARY KEY constraint 'pk_unique'. Cannot insert duplicate key in object
>> 'table_name'. The duplicate key value is (2016-09-13 04:00, 2016-09-13
>> 04:15, 5816324).) [duplicate 99]
>> 16/09/19 18:52:52 ERROR TaskSetManager: Task 0 in stage 82.0 failed 100
>> times; aborting job
>> 16/09/19 18:52:52 INFO YarnClusterScheduler: Removed TaskSet 82.0, whose
>> tasks have all completed, from pool
>> 16/09/19 18:52:52 INFO YarnClusterScheduler: Cancelling stage 82
>> 16/09/19 18:52:52 INFO DAGScheduler: ResultStage 82 (insertIntoJDBC at
>> sparkjob.scala:143) failed in 9.440 s
>> 16/09/19 18:52:52 INFO DAGScheduler: Job 19 failed: insertIntoJDBC at
>> sparkjob.scala:143, took 9.449118 s
>> 16/09/19 18:52:52 INFO ApplicationMaster: Final app status: SUCCEEDED,
>> exitCode: 0
>> 16/09/19 18:52:52 INFO SparkContext: Invoking stop() from shutdown hook
>>
>>
>> Regards,
>> Sai
>>
>>
>>
>> -
>> Sai Ganesh
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-failing-tp27756.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>