Spark stage stuck

2020-06-02 Thread Manjunath Shetty H
Hi,

I am running multiple jobs in the same driver with FAIR scheduling enabled. Intermittently, one of the stages gets stuck and does not complete even after a long time.

Each job's flow is something like this:

  *   Create JDBC RDD to load data from SQL Server
  *   Create temporary table
  *   Query Temp table with specific set of Columns
  *   Persist the DF
  *   Write DF to HDFS in ORC format
  *   ...

As the ORC write is the first action, the job shows as stuck at writing ORC. Is there any way to debug this problem? Any pointers would be helpful.
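
For reference, a minimal sketch of that flow on the Spark 1.6 Java API (the connection details, paths, table and column names are hypothetical placeholders, and an existing `hiveContext` plus the usual Spark SQL imports are assumed):

// Load from SQL Server over JDBC (url/table are placeholders)
DataFrame source = hiveContext.read()
    .jdbc(jdbcUrl, "(SELECT * FROM some_table) src", jdbcOptions);

// Temporary table
source.registerTempTable("tmp_source");

// Query the temp table with a specific set of columns
DataFrame selected = hiveContext.sql("SELECT col_a, col_b, col_c FROM tmp_source");

// Persist the DF
selected.persist(StorageLevel.MEMORY_AND_DISK());

// Write to HDFS in ORC format -- the first action, so the whole lineage runs here
selected.write().format("orc").save("/data/output/some_table");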

Thanks
Manjunath


Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Thanks Dhaval for the suggestion, but in the case I mentioned in the previous mail data can still be missed, as the row numbers will change.


-
Manjunath

From: Dhaval Patel 
Sent: Monday, May 25, 2020 3:01 PM
To: Manjunath Shetty H 
Subject: Re: Parallelising JDBC reads in spark

If possible, set the watermark before reading the data. Read the max of the watermark column before reading the actual data and add it to the query that reads the actual data, like `watermark <= current_watermark`.

It may query the DB twice; however, it will make sure you are not missing any records.
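
A rough, untested sketch of that idea on the Spark 1.6 Java API (`last_modified_ts`, `lastProcessedWatermark` and the connection details are hypothetical placeholders):

// Step 1: read the current max watermark with a separate query
Timestamp currentWatermark = hiveContext.read()
    .jdbc(jdbcUrl, "(SELECT MAX(last_modified_ts) AS wm FROM some_table) w", jdbcOptions)
    .collect()[0].getTimestamp(0);

// Step 2: bound the actual read by that watermark, so rows committed while the
// batch is running are picked up by the next batch instead of being missed
String boundedQuery =
    "(SELECT * FROM some_table"
  + " WHERE last_modified_ts > '" + lastProcessedWatermark + "'"
  + " AND last_modified_ts <= '" + currentWatermark + "') src";

DataFrame batch = hiveContext.read().jdbc(jdbcUrl, boundedQuery, jdbcOptions);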

Regards
Dhaval

On Mon, May 25, 2020 at 3:38 AM Manjunath Shetty H <manjunathshe...@live.com> wrote:
Thanks Georg for the suggestion, but at this point changing the design is not really an option.

Any other pointers would be helpful.


Thanks
Manjunath

From: Georg Heiler <georg.kf.hei...@gmail.com>
Sent: Monday, May 25, 2020 11:52 AM
To: Manjunath Shetty H <manjunathshe...@live.com>
Cc: Mike Artz <michaelea...@gmail.com>; user <user@spark.apache.org>
Subject: Re: Parallelising JDBC reads in spark

Well you seem to have performance and consistency problems. Using a CDC tool 
fitting for your database you might be able to fix both.
However, streaming the change events of the database log might be a bit more 
complicated. Tools like https://debezium.io/ could be useful - depending on 
your source database.

Best,
Georg

On Mon, 25 May 2020 at 08:16, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi Georg,

Thanks for the response. Can you please elaborate on what you mean by change data capture?

Thanks
Manjunath

From: Georg Heiler <georg.kf.hei...@gmail.com>
Sent: Monday, May 25, 2020 11:14 AM
To: Manjunath Shetty H <manjunathshe...@live.com>
Cc: Mike Artz <michaelea...@gmail.com>; user <user@spark.apache.org>
Subject: Re: Parallelising JDBC reads in spark

Why don't you apply proper change data capture?
This will be more complex though.

On Mon, 25 May 2020 at 07:38, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi Mike,

Thanks for the response.

Even with that flag set, data can still be missed, right? As the fetch is based on the last watermark (the maximum timestamp of the rows the last batch job fetched), take a scenario like this with a table:

a :  1
b :  2
c :  3
d :  4
f  :  6
g :  7
h :  8
e :  5


  *   a, b, c, d, e get picked by the first task
  *   By the time the second task starts, e has been updated, so the row order changes
  *   As f moves up, it gets completely missed in the fetch

Thanks
Manjunath


From: Mike Artz <michaelea...@gmail.com>
Sent: Monday, May 25, 2020 10:50 AM
To: Manjunath Shetty H <manjunathshe...@live.com>
Cc: user <user@spark.apache.org>
Subject: Re: Parallelising JDBC reads in spark

Does anything different happen when you set the isolationLevel to do dirty reads, i.e. "READ_UNCOMMITTED"?

On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi,

We are writing an ETL pipeline using Spark that fetches data from SQL Server in batch mode (every 15 mins). The problem we are facing is how to parallelise a single-table read into multiple tasks without missing any data.

We have tried this:

  *   Use the `ROW_NUMBER` window function in the SQL query
  *   Then do:

DataFrame df =
    hiveContext
        .read()
        .jdbc(
            <url>,
            query,
            "row_num",
            1,
            <upperBound>,
            noOfPartitions,
            jdbcOptions);



The problem with this approach is that if our tables get updated in SQL Server while the tasks are still running, then the `ROW_NUMBER` values will change and we may miss some records.


Is there any approach to fix this issue? Any pointers would be helpful.


Note: I am on Spark 1.6


Thanks

Manjunath Shetty


Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Thanks Georg for the suggestion, but at this point changing the design is not really an option.

Any other pointers would be helpful.


Thanks
Manjunath

From: Georg Heiler 
Sent: Monday, May 25, 2020 11:52 AM
To: Manjunath Shetty H 
Cc: Mike Artz ; user 
Subject: Re: Parallelising JDBC reads in spark

Well you seem to have performance and consistency problems. Using a CDC tool 
fitting for your database you might be able to fix both.
However, streaming the change events of the database log might be a bit more 
complicated. Tools like https://debezium.io/ could be useful - depending on 
your source database.

Best,
Georg

On Mon, 25 May 2020 at 08:16, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi Georg,

Thanks for the response. Can you please elaborate on what you mean by change data capture?

Thanks
Manjunath

From: Georg Heiler <georg.kf.hei...@gmail.com>
Sent: Monday, May 25, 2020 11:14 AM
To: Manjunath Shetty H <manjunathshe...@live.com>
Cc: Mike Artz <michaelea...@gmail.com>; user <user@spark.apache.org>
Subject: Re: Parallelising JDBC reads in spark

Why don't you apply proper change data capture?
This will be more complex though.

On Mon, 25 May 2020 at 07:38, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi Mike,

Thanks for the response.

Even with that flag set, data can still be missed, right? As the fetch is based on the last watermark (the maximum timestamp of the rows the last batch job fetched), take a scenario like this with a table:

a :  1
b :  2
c :  3
d :  4
f  :  6
g :  7
h :  8
e :  5


  *   a, b, c, d, e get picked by the first task
  *   By the time the second task starts, e has been updated, so the row order changes
  *   As f moves up, it gets completely missed in the fetch

Thanks
Manjunath


From: Mike Artz <michaelea...@gmail.com>
Sent: Monday, May 25, 2020 10:50 AM
To: Manjunath Shetty H <manjunathshe...@live.com>
Cc: user <user@spark.apache.org>
Subject: Re: Parallelising JDBC reads in spark

Does anything different happen when you set the isolationLevel to do dirty reads, i.e. "READ_UNCOMMITTED"?

On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi,

We are writing an ETL pipeline using Spark that fetches data from SQL Server in batch mode (every 15 mins). The problem we are facing is how to parallelise a single-table read into multiple tasks without missing any data.

We have tried this:

  *   Use the `ROW_NUMBER` window function in the SQL query
  *   Then do:

DataFrame df =
    hiveContext
        .read()
        .jdbc(
            <url>,
            query,
            "row_num",
            1,
            <upperBound>,
            noOfPartitions,
            jdbcOptions);



The problem with this approach is that if our tables get updated in SQL Server while the tasks are still running, then the `ROW_NUMBER` values will change and we may miss some records.


Is there any approach to fix this issue? Any pointers would be helpful.


Note: I am on Spark 1.6


Thanks

Manjunath Shetty


Re: Parallelising JDBC reads in spark

2020-05-25 Thread Manjunath Shetty H
Hi Georg,

Thanks for the response. Can you please elaborate on what you mean by change data capture?

Thanks
Manjunath

From: Georg Heiler 
Sent: Monday, May 25, 2020 11:14 AM
To: Manjunath Shetty H 
Cc: Mike Artz ; user 
Subject: Re: Parallelising JDBC reads in spark

Why don't you apply proper change data capture?
This will be more complex though.

On Mon, 25 May 2020 at 07:38, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi Mike,

Thanks for the response.

Even with that flag set, data can still be missed, right? As the fetch is based on the last watermark (the maximum timestamp of the rows the last batch job fetched), take a scenario like this with a table:

a :  1
b :  2
c :  3
d :  4
f  :  6
g :  7
h :  8
e :  5


  *   a, b, c, d, e get picked by the first task
  *   By the time the second task starts, e has been updated, so the row order changes
  *   As f moves up, it gets completely missed in the fetch

Thanks
Manjunath


From: Mike Artz <michaelea...@gmail.com>
Sent: Monday, May 25, 2020 10:50 AM
To: Manjunath Shetty H <manjunathshe...@live.com>
Cc: user <user@spark.apache.org>
Subject: Re: Parallelising JDBC reads in spark

Does anything different happen when you set the isolationLevel to do dirty reads, i.e. "READ_UNCOMMITTED"?

On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi,

We are writing an ETL pipeline using Spark that fetches data from SQL Server in batch mode (every 15 mins). The problem we are facing is how to parallelise a single-table read into multiple tasks without missing any data.

We have tried this:

  *   Use the `ROW_NUMBER` window function in the SQL query
  *   Then do:

DataFrame df =
    hiveContext
        .read()
        .jdbc(
            <url>,
            query,
            "row_num",
            1,
            <upperBound>,
            noOfPartitions,
            jdbcOptions);



The problem with this approach is that if our tables get updated in SQL Server while the tasks are still running, then the `ROW_NUMBER` values will change and we may miss some records.


Is there any approach to fix this issue? Any pointers would be helpful.


Note: I am on Spark 1.6


Thanks

Manjunath Shetty


Re: Parallelising JDBC reads in spark

2020-05-24 Thread Manjunath Shetty H
Hi Mike,

Thanks for the response.

Even with that flag set, data can still be missed, right? As the fetch is based on the last watermark (the maximum timestamp of the rows the last batch job fetched), take a scenario like this with a table:

a :  1
b :  2
c :  3
d :  4
f  :  6
g :  7
h :  8
e :  5


  *   a, b, c, d, e get picked by the first task
  *   By the time the second task starts, e has been updated, so the row order changes
  *   As f moves up, it gets completely missed in the fetch

Thanks
Manjunath


From: Mike Artz 
Sent: Monday, May 25, 2020 10:50 AM
To: Manjunath Shetty H 
Cc: user 
Subject: Re: Parallelising JDBC reads in spark

Does anything different happen when you set the isolationLevel to do dirty reads, i.e. "READ_UNCOMMITTED"?

On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi,

We are writing an ETL pipeline using Spark that fetches data from SQL Server in batch mode (every 15 mins). The problem we are facing is how to parallelise a single-table read into multiple tasks without missing any data.

We have tried this:

  *   Use the `ROW_NUMBER` window function in the SQL query
  *   Then do:

DataFrame df =
    hiveContext
        .read()
        .jdbc(
            <url>,
            query,
            "row_num",
            1,
            <upperBound>,
            noOfPartitions,
            jdbcOptions);



The problem with this approach is that if our tables get updated in SQL Server while the tasks are still running, then the `ROW_NUMBER` values will change and we may miss some records.


Is there any approach to fix this issue? Any pointers would be helpful.


Note: I am on Spark 1.6


Thanks

Manjunath Shetty


Parallelising JDBC reads in spark

2020-05-24 Thread Manjunath Shetty H
Hi,

We are writing an ETL pipeline using Spark that fetches data from SQL Server in batch mode (every 15 mins). The problem we are facing is how to parallelise a single-table read into multiple tasks without missing any data.

We have tried this:

  *   Use the `ROW_NUMBER` window function in the SQL query
  *   Then do:

DataFrame df =
    hiveContext
        .read()
        .jdbc(
            <url>,
            query,
            "row_num",
            1,
            <upperBound>,
            noOfPartitions,
            jdbcOptions);
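
For context, a hypothetical example of the `query` argument used above (table and column names are placeholders, not taken from the thread):

// Derived table exposing a ROW_NUMBER() column that Spark can partition on
String query =
    "(SELECT t.*, ROW_NUMBER() OVER (ORDER BY t.id) AS row_num"
  + " FROM dbo.some_table t) AS src";

// The upper bound would typically come from a separate SELECT COUNT(*) run just
// before the read -- which is exactly where the inconsistency described below
// creeps in if the table changes in between.
long upperBound = rowCount;

DataFrame df = hiveContext.read().jdbc(
    jdbcUrl, query, "row_num", 1, upperBound, noOfPartitions, jdbcOptions);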



The problem with this approach is that if our tables get updated in SQL Server while the tasks are still running, then the `ROW_NUMBER` values will change and we may miss some records.


Is there any approach to fix this issue? Any pointers would be helpful.


Note: I am on Spark 1.6


Thanks

Manjunath Shetty


How to change Dataframe schema

2020-05-16 Thread Manjunath Shetty H
Hi,

I have a dataframe with some columns and data that is fetched over JDBC. As I have to keep the schema consistent in the ORC files, I have to apply a different schema to that dataframe. The column names will be the same, but the data or the schema may contain some extra columns.

Is there any way I can apply the schema on top of the existing dataframe? In most cases applying the schema may just mean reordering the columns.

I have tried this:

DataFrame dfNew = hc.createDataFrame(df.rdd(), (StructType) DataType.fromJson(schema));

But this maps the columns based on index, so it will fail if the columns need reordering.
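
One possible sketch (not from the thread, assuming the usual java.util and org.apache.spark.sql imports) is to select the columns by name in the target schema's order, so the mapping does not depend on column position; `df` and `schema` are as above, and the cast is only needed if the JDBC types differ from the ORC types:

StructType target = (StructType) DataType.fromJson(schema);

List<Column> cols = new ArrayList<Column>();
for (StructField field : target.fields()) {
    // select by name, cast to the target type, keep the target column name
    cols.add(df.col(field.name()).cast(field.dataType()).as(field.name()));
}

DataFrame dfNew = df.select(cols.toArray(new Column[cols.size()]));

Columns that exist only in the target schema would still have to be added explicitly (for example as null literals) before the select.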

Any pointers will be helpful.

Thanks and Regards
Manjunath Shetty


Spark ORC store written timestamp as column

2020-04-15 Thread Manjunath Shetty H
Hi All,

Is there any way to store the exact write timestamp in the ORC file through Spark?
The use case is something like the `current_timestamp()` function in SQL. Generating the timestamp in the program will not be equal to the actual write time of the ORC/HDFS file.

Any suggestions will be helpful.


Thanks
Manjunath


Spark 1.6 and ORC bucketed queries

2020-04-01 Thread Manjunath Shetty H
Hi,

Is it possible to do ORC bucketed queries in Spark 1.6?

The folder structure is like this:
<table path>/<partition>/
 bucket1.orc
 bucket2.orc
 bucket3.orc

And the Spark SQL query will be like `select * from <table> where partition = partition1 and bucket = bucket1`; this query should only read the `bucket1.orc` file.

Is this possible with Spark 1.6? If so, please let me know how to achieve it.


Thanks
Manjunath Shetty


Spark SQL join ORC and non ORC tables in hive

2020-03-24 Thread Manjunath Shetty H
Hi,

I am on Spark 1.6. I am getting an error when I try to run a Hive query in Spark that involves joining ORC and non-ORC tables in Hive.

Find the error below; any help would be appreciated.

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange hashpartitioning(CENTRAL_ITEM_ID#150,200), None
+- ConvertToUnsafe
   +- HiveTableScan 
[CENTRAL_ITEM_ID#150,PHM_ITEM_COST_AMT#154,LAST_CHANGE_TS#155,HISTORICAL_LAST_CHANGE_TS#160],
 MetastoreRelation rxdwh, phm_item_cost, None

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Sort.doExecute(Sort.scala:64)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Window.doExecute(Window.scala:246)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Filter.doExecute(basicOperators.scala:70)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at 
org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:85)
at 
org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:82)
at 
org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
at 
org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:82)
at 
org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:82)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: serious problem
at 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
at 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at 

Re: Saving Spark run stats and run watermark

2020-03-18 Thread Manjunath Shetty H
Thanks for the suggestion, Netanel.

Sorry for the lack of detail; I am specifically looking for something inside the Hadoop ecosystem.


-
Manjunath

From: Netanel Malka 
Sent: Wednesday, March 18, 2020 5:26 PM
To: Manjunath Shetty H 
Subject: Re: Saving Spark run stats and run watermark

You can try to use an RDBMS like PostgreSQL or MySQL.
I would use a regular table.
Spark has a built-in integration for that:
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
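
As a rough illustration of that suggestion (Spark 1.6 Java API; the table name, connection URL and the batchId/startTs/endTs/watermarkTs variables are hypothetical placeholders):

Properties props = new Properties();
props.setProperty("user", "...");
props.setProperty("password", "...");

// One row of run stats for the current batch
DataFrame runStats = hiveContext.createDataFrame(
    Collections.singletonList(RowFactory.create(batchId, startTs, endTs, watermarkTs)),
    DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("batch_id", DataTypes.StringType, false),
        DataTypes.createStructField("start_ts", DataTypes.TimestampType, false),
        DataTypes.createStructField("end_ts", DataTypes.TimestampType, false),
        DataTypes.createStructField("watermark_ts", DataTypes.TimestampType, false)
    }));

// Append to a small metadata table in the RDBMS instead of Hive
runStats.write().mode(SaveMode.Append)
    .jdbc("jdbc:postgresql://dbhost:5432/etl_meta", "run_stats", props);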


On Wed, Mar 18, 2020, 13:03 Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi All,

I want to save each Spark batch run's stats (start, end, ID, etc.) and the watermark (the last processed timestamp from the external data source).

We have tried Hive JDBC, but it is very slow due to the MR jobs it triggers. We can't save to normal Hive tables, as that will create lots of small files in HDFS.

Please suggest the recommended way to do this. Any pointers would be helpful.

Thanks and regards
Manjunath


Saving Spark run stats and run watermark

2020-03-18 Thread Manjunath Shetty H
Hi All,

I want to save each Spark batch run's stats (start, end, ID, etc.) and the watermark (the last processed timestamp from the external data source).

We have tried Hive JDBC, but it is very slow due to the MR jobs it triggers. We can't save to normal Hive tables, as that will create lots of small files in HDFS.

Please suggest the recommended way to do this. Any pointers would be helpful.

Thanks and regards
Manjunath


Re: Optimising multiple hive table join and query in spark

2020-03-16 Thread Manjunath Shetty H
Thanks Georg,

The batch import job frequency is different from the read job: the import job runs every 15 minutes to 1 hour, and the read/transform job runs once a day.

In this case I think writing with sortWithinPartitions doesn't make any difference, as the combined data stored in HDFS will not be sorted at the end of the day.

Does partitioning/sorting while reading help? I tried this out, but it still results in a shuffle during the join of multiple tables and generates a very complex DAG.

-
Manjunath

From: Georg Heiler 
Sent: Monday, March 16, 2020 12:06 PM
To: Manjunath Shetty H 
Subject: Re: Optimising multiple hive table join and query in spark

Hi,

If you only have 1.6, forget bucketing (https://databricks.com/session/bucketing-in-spark-sql-2-3); that only works well with Hive from 2.3 onwards.
The other thing, in your (daily?) batch job:

val myData = spark.read.<format>(/path/to/file).transform(<transformations>)

Now when writing the data:
myData.write.repartition(xxx)
where xxx resembles the number of files you want to have for each period (day?). When writing ORC / Parquet, make sure to have files of HDFS block size or more, i.e. usually 128MB up to a maximum of 1G.
myData.write.repartition(xxx).sortWithinPartitions(join_col, other_join_col)

applies a secondary sort to get ORC indices.

If the cardinality of the join_cols is high enough:
myData.write.repartition(xxx, col(join_col), col(other_join_col)).sortWithinPartitions(join_col, other_join_col)

Best,
Georg

On Mon, 16 Mar 2020 at 04:27, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi Georg,

Thanks for the suggestion. Can you please explain a bit more about what you meant exactly?

Btw, I am on Spark 1.6.


-
Manjunath

From: Georg Heiler <georg.kf.hei...@gmail.com>
Sent: Monday, March 16, 2020 12:35 AM
To: Manjunath Shetty H <manjunathshe...@live.com>
Subject: Re: Optimising multiple hive table join and query in spark

To speed things up:
- sortWithinPartitions (i.e. for each day) & potentially repartition
- pre-shuffle the data with bucketing

On Sun, 15 Mar 2020 at 17:07, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Only partitioned; the join keys are not sorted because those files are written incrementally by batch jobs.

From: Georg Heiler <georg.kf.hei...@gmail.com>
Sent: Sunday, March 15, 2020 8:30:53 PM
To: Manjunath Shetty H <manjunathshe...@live.com>
Cc: ayan guha <guha.a...@gmail.com>; Magnus Nilsson <ma...@kth.se>; user <user@spark.apache.org>
Subject: Re: Optimising multiple hive table join and query in spark

Did you only partition, or did you also bucket by the join column? Are ORC indices active, i.e. are the JOIN keys sorted when writing the files?

Best,
Georg

On Sun, 15 Mar 2020 at 15:52, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Mostly the concern is the reshuffle. Even though all the DFs are partitioned by the same column, Spark still reshuffles during the join, and that is the bottleneck in our POC implementation as of now.

Is there any way to tell Spark to keep all partitions with the same partition key in the same place, so that it won't shuffle again during the join?


-
Manjunath

From: ayan guha <guha.a...@gmail.com>
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson <ma...@kth.se>
Cc: user <user@spark.apache.org>
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where the most time is spent during the query. One possibility is that it just takes ramp-up time for executors to become available; if that's the case, then a dedicated YARN queue may help, or using the Spark Thrift Server may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson <ma...@kth.se> wrote:
It's been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. Never got around to trying it out though. I really would like a native way to tell Catalyst not to reshuffle just because you use more columns in the join condition.
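
An untested sketch of that trick on the Spark 1.6 Java API (column names and the two DataFrames are hypothetical; assumes `import static org.apache.spark.sql.functions.callUDF`):

// Custom comparer for the extra column, registered as a UDF
sqlContext.udf().register("same_extra", new UDF2<String, String, Boolean>() {
    @Override
    public Boolean call(String left, String right) {
        return left != null && left.equals(right);
    }
}, DataTypes.BooleanType);

// Plain equality only on the partitioned column; the rest goes through the UDF,
// so Catalyst only sees "country" as the equi-join key
DataFrame joined = df1.join(
    df2,
    df1.col("country").equalTo(df2.col("country"))
        .and(callUDF("same_extra", df1.col("extra_col"), df2.col("extra_col"))));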

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi All,

We have 10 tables in a data warehouse (HDFS/Hive) written using the ORC format. We are serving a use case on top of that by joining 4-5 tables using Hive as of now. But it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (MMdd) as integer

Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Manjunath Shetty H
Only partitioned; the join keys are not sorted because those files are written incrementally by batch jobs.

From: Georg Heiler 
Sent: Sunday, March 15, 2020 8:30:53 PM
To: Manjunath Shetty H 
Cc: ayan guha ; Magnus Nilsson ; user 

Subject: Re: Optimising multiple hive table join and query in spark

Did you only partition, or did you also bucket by the join column? Are ORC indices active, i.e. are the JOIN keys sorted when writing the files?

Best,
Georg

On Sun, 15 Mar 2020 at 15:52, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Mostly the concern is the reshuffle. Even though all the DFs are partitioned by the same column, Spark still reshuffles during the join, and that is the bottleneck in our POC implementation as of now.

Is there any way to tell Spark to keep all partitions with the same partition key in the same place, so that it won't shuffle again during the join?


-
Manjunath

From: ayan guha <guha.a...@gmail.com>
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson <ma...@kth.se>
Cc: user <user@spark.apache.org>
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where the most time is spent during the query. One possibility is that it just takes ramp-up time for executors to become available; if that's the case, then a dedicated YARN queue may help, or using the Spark Thrift Server may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson <ma...@kth.se> wrote:
It's been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. Never got around to trying it out though. I really would like a native way to tell Catalyst not to reshuffle just because you use more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi All,

We have 10 tables in a data warehouse (HDFS/Hive) written using the ORC format. We are serving a use case on top of that by joining 4-5 tables using Hive as of now. But it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (MMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with 
other tables.

The approach we have thought of as of now:

  *   Create a dataframe for each table and partition them all by the same column (let's say Country as the partition column)
  *   Register all tables as temporary tables
  *   Run the SQL query with the joins

But the problem we are seeing with this approach is that even though we already partitioned by Country, it still does hash partitioning + shuffle during the join. All the table joins contain the `Country` column along with some extra columns based on the table.

Is there any way to avoid these shuffles and improve performance?


Thanks and regards
Manjunath


--
Best Regards,
Ayan Guha


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Manjunath Shetty H
Mostly the concern is the reshuffle. Even though all the DFs are partitioned by the same column, Spark still reshuffles during the join, and that is the bottleneck in our POC implementation as of now.

Is there any way to tell Spark to keep all partitions with the same partition key in the same place, so that it won't shuffle again during the join?


-
Manjunath

From: ayan guha 
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson 
Cc: user 
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where the most time is spent during the query. One possibility is that it just takes ramp-up time for executors to become available; if that's the case, then a dedicated YARN queue may help, or using the Spark Thrift Server may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson <ma...@kth.se> wrote:
It's been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. Never got around to trying it out though. I really would like a native way to tell Catalyst not to reshuffle just because you use more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi All,

We have 10 tables in a data warehouse (HDFS/Hive) written using the ORC format. We are serving a use case on top of that by joining 4-5 tables using Hive as of now. But it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (MMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with 
other tables.

The approach we have thought of as of now:

  *   Create a dataframe for each table and partition them all by the same column (let's say Country as the partition column)
  *   Register all tables as temporary tables
  *   Run the SQL query with the joins

But the problem we are seeing with this approach is that even though we already partitioned by Country, it still does hash partitioning + shuffle during the join. All the table joins contain the `Country` column along with some extra columns based on the table.

Is there any way to avoid these shuffles and improve performance?


Thanks and regards
Manjunath


--
Best Regards,
Ayan Guha


Optimising multiple hive table join and query in spark

2020-03-14 Thread Manjunath Shetty H
Hi All,

We have 10 tables in a data warehouse (HDFS/Hive) written using the ORC format. We are serving a use case on top of that by joining 4-5 tables using Hive as of now. But it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (MMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with 
other tables.

The approach we have thought of as of now:

  *   Create a dataframe for each table and partition them all by the same column (let's say Country as the partition column)
  *   Register all tables as temporary tables
  *   Run the SQL query with the joins

But the problem we are seeing with this approach is that even though we already partitioned by Country, it still does hash partitioning + shuffle during the join. All the table joins contain the `Country` column along with some extra columns based on the table.
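
For illustration only, a rough sketch of this approach on the Spark 1.6 Java API (table names, columns and paths are hypothetical; assumes `import static org.apache.spark.sql.functions.col`):

// Repartition each table's DataFrame by the common join column
DataFrame orders    = hiveContext.read().orc("/warehouse/orders").repartition(col("Country"));
DataFrame customers = hiveContext.read().orc("/warehouse/customers").repartition(col("Country"));

// Register temporary tables and run the SQL join
orders.registerTempTable("orders");
customers.registerTempTable("customers");

DataFrame result = hiveContext.sql(
    "SELECT o.Country, o.order_id, c.customer_name "
  + "FROM orders o JOIN customers c "
  + "  ON o.Country = c.Country AND o.customer_id = c.customer_id "
  + "WHERE o.order_date >= 20200308");

// Catalyst may still insert a hash-partitioning exchange here because the join
// condition also uses customer_id -- which is exactly the shuffle asked about below.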

Is there any way to avoid these shuffles and improve performance?


Thanks and regards
Manjunath


Re: Way to get the file name of the output when doing ORC write from dataframe

2020-03-04 Thread Manjunath Shetty H
Or is there any way to provide a unique file name to the ORC write function itself?

Any suggestions will be helpful.

Regards
Manjunath Shetty

From: Manjunath Shetty H 
Sent: Wednesday, March 4, 2020 2:28 PM
To: user 
Subject: Way to get the file name of the output when doing ORC write from 
dataframe

Hi,

I wanted to know if there is any way to get the output file name that `Dataframe.orc()` will write to. This is needed to track which file is written by which job during incremental batch jobs.

Thanks
Manjunath


Re: How to collect Spark dataframe write metrics

2020-03-04 Thread Manjunath Shetty H
Thanks Zohar,

Will try that


-
Manjunath

From: Zohar Stiro 
Sent: Tuesday, March 3, 2020 1:49 PM
To: Manjunath Shetty H 
Cc: user 
Subject: Re: How to collect Spark dataframe write metrics

Hi,

To get DataFrame-level write metrics you can take a look at the following trait:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala
and a basic implementation example:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala

And here is an example of how it is used in FileStreamSink:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L178

About the good practice: it depends on your use case, but generally speaking I would not do it, at least not for checking your logic or checking that Spark is working correctly.

On Sun, 1 Mar 2020 at 14:32, Manjunath Shetty H <manjunathshe...@live.com> wrote:
Hi all,

Basically my use case is to validate the DataFrame row count before and after writing to HDFS. Is this even good practice? Or should I rely on Spark for guaranteed writes?

If it is a good practice to follow, then how do I get the DataFrame-level write metrics?

Any pointers would be helpful.


Thanks and Regards
Manjunath


Way to get the file name of the output when doing ORC write from dataframe

2020-03-04 Thread Manjunath Shetty H
Hi,

I wanted to know if there is any way to get the output file name that `Dataframe.orc()` will write to. This is needed to track which file is written by which job during incremental batch jobs.

Thanks
Manjunath


How to collect Spark dataframe write metrics

2020-03-01 Thread Manjunath Shetty H
Hi all,

Basically my use case is to validate the DataFrame row count before and after writing to HDFS. Is this even good practice? Or should I rely on Spark for guaranteed writes?
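
A minimal sketch of that validation (not an official metrics API; Spark 1.6 Java, the path is a placeholder) would be to count before the write and count again after reading the written files back:

long expected = df.count();                      // triggers an extra job over the source

df.write().format("orc").save("/data/output/batch_0001");

long written = hiveContext.read().orc("/data/output/batch_0001").count();

if (written != expected) {
    throw new IllegalStateException(
        "Row count mismatch: expected " + expected + " but found " + written);
}

The obvious cost is the two extra count jobs, which is part of why this kind of check is often left to Spark's own write guarantees.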

If it is a good practice to follow, then how do I get the DataFrame-level write metrics?

Any pointers would be helpful.


Thanks and Regards
Manjunath


Re: Convert each partition of RDD to Dataframe

2020-02-28 Thread Manjunath Shetty H
Hi Enrico,

Thanks for the suggestion. I wanted to know if there are any performance implications of running a multi-threaded driver.
If I create multiple DataFrames in parallel, will Spark schedule those jobs in parallel?

Thanks
Manjunath

From: Enrico Minack 
Sent: Thursday, February 27, 2020 8:51 PM
To: Manjunath Shetty H ; user@spark.apache.org 

Subject: Re: Convert each partition of RDD to Dataframe

Manjunath,

You can define your DataFrame in parallel in a multi-threaded driver.

Enrico
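
An illustrative sketch of such a multi-threaded driver (Spark 1.6 Java API; `tables`, `jdbcUrlFor(...)` and the output path are hypothetical placeholders):

ExecutorService pool = Executors.newFixedThreadPool(10);
List<Future<?>> futures = new ArrayList<Future<?>>();

for (final String table : tables) {
    futures.add(pool.submit(new Runnable() {
        @Override
        public void run() {
            // Each driver thread defines and writes its own DataFrame, so the
            // resulting Spark jobs can run concurrently (e.g. under FAIR scheduling)
            DataFrame df = hiveContext.read().jdbc(jdbcUrlFor(table), table, jdbcOptions);
            df.write().format("orc").save("/data/output/" + table);
        }
    }));
}

for (Future<?> f : futures) {
    try {
        f.get();                                 // surface any failure from a worker thread
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
pool.shutdown();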

On 27.02.20 at 15:50, Manjunath Shetty H wrote:
Hi Enrico,

In that case, how do I make effective use of all the nodes in the cluster?

Also, what is your opinion on the options below?

  *   Create 10 DataFrames sequentially in the driver program and transform/write them to HDFS one after the other
  *   Or the current approach mentioned in the previous mail

What will be the performance implications?

Regards
Manjunath


From: Enrico Minack <m...@enrico.minack.dev>
Sent: Thursday, February 27, 2020 7:57 PM
To: user@spark.apache.org
Subject: Re: Convert each partition of RDD to Dataframe

Hi Manjunath,

Why not create 10 DataFrames loading the different tables in the first place?

Enrico


On 27.02.20 at 14:53, Manjunath Shetty H wrote:
Hi Vinodh,

Thanks for the quick response. I didn't get what you meant exactly; any reference or snippet would be helpful.

To explain the problem more:

  *   I have 10 partitions; each partition loads data from a different table and a different SQL shard.
  *   Most of the partitions will have different schemas.
  *   Before persisting the data I want to do some column-level manipulation using a data frame.

So that's why I want to create 10 (one per partition) dataframes that map to 10 different tables/shards from an RDD.

Regards
Manjunath

From: Charles vinodh <mig.flan...@gmail.com>
Sent: Thursday, February 27, 2020 7:04 PM
To: manjunathshe...@live.com
Cc: user <user@spark.apache.org>
Subject: Re: Convert each partition of RDD to Dataframe

Just split the single RDD into multiple individual RDDs using a filter operation, and then convert each individual RDD to its respective dataframe.
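
An untested sketch of that suggestion on the Spark 1.6 Java API (`allRows`, `partitionIdOf(...)` and `schemaFor(...)` are hypothetical stand-ins for however the rows and per-shard schemas are identified):

for (int shard = 0; shard < 10; shard++) {
    final int id = shard;

    // Keep only this shard's rows from the combined RDD<Row>
    JavaRDD<Row> shardRows = allRows.filter(new Function<Row, Boolean>() {
        @Override
        public Boolean call(Row row) {
            return partitionIdOf(row) == id;
        }
    });

    // Build a DataFrame with this shard's own schema, then do the per-shard
    // column manipulation / SQL on it
    DataFrame shardDf = hiveContext.createDataFrame(shardRows, schemaFor(id));
    shardDf.registerTempTable("shard_" + id);
}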

On Thu, Feb 27, 2020, 7:29 AM Manjunath Shetty H <manjunathshe...@live.com> wrote:

Hello All,


In Spark I am creating custom partitions with a custom RDD; each partition will have a different schema. Now, in the transformation step, we need to get the schema and run some DataFrame SQL queries per partition, because each partition's data has a different schema.

How do I get a DataFrame per partition of an RDD?

As of now I am doing foreachPartition on the RDD, converting the Iterable to a List, and converting that to a DataFrame. But the problem is that converting the Iterable to a List will bring all the data into memory, and it might crash the process.

Is there any known way to do this? Or is there any way to handle custom partitions in DataFrames instead of using an RDD?

I am using Spark version 1.6.2.

Any pointers would be helpful. Thanks in advance





Re: Convert each partition of RDD to Dataframe

2020-02-27 Thread Manjunath Shetty H
Hi Enrico,

In that case, how do I make effective use of all the nodes in the cluster?

Also, what is your opinion on the options below?

  *   Create 10 DataFrames sequentially in the driver program and transform/write them to HDFS one after the other
  *   Or the current approach mentioned in the previous mail

What will be the performance implications?

Regards
Manjunath


From: Enrico Minack 
Sent: Thursday, February 27, 2020 7:57 PM
To: user@spark.apache.org 
Subject: Re: Convert each partition of RDD to Dataframe

Hi Manjunath,

Why not create 10 DataFrames loading the different tables in the first place?

Enrico


On 27.02.20 at 14:53, Manjunath Shetty H wrote:
Hi Vinodh,

Thanks for the quick response. I didn't get what you meant exactly; any reference or snippet would be helpful.

To explain the problem more:

  *   I have 10 partitions; each partition loads data from a different table and a different SQL shard.
  *   Most of the partitions will have different schemas.
  *   Before persisting the data I want to do some column-level manipulation using a data frame.

So that's why I want to create 10 (one per partition) dataframes that map to 10 different tables/shards from an RDD.

Regards
Manjunath

From: Charles vinodh <mig.flan...@gmail.com>
Sent: Thursday, February 27, 2020 7:04 PM
To: manjunathshe...@live.com
Cc: user <user@spark.apache.org>
Subject: Re: Convert each partition of RDD to Dataframe

Just split the single RDD into multiple individual RDDs using a filter operation, and then convert each individual RDD to its respective dataframe.

On Thu, Feb 27, 2020, 7:29 AM Manjunath Shetty H <manjunathshe...@live.com> wrote:

Hello All,


In Spark I am creating custom partitions with a custom RDD; each partition will have a different schema. Now, in the transformation step, we need to get the schema and run some DataFrame SQL queries per partition, because each partition's data has a different schema.

How do I get a DataFrame per partition of an RDD?

As of now I am doing foreachPartition on the RDD, converting the Iterable to a List, and converting that to a DataFrame. But the problem is that converting the Iterable to a List will bring all the data into memory, and it might crash the process.

Is there any known way to do this? Or is there any way to handle custom partitions in DataFrames instead of using an RDD?

I am using Spark version 1.6.2.

Any pointers would be helpful. Thanks in advance




Re: Convert each partition of RDD to Dataframe

2020-02-27 Thread Manjunath Shetty H
Hi Vinodh,

Thanks for the quick response. I didn't get what you meant exactly; any reference or snippet would be helpful.

To explain the problem more:

  *   I have 10 partitions; each partition loads data from a different table and a different SQL shard.
  *   Most of the partitions will have different schemas.
  *   Before persisting the data I want to do some column-level manipulation using a data frame.

So that's why I want to create 10 (one per partition) dataframes that map to 10 different tables/shards from an RDD.

Regards
Manjunath

From: Charles vinodh 
Sent: Thursday, February 27, 2020 7:04 PM
To: manjunathshe...@live.com 
Cc: user 
Subject: Re: Convert each partition of RDD to Dataframe

Just split the single RDD into multiple individual RDDs using a filter operation, and then convert each individual RDD to its respective dataframe.

On Thu, Feb 27, 2020, 7:29 AM Manjunath Shetty H <manjunathshe...@live.com> wrote:

Hello All,


In Spark I am creating custom partitions with a custom RDD; each partition will have a different schema. Now, in the transformation step, we need to get the schema and run some DataFrame SQL queries per partition, because each partition's data has a different schema.

How do I get a DataFrame per partition of an RDD?

As of now I am doing foreachPartition on the RDD, converting the Iterable to a List, and converting that to a DataFrame. But the problem is that converting the Iterable to a List will bring all the data into memory, and it might crash the process.

Is there any known way to do this? Or is there any way to handle custom partitions in DataFrames instead of using an RDD?

I am using Spark version 1.6.2.

Any pointers would be helpful. Thanks in advance



Convert each partition of RDD to Dataframe

2020-02-27 Thread Manjunath Shetty H

Hello All,


In Spark I am creating custom partitions with a custom RDD; each partition will have a different schema. Now, in the transformation step, we need to get the schema and run some DataFrame SQL queries per partition, because each partition's data has a different schema.

How do I get a DataFrame per partition of an RDD?

As of now I am doing foreachPartition on the RDD, converting the Iterable to a List, and converting that to a DataFrame. But the problem is that converting the Iterable to a List will bring all the data into memory, and it might crash the process.

Is there any known way to do this? Or is there any way to handle custom partitions in DataFrames instead of using an RDD?

I am using Spark version 1.6.2.

Any pointers would be helpful. Thanks in advance