Spark stage stuck
Hi,

I am running multiple jobs in the same driver with FAIR scheduling enabled. Intermittently one of the stages gets stuck and does not complete even after a long time. Each job flow looks like this:

* Create a JDBC RDD to load data from SQL Server
* Create a temporary table
* Query the temp table with a specific set of columns
* Persist the DataFrame
* Write the DataFrame to HDFS in ORC format
* ...

As writing the ORC file is the first action, the job shows up as stuck at the ORC write. Is there any way to debug this problem? Any pointers will be helpful.

Thanks
Manjunath
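For reference, a minimal Spark 1.6 sketch of the flow described above, assuming an existing SparkContext `sc` (as in the spark-shell); the connection URL, credentials, table names and paths are placeholders. Because every step before the write is lazy, the ORC write is where all the upstream stages actually run, so a hang reported there can just as well originate in the JDBC read or the query:

```scala
import java.util.Properties
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.storage.StorageLevel

val hc = new HiveContext(sc)

val props = new Properties()
props.setProperty("user", "etl_user")        // placeholder credentials
props.setProperty("password", "secret")

// Lazy: nothing is fetched from SQL Server yet
val raw = hc.read.jdbc("jdbc:sqlserver://dbhost:1433;databaseName=sales",
                       "dbo.orders", props)

raw.registerTempTable("orders_tmp")
val selected = hc.sql("SELECT order_id, customer_id, amount FROM orders_tmp")

selected.persist(StorageLevel.MEMORY_AND_DISK)

// First action: the JDBC fetch, the query and the persist all execute here
selected.write.mode("overwrite").orc("/data/staging/orders_orc")
```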
Re: Parallelising JDBC reads in spark
Thanks Dhaval for the suggestion, but in the case I mentioned in the previous mail data can still be missed, as the row number will change.

- Manjunath

From: Dhaval Patel
Sent: Monday, May 25, 2020 3:01 PM
To: Manjunath Shetty H
Subject: Re: Parallelising JDBC reads in spark

If possible, set the watermark before reading the data. Read the max of the watermark column before reading the actual data and add that to the query that reads the actual data, like `watermark <= current_watermark`. It may query the db twice, however it will make sure you are not missing any records.

Regards
Dhaval
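A minimal sketch of the watermark approach Dhaval describes, in Spark 1.6 Scala; the connection URL, table and column names are assumptions, and `hc`/`sc` are an existing HiveContext/SparkContext:

```scala
import java.util.Properties
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
val url   = "jdbc:sqlserver://dbhost:1433;databaseName=sales"   // placeholder
val props = new Properties()

// 1. Read the current maximum watermark first (one extra, cheap query)
val maxWm = hc.read
  .jdbc(url, "(SELECT MAX(last_modified) AS max_wm FROM dbo.orders) wm", props)
  .collect()(0).getTimestamp(0)

// 2. Read the actual data bounded by that watermark, so rows updated while
//    the tasks are still running cannot move in or out of the result set
val query = s"(SELECT * FROM dbo.orders WHERE last_modified <= '$maxWm') src"
val df = hc.read.jdbc(url, query, props)
```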
Re: Parallelising JDBC reads in spark
Thanks Georg for the suggestion, but at this point changing the design is not really an option. Any other pointers would be helpful.

Thanks
Manjunath

From: Georg Heiler
Sent: Monday, May 25, 2020 11:52 AM
To: Manjunath Shetty H
Cc: Mike Artz; user
Subject: Re: Parallelising JDBC reads in spark

Well, you seem to have performance and consistency problems. Using a CDC tool that fits your database you might be able to fix both. However, streaming the change events of the database log might be a bit more complicated. Tools like https://debezium.io/ could be useful, depending on your source database.

Best,
Georg
Re: Parallelising JDBC reads in spark
Hi Georg,

Thanks for the response. Can you please elaborate on what you mean by change data capture?

Thanks
Manjunath

From: Georg Heiler
Sent: Monday, May 25, 2020 11:14 AM
To: Manjunath Shetty H
Cc: Mike Artz; user
Subject: Re: Parallelising JDBC reads in spark

Why don't you apply proper change data capture? This will be more complex though.
Re: Parallelising JDBC reads in spark
Hi Mike,

Thanks for the response. Even with that flag set, data can be missed, right? The fetch is based on the last watermark (the maximum timestamp of the rows the last batch job fetched). Take a scenario like this, with a table:

a : 1
b : 2
c : 3
d : 4
f : 6
g : 7
h : 8
e : 5

* a, b, c, d, e get picked up by the first task
* By the time the second task starts, e has been updated, so the row order changes
* As f moves up, it gets missed completely in the fetch

Thanks
Manjunath

From: Mike Artz
Sent: Monday, May 25, 2020 10:50 AM
To: Manjunath Shetty H
Cc: user
Subject: Re: Parallelising JDBC reads in spark

Does anything different happen when you set the isolationLevel to do dirty reads, i.e. "READ_UNCOMMITTED"?
Parallelising JDBC reads in spark
Hi,

We are writing an ETL pipeline using Spark that fetches data from SQL Server in batch mode (every 15 mins). The problem we are facing is how to parallelise a single table read into multiple tasks without missing any data. We have tried this:

* Use the `ROW_NUMBER` window function in the SQL query
* Then do

DataFrame df = hiveContext.read().jdbc(<url>, query, "row_num", 1, <upperBound>, noOfPartitions, jdbcOptions);

The problem with this approach is that if our tables get updated in SQL Server while the tasks are still running, the `ROW_NUMBER` values will change and we may miss some records.

Is there any approach to fix this issue? Any pointers will be helpful.

Note: I am on Spark 1.6

Thanks
Manjunath Shetty
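For concreteness, a sketch of the ROW_NUMBER-partitioned read described above (Spark 1.6 Scala; the URL, source table, ordering column and upper bound are assumptions, and `hc`/`sc` are an existing HiveContext/SparkContext):

```scala
import java.util.Properties
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
val url   = "jdbc:sqlserver://dbhost:1433;databaseName=sales"   // placeholder
val props = new Properties()

// Wrap the source query so ROW_NUMBER becomes a numeric partition column
val query =
  """(SELECT ROW_NUMBER() OVER (ORDER BY order_id) AS row_num, t.*
    |   FROM dbo.orders t) src""".stripMargin

val upperBound     = 1000000L   // assumed row count; query it from the DB in practice
val noOfPartitions = 10

// Spark splits [1, upperBound] into noOfPartitions ranges on row_num,
// issuing one JDBC query per partition
val df = hc.read.jdbc(url, query, "row_num", 1L, upperBound, noOfPartitions, props)
```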
How to change Dataframe schema
Hi,

I have a DataFrame with some columns and data fetched from JDBC. As I have to keep the schema consistent in the ORC file, I have to apply a different schema to that DataFrame. Column names will be the same, but the data or schema may contain some extra columns.

Is there any way I can apply the schema on top of the existing DataFrame? In most cases the schema change is just a reordering of the columns.

I have tried this:

DataFrame dfNew = hc.createDataFrame(df.rdd(), (StructType) DataType.fromJson(schema));

But this maps the columns based on index, so it fails when the columns are reordered.

Any pointers will be helpful.

Thanks and Regards
Manjunath Shetty
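One way to avoid the positional mapping is to align columns by name before writing. A sketch (Spark 1.6 Scala; `conformTo` is a hypothetical helper, not part of the Spark API):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Align df to targetSchema by column *name*: reorder, cast, and add any
// missing columns as nulls, instead of relying on column position
def conformTo(df: DataFrame, targetSchema: StructType): DataFrame = {
  val cols = targetSchema.fields.map { f =>
    if (df.columns.contains(f.name))
      col(f.name).cast(f.dataType).as(f.name)
    else
      lit(null).cast(f.dataType).as(f.name)   // column absent in this source
  }
  df.select(cols: _*)
}
```

Usage would then be something like `conformTo(df, DataType.fromJson(schemaJson).asInstanceOf[StructType])`, with extra source columns simply not selected.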
Spark ORC store written timestamp as column
Hi All,

Is there any way to store the exact write timestamp as a column in the ORC file through Spark? The use case is something like the `current_timestamp()` function in SQL. A timestamp generated in the program will not be equal to the actual write time of the ORC/HDFS file.

Any suggestions will be helpful.

Thanks
Manjunath
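The closest built-in approximation I am aware of is stamping the rows just before the write. A sketch, given a DataFrame `df` about to be written (path is a placeholder); note this records when the column expression is evaluated during the write job, not the moment the file lands on HDFS, so it only approximates the physical write time:

```scala
import org.apache.spark.sql.functions.current_timestamp

// Column is evaluated while the write job runs; close to, but not exactly,
// the HDFS file creation time
val stamped = df.withColumn("written_at", current_timestamp())
stamped.write.mode("append").orc("/data/warehouse/orders_orc")
```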
Spark 1.6 and ORC bucketed queries
Hi,

Is it possible to do ORC bucketed queries in Spark 1.6?

The folder structure is like this:

/
  bucket1.orc
  bucket2.orc
  bucket3.orc

And the Spark SQL query will be like `select * from <table> where partition = partition1 and bucket = bucket1`; this query should only read the `bucket1.orc` file. Is this possible with Spark 1.6? If so, please let me know how to achieve it.

Thanks
Manjunath Shetty
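As far as I know, Spark 1.6 does not prune individual bucket files from a table scan. One workaround sketch, when the target bucket is known up front, is to address the file directly by path (the warehouse layout below is an assumption based on the structure above, and `hiveContext` is an existing HiveContext):

```scala
// Read only the one bucket file for the known partition, bypassing the
// full table scan entirely
val oneBucket = hiveContext.read
  .orc("/warehouse/mytable/partition=partition1/bucket1.orc")

oneBucket.registerTempTable("bucket1_only")
val result = hiveContext.sql("SELECT * FROM bucket1_only")
```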
Spark SQL join ORC and non ORC tables in hive
Hi,

I am on Spark 1.6. I am getting an error when I try to run a Hive query in Spark that involves joining ORC and non-ORC tables in Hive. The error is below; any help would be appreciated.

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange hashpartitioning(CENTRAL_ITEM_ID#150,200), None
+- ConvertToUnsafe
   +- HiveTableScan [CENTRAL_ITEM_ID#150,PHM_ITEM_COST_AMT#154,LAST_CHANGE_TS#155,HISTORICAL_LAST_CHANGE_TS#160], MetastoreRelation rxdwh, phm_item_cost, None
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Sort.doExecute(Sort.scala:64)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Window.doExecute(Window.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Filter.doExecute(basicOperators.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:85)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1$$anonfun$apply$1.apply(BroadcastHashOuterJoin.scala:82)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:82)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin$$anonfun$broadcastFuture$1.apply(BroadcastHashOuterJoin.scala:82)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: serious problem
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
Re: Saving Spark run stats and run watermark
Thanks for the suggestion Netanel. Sorry for the missing context: I am specifically looking for something inside the Hadoop ecosystem.

- Manjunath

From: Netanel Malka
Sent: Wednesday, March 18, 2020 5:26 PM
To: Manjunath Shetty H
Subject: Re: Saving Spark run stats and run watermark

You can try to use an RDBMS like PostgreSQL or MySQL. I would use a regular table. Spark has a built-in integration for that: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
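A sketch of the RDBMS route Netanel suggests (the URL, credentials, table name and stats schema are assumptions; `hc`/`sc` are an existing HiveContext/SparkContext). Each batch appends one tiny row to a regular table instead of creating small files in HDFS:

```scala
import java.sql.Timestamp
import java.util.Properties
import org.apache.spark.sql.SaveMode

case class RunStats(runId: String, startTs: Timestamp, endTs: Timestamp,
                    watermark: Timestamp, rowsWritten: Long)

val statsUrl = "jdbc:postgresql://statshost:5432/etl_meta"   // placeholder
val props = new Properties()
props.setProperty("user", "etl_user")
props.setProperty("password", "secret")

// Values the batch job would have computed
val startTs      = new Timestamp(System.currentTimeMillis() - 15 * 60 * 1000)
val endTs        = new Timestamp(System.currentTimeMillis())
val maxWatermark = endTs
val rowCount     = 12345L

// One small row per batch run, appended via Spark's JDBC writer
val stats = Seq(RunStats("run-42", startTs, endTs, maxWatermark, rowCount))
hc.createDataFrame(stats).write.mode(SaveMode.Append).jdbc(statsUrl, "spark_run_stats", props)
```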
Saving Spark run stats and run watermark
Hi All,

We want to save, for each Spark batch run, the run stats (start, end, ID etc.) and the watermark (the last processed timestamp from the external data source). We have tried Hive JDBC, but it is very slow due to the MR jobs it triggers. We can't save to normal Hive tables either, as that would create lots of small files in HDFS.

Please suggest the recommended way to do this. Any pointers will be helpful.

Thanks and regards
Manjunath
Re: Optimising multiple hive table join and query in spark
Thanks Georg.

The batch import job frequency is different from the read job. The import job runs every 15 mins to 1 hour, and the read/transform job runs once a day. In this case I think writing with sortWithinPartitions doesn't make any difference, as the combined data stored in HDFS will not be sorted at the end of the day.

Does partitioning/sorting while reading help? I tried it out, but it still results in a shuffle during the join of multiple tables and generates a very complex DAG.

- Manjunath

From: Georg Heiler
Sent: Monday, March 16, 2020 12:06 PM
To: Manjunath Shetty H
Subject: Re: Optimising multiple hive table join and query in spark

Hi,

if you only have 1.6, forget bucketing. https://databricks.com/session/bucketing-in-spark-sql-2-3 — that only works well with Hive from 2.3 onwards.

The other thing, in your (daily?) batch job:

val myData = spark.read.<>(/path/to/file).transform(<>)

Now when writing the data: myData.write.repartition(xxx), where xxx resembles the number of files you want to have for each period (day?). When writing ORC / Parquet make sure to have files of HDFS block size or more, i.e. usually 128 MB up to a maximum of 1 GB.

myData.write.repartition(xxx).sortWithinPartitions(join_col, join_col) — apply a secondary sort to get ORC indices.

If the cardinality of the join_cols is high enough: myData.write.repartition(xxx, col(join_col), col(other_join_col)).sortWithinPartitions(join_col, join_col)

Best,
Georg

On Mon, Mar 16, 2020 at 04:27, Manjunath Shetty H wrote:

Hi Georg,

Thanks for the suggestion. Can you please explain a bit more about what you meant exactly? Btw, I am on Spark 1.6.

- Manjunath

From: Georg Heiler
Sent: Monday, March 16, 2020 12:35 AM
To: Manjunath Shetty H
Subject: Re: Optimising multiple hive table join and query in spark

To speed things up:
- sortWithinPartitions (i.e. for each day) & potentially repartition
- pre-shuffle the data with bucketing
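A sketch of the write pattern Georg describes, in Spark 1.6 syntax (paths, column names and the partition count are assumptions, and `hc` is an existing HiveContext); note that repartition and sortWithinPartitions are applied to the DataFrame before calling .write:

```scala
import org.apache.spark.sql.functions.col

val myData = hc.read.orc("/data/staging/day=20200316")   // placeholder input

val filesPerDay = 8   // aim for roughly 128 MB - 1 GB per ORC file

// Repartition by the join keys and apply a secondary sort within each
// partition so the ORC writer produces useful min/max indexes
myData
  .repartition(filesPerDay, col("country"), col("item_id"))
  .sortWithinPartitions("country", "item_id")
  .write
  .mode("append")
  .orc("/data/warehouse/fact_orders/day=20200316")
```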
Re: Optimising multiple hive table join and query in spark
Only partitioned, and the join keys are not sorted because those files are written incrementally by batch jobs.

From: Georg Heiler
Sent: Sunday, March 15, 2020 8:30:53 PM
To: Manjunath Shetty H
Cc: ayan guha; Magnus Nilsson; user
Subject: Re: Optimising multiple hive table join and query in spark

Did you only partition, or also bucket by the join column? Are ORC indices active, i.e. were the JOIN keys sorted when writing the files?

Best,
Georg
Re: Optimising multiple hive table join and query in spark
Mostly the concern is the reshuffle. Even though all the DataFrames are partitioned by the same column, the join still does a reshuffle, and that is the bottleneck in our POC implementation as of now. Is there any way to tell Spark to keep all partitions with the same partition key in the same place, so that the join won't shuffle again?

- Manjunath

From: ayan guha
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson
Cc: user
Subject: Re: Optimising multiple hive table join and query in spark

Hi,

I would first and foremost try to identify where the most time is spent during the query. One possibility is that it just takes ramp-up time for the executors to become available; if that's the case then maybe a dedicated YARN queue may help, or using the Spark thriftserver may help.

Best Regards,
Ayan Guha

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson wrote:

Been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. Never got around to trying it out though. I really would like a native way to tell Catalyst not to reshuffle just because you use more columns in the join condition.
Optimising multiple hive table join and query in spark
Hi All,

We have 10 tables in a data warehouse (HDFS/Hive) written in ORC format. We are serving a use case on top of that by joining 4-5 tables, using Hive as of now. But it is not as fast as we want it to be, so we are thinking of using Spark for this use case. Any suggestions? Is it a good idea to use Spark here? Can we get better performance with Spark? Any pointers would be helpful.

Notes:
* Data is partitioned by date (MMdd) as an integer.
* The query will fetch data for the last 7 days from some tables while joining with other tables.

The approach we have thought of so far (a sketch follows below):
* Create a DataFrame for each table and partition them all by the same column (let's say Country as the partition column)
* Register all tables as temporary tables
* Run the SQL query with joins

The problem we are seeing with this approach is that even though we already partitioned by Country, it still does hash partitioning + shuffle during the join. All the table joins contain the `Country` column plus some extra columns depending on the table.

Is there any way to avoid these shuffles and improve performance?

Thanks and regards
Manjunath
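For concreteness, a sketch of the approach described above (Spark 1.6 Scala; table names, column names, paths and the date filter are assumptions, and `hc` is an existing HiveContext). In Spark 1.6 the join below will still plan a hash-partitioned exchange, which is the shuffle being asked about:

```scala
import org.apache.spark.sql.functions.col

val orders    = hc.read.orc("/warehouse/orders").repartition(col("Country"))
val customers = hc.read.orc("/warehouse/customers").repartition(col("Country"))

orders.registerTempTable("orders")
customers.registerTempTable("customers")

// Last 7 days of one table joined with another; Country plus one extra
// per-table column in the join condition
val result = hc.sql("""
  SELECT o.Country, o.order_id, c.segment
  FROM orders o
  JOIN customers c
    ON o.Country = c.Country AND o.customer_id = c.customer_id
  WHERE o.date_key >= 20200308
""")
```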
Re: Way to get the file name of the output when doing ORC write from dataframe
Or is there any way to provide a unique file name to the ORC write function itself? Any suggestions will be helpful.

Regards
Manjunath Shetty

From: Manjunath Shetty H
Sent: Wednesday, March 4, 2020 2:28 PM
To: user
Subject: Way to get the file name of the output when doing ORC write from dataframe

Hi,

I wanted to know if there is any way to get the output file name that `Dataframe.orc()` will write to. This is needed to track which file is written by which job during incremental batch jobs.

Thanks
Manjunath
Re: How to collect Spark dataframe write metrics
Thanks Zohar, will try that.

- Manjunath

From: Zohar Stiro
Sent: Tuesday, March 3, 2020 1:49 PM
To: Manjunath Shetty H
Cc: user
Subject: Re: How to collect Spark dataframe write metrics

Hi,

To get DataFrame-level write metrics you can take a look at the following trait:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala

and a basic implementation example:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala

and here is an example of how it is used in FileStreamSink:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L178

About whether it is good practice: it depends on your use case, but generally speaking I would not do it, at least not for checking your logic or checking that Spark is working correctly.
Way to get the file name of the output when doing ORC write from dataframe
Hi,

I wanted to know if there is any way to get the output file name that `Dataframe.orc()` will write to. This is needed to track which file is written by which job during incremental batch jobs.

Thanks
Manjunath
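The ORC writer does not return the file names it creates. One common workaround, sketched below with hypothetical paths and a hypothetical job identifier, is to write each incremental batch into its own directory and then list the part files afterwards with the Hadoop FileSystem API (`df` and `sc` are an existing DataFrame and SparkContext):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

val jobId  = "batch-20200304-1428"                      // hypothetical job identifier
val outDir = s"/data/warehouse/orders_orc/$jobId"       // one directory per job run

df.write.orc(outDir)

// List the part files Spark just created for this job run
val fs = FileSystem.get(sc.hadoopConfiguration)
val written = fs.listStatus(new Path(outDir))
  .map(_.getPath.toString)
  .filter(_.contains("part-"))

written.foreach(println)
```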
How to collect Spark dataframe write metrics
Hi all,

Basically my use case is to validate the DataFrame row count before and after writing to HDFS. Is this even good practice, or should I rely on Spark for guaranteed writes? If it is a good practice to follow, then how do I get DataFrame-level write metrics?

Any pointers would be helpful.

Thanks and Regards
Manjunath
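A minimal validation sketch for the use case described (the output path is a placeholder; `df` and `hc` are an existing DataFrame and HiveContext): count the DataFrame, write it, read the written files back, and compare the two counts.

```scala
val outPath = "/data/warehouse/orders_orc/batch-001"   // placeholder output path

// Cache so the pre-write count and the write itself read the source only once
df.cache()
val expected = df.count()

df.write.orc(outPath)

// Re-read what actually landed on HDFS and compare
val actual = hc.read.orc(outPath).count()
require(actual == expected, s"row count mismatch: wrote $expected, found $actual")
```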
Re: Convert each partition of RDD to Dataframe
Hi Enrico,

Thanks for the suggestion. I wanted to know if there are any performance implications of running a multi-threaded driver. If I create multiple DataFrames in parallel, will Spark schedule those jobs in parallel?

Thanks
Manjunath

From: Enrico Minack
Sent: Thursday, February 27, 2020 8:51 PM
To: Manjunath Shetty H; user@spark.apache.org
Subject: Re: Convert each partition of RDD to Dataframe

Manjunath,

You can define your DataFrame in parallel in a multi-threaded driver.

Enrico
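A sketch of the multi-threaded-driver idea using Scala Futures (the URL, table names and write paths are assumptions; `hc`/`sc` are an existing HiveContext/SparkContext). Each Future defines and writes one DataFrame, and the resulting jobs run concurrently on the cluster, subject to the scheduler (e.g. FAIR pools) and the available executors:

```scala
import java.util.Properties
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val url    = "jdbc:sqlserver://dbhost:1433;databaseName=sales"   // placeholder
val props  = new Properties()
val tables = Seq("dbo.orders", "dbo.customers", "dbo.items")     // assumed tables

// One driver thread per table: define, transform and write each DataFrame;
// Spark schedules the resulting jobs concurrently
val jobs = tables.map { table =>
  Future {
    val df = hc.read.jdbc(url, table, props)
    df.write.mode("overwrite").orc(s"/data/staging/${table.replace('.', '_')}")
  }
}

Await.result(Future.sequence(jobs), Duration.Inf)
```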
Re: Convert each partition of RDD to Dataframe
Hi Enrico,

In that case, how do I make effective use of all the nodes in the cluster? Also, what is your opinion on the options below?

* Create 10 DataFrames sequentially in the driver program and transform/write them to HDFS one after the other
* Or the current approach mentioned in the previous mail

What will be the performance implications?

Regards
Manjunath

From: Enrico Minack
Sent: Thursday, February 27, 2020 7:57 PM
To: user@spark.apache.org
Subject: Re: Convert each partition of RDD to Dataframe

Hi Manjunath,

why not create 10 DataFrames loading the different tables in the first place?

Enrico
Re: Convert each partition of RDD to Dataframe
Hi Vinodh,

Thanks for the quick response. I didn't quite get what you meant; any reference or snippet would be helpful. To explain the problem more:

* I have 10 partitions; each partition loads the data from a different table and a different SQL shard.
* Most of the partitions will have different schemas.
* Before persisting the data I want to do some column-level manipulation using a DataFrame.

That is why I want to create 10 DataFrames (based on partitions) that map to 10 different tables/shards from one RDD.

Regards
Manjunath

From: Charles vinodh
Sent: Thursday, February 27, 2020 7:04 PM
To: manjunathshe...@live.com
Cc: user
Subject: Re: Convert each partition of RDD to Dataframe

Just split the single RDD into multiple individual RDDs using a filter operation and then convert each individual RDD to its respective DataFrame.
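A sketch of the filter-split idea Charles describes (Spark 1.6 Scala). It assumes each record is tagged with the index of the table/shard it came from and that the per-source schemas are known; `splitToDataFrames` is a hypothetical helper, not a Spark API:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.StructType

// tagged:  each record carries the index of the table/shard it came from
// schemas: the schema belonging to each source index
def splitToDataFrames(sqlCtx: SQLContext,
                      tagged: RDD[(Int, Row)],
                      schemas: Map[Int, StructType]): Map[Int, DataFrame] =
  schemas.map { case (idx, schema) =>
    // One narrow filter per source; nothing is ever collected to the driver,
    // unlike the foreachPartition + toList approach
    idx -> sqlCtx.createDataFrame(tagged.filter(_._1 == idx).values, schema)
  }
```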
Convert each partition of RDD to Dataframe
Hello All,

In Spark I am creating custom partitions with a custom RDD; each partition will have a different schema. In the transformation step we need to get the schema and run some DataFrame SQL queries per partition, because each partition's data has a different schema. How do I get a DataFrame per partition of an RDD?

As of now I am doing foreachPartition on the RDD, converting the Iterable to a List and converting that to a DataFrame. But the problem is that converting the Iterable to a List brings all the data into memory and might crash the process.

Is there any known way to do this? Or is there any way to handle custom partitions in DataFrames instead of using an RDD?

I am using Spark version 1.6.2. Any pointers would be helpful.

Thanks in advance