[jira] [Commented] (SPARK-40233) Unable to load large pandas dataframe to pyspark
[ https://issues.apache.org/jira/browse/SPARK-40233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598566#comment-17598566 ] Niranda Perera commented on SPARK-40233:

Well, the driver actually hangs after throwing that warning. Until you mentioned it here, I wasn't aware that it was a Python serialization problem. My initial thought was that, with Arrow enabled, Spark would not serialize anything, but rather copy the Arrow buffers from the driver to the executors.

> Unable to load large pandas dataframe to pyspark
> ------------------------------------------------
>
> Key: SPARK-40233
> URL: https://issues.apache.org/jira/browse/SPARK-40233
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.3.0
> Reporter: Niranda Perera
> Priority: Major
>
> I've been trying to join two large pandas dataframes using pyspark, using the following code. I'm varying the executor cores allocated to the application and measuring the scalability of pyspark (strong scaling).
> {code:java}
> r = 10 ** 9  # 1Bn rows
> it = 10
> w = 256
> unique = 0.9
> TOTAL_MEM = 240
> TOTAL_NODES = 14
>
> max_val = r * unique
> rng = default_rng()
> frame_data = rng.integers(0, max_val, size=(r, 2))
> frame_data1 = rng.integers(0, max_val, size=(r, 2))
> print("data generated", flush=True)
>
> df_l = pd.DataFrame(frame_data).add_prefix("col")
> df_r = pd.DataFrame(frame_data1).add_prefix("col")
> print("data loaded", flush=True)
>
> procs = int(math.ceil(w / TOTAL_NODES))
> mem = int(TOTAL_MEM * 0.9)
> print(f"world sz {w} procs per worker {procs} mem {mem} iter {it}", flush=True)
>
> spark = SparkSession \
>     .builder \
>     .appName(f'join {r} {w}') \
>     .master('spark://node:7077') \
>     .config('spark.executor.memory', f'{int(mem * 0.6)}g') \
>     .config('spark.executor.pyspark.memory', f'{int(mem * 0.4)}g') \
>     .config('spark.cores.max', w) \
>     .config('spark.driver.memory', '100g') \
>     .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
>     .getOrCreate()
>
> sdf0 = spark.createDataFrame(df_l).repartition(w).cache()
> sdf1 = spark.createDataFrame(df_r).repartition(w).cache()
> print("data loaded to spark", flush=True)
>
> try:
>     for i in range(it):
>         t1 = time.time()
>         out = sdf0.join(sdf1, on='col0', how='inner')
>         count = out.count()
>         t2 = time.time()
>         print(f"timings {r} 1 {i} {(t2 - t1) * 1000:.0f} ms, {count}", flush=True)
>
>         del out
>         del count
>         gc.collect()
> finally:
>     spark.stop()
> {code}
> *Cluster*: I am using a standalone Spark cluster on 15 nodes with 48 cores and 240GB RAM each. I've spawned the master and the driver code on node1, while the other 14 nodes run workers allocated the maximum memory. In the Spark context, I reserve 90% of total memory for the executor, split 60% to the JVM and 40% to pyspark.
> *Issue*: When I run the above program, I can see that the executors are being assigned to the app, but it doesn't move forward, even after 60 mins. For a smaller row count (10M), this worked without a problem.
> Driver output
> {code:java}
> world sz 256 procs per worker 19 mem 216 iter 8
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 22/08/26 14:52:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> /N/u/d/dnperera/.conda/envs/env/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:425: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
>   Negative initial size: -589934400
> Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
>   warn(msg)
> {code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
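The "Negative initial size: -589934400" in the warning above is consistent with a signed 32-bit overflow when sizing a roughly 8 GB buffer: one billion int64 values is 8,000,000,000 bytes, and 8,000,000,192 bytes (the extra 192 bytes of overhead is an assumption, chosen here only because it makes the arithmetic land exactly on the logged value) wraps to precisely -589934400 when truncated to a Java-style `int`. A quick stdlib sketch of the truncation:

```python
import struct

# ~8 GB buffer: 1 Bn int64 values plus a hypothetical 192 bytes of overhead
# (the overhead figure is an assumption, picked to match the logged value).
size = 1_000_000_000 * 8 + 192  # 8_000_000_192 bytes

# Keep only the low 32 bits and reinterpret as signed, as a Java `int` cast would:
low32 = struct.unpack("<i", struct.pack("<q", size)[:4])[0]
print(low32)  # -589934400, the "Negative initial size" from the driver log
```

Whatever the exact overhead, the point stands: any buffer size past 2 GiB cannot be represented in a signed 32-bit length field, which would explain why the 10M-row case works while the 1Bn-row case does not.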
[jira] [Commented] (SPARK-40233) Unable to load large pandas dataframe to pyspark
[ https://issues.apache.org/jira/browse/SPARK-40233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598546#comment-17598546 ] Niranda Perera commented on SPARK-40233:

[~srowen] Shouldn't the Spark driver program terminate or throw an error if it is a serialization issue from pandas? Either way, what would be a better/recommended way to load a large dataset into Spark?
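Not an authoritative answer to the question above, but two settings interact with the failure mode in the driver log (both config keys are real; the values and cluster URL below are illustrative placeholders only):

```python
from pyspark.sql import SparkSession

# Hypothetical sketch; values are placeholders, not recommendations.
spark = (
    SparkSession.builder
    .appName("large-load")
    .master("spark://node:7077")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # Fail fast instead of silently falling back to pickle-based conversion,
    # which is the path that precedes the hang described above:
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
    # createDataFrame slices the pandas frame into defaultParallelism chunks,
    # so raising parallelism shrinks each Arrow batch sent to the JVM:
    .config("spark.default.parallelism", "256")
    .getOrCreate()
)
```

With the fallback disabled, an oversized Arrow batch should surface as an exception rather than a silent switch to the much slower pickle path.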
[jira] [Commented] (SPARK-40233) Unable to load large pandas dataframe to pyspark
[ https://issues.apache.org/jira/browse/SPARK-40233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598021#comment-17598021 ] Niranda Perera commented on SPARK-40233:

I believe the issue is related to the executors not being able to load the data from the Python driver (possibly not having enough memory). As I understand it, one executor would first have to load the entire data dump from the Python driver and then repartition it. My suggestion is to add a `num_partitions` option to the createDataFrame method, so that partitioning can be handled at the driver and the data sent to the executors as a list of RDDs. Is this an acceptable approach from the POV of Spark internals?
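The driver-side partitioning idea above can be sketched with a hypothetical helper (the names are invented for illustration, not part of any Spark API): split the row range into contiguous slices, then feed each slice to createDataFrame and union the results, approximating a `num_partitions` option.

```python
def chunk_ranges(n_rows, n_parts):
    """Split [0, n_rows) into n_parts contiguous (start, stop) slices."""
    base, extra = divmod(n_rows, n_parts)
    ranges, start = [], 0
    for i in range(n_parts):
        stop = start + base + (1 if i < extra else 0)  # first `extra` slices get one extra row
        ranges.append((start, stop))
        start = stop
    return ranges

print(chunk_ranges(10, 3))  # [(0, 4), (4, 7), (7, 10)]

# Hypothetical driver-side use (pyspark assumed, not runnable here):
#   parts = [spark.createDataFrame(df_l.iloc[a:b]) for a, b in chunk_ranges(len(df_l), w)]
#   sdf0 = functools.reduce(DataFrame.union, parts)
```

Each slice then produces an Arrow batch bounded by the slice size instead of the whole frame, at the cost of `n_parts` separate conversions on the driver.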
[jira] [Created] (SPARK-40233) Unable to load large pandas dataframe to pyspark
Niranda Perera created SPARK-40233:
--------------------------------------

Summary: Unable to load large pandas dataframe to pyspark
Key: SPARK-40233
URL: https://issues.apache.org/jira/browse/SPARK-40233
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 3.3.0
Reporter: Niranda Perera
[jira] [Commented] (SPARK-14736) Deadlock in registering applications while the Master is in the RECOVERING mode
[ https://issues.apache.org/jira/browse/SPARK-14736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15326203#comment-15326203 ] niranda perera commented on SPARK-14736:

Hi guys, any update on this? We are seeing this deadlock in our custom recovery mode implementation quite often. Best

> Deadlock in registering applications while the Master is in the RECOVERING mode
> -------------------------------------------------------------------------------
>
> Key: SPARK-14736
> URL: https://issues.apache.org/jira/browse/SPARK-14736
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.4.1, 1.5.0, 1.6.0
> Environment: unix, Spark cluster with a custom StandaloneRecoveryModeFactory and a custom PersistenceEngine
> Reporter: niranda perera
> Priority: Critical
>
> I have encountered the following issue in the standalone recovery mode.
> Let's say there was an application A running in the cluster. Due to some issue, the entire cluster, together with application A, goes down.
> Later on, the cluster comes back online, and the master goes into the 'recovering' mode, because it sees from the persistence engine that some apps, workers, and drivers were already in the cluster. While the recovery is in progress, the application comes back online, but now it has a different ID, let's say B.
> But then, as per the master's application registration logic, this application B will NOT be added to 'waitingApps', with the message "Attempted to re-register application at same address". [1]
> {code}
> private def registerApplication(app: ApplicationInfo): Unit = {
>   val appAddress = app.driver.address
>   if (addressToApp.contains(appAddress)) {
>     logInfo("Attempted to re-register application at same address: " + appAddress)
>     return
>   }
> {code}
> The problem here is that the master is trying to recover application A, which is not there anymore, so after the recovery process, app A will be dropped. However, app A's successor, app B, was also omitted from the 'waitingApps' list because it had the same address as app A previously.
> This creates a deadlock in the cluster: neither app A nor app B is available in the cluster.
> When the master is in the RECOVERING mode, shouldn't it add all the registering apps to a list first, and then, after the recovery is completed (once the unsuccessful recoveries are removed), deploy the apps which are new? This would sort out this deadlock, IMO.
> [1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L834

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
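A toy model of the registration guard quoted above (pure Python with invented names; the real logic lives in Scala in Master.scala) shows why the restarted application is rejected:

```python
# Minimal sketch of the same-address guard in Master.registerApplication.
address_to_app = {}

def register_application(app_id, address):
    """Return True if registered, False if dropped by the same-address guard."""
    if address in address_to_app:
        # "Attempted to re-register application at same address"
        return False
    address_to_app[address] = app_id
    return True

# Recovery re-registers stale app A at its old driver address...
assert register_application("app-A", "driver-host:45000")
# ...so its restarted successor app B, arriving from the same address
# while the master is RECOVERING, is silently dropped:
assert not register_application("app-B", "driver-host:45000")
```

Once recovery finishes and stale app A is removed, no registered app remains and app B never retries, which is the deadlock described in the report.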
[jira] [Updated] (SPARK-14736) Deadlock in registering applications while the Master is in the RECOVERING mode
[ https://issues.apache.org/jira/browse/SPARK-14736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] niranda perera updated SPARK-14736:
------------------------------------

Affects Version/s: 1.5.0
                   1.6.0
[jira] [Created] (SPARK-14736) Deadlock in registering applications while the Master is in the RECOVERING mode
niranda perera created SPARK-14736:
--------------------------------------

Summary: Deadlock in registering applications while the Master is in the RECOVERING mode
Key: SPARK-14736
URL: https://issues.apache.org/jira/browse/SPARK-14736
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.4.1
Environment: unix, Spark cluster with a custom StandaloneRecoveryModeFactory and a custom PersistenceEngine
Reporter: niranda perera
Priority: Critical
[jira] [Commented] (SPARK-5264) Support `drop temporary table [if exists]` DDL command
[ https://issues.apache.org/jira/browse/SPARK-5264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15039713#comment-15039713 ] niranda perera commented on SPARK-5264:

Any update on this?

> Support `drop temporary table [if exists]` DDL command
> ------------------------------------------------------
>
> Key: SPARK-5264
> URL: https://issues.apache.org/jira/browse/SPARK-5264
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 1.3.0
> Reporter: Li Sheng
> Priority: Minor
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> Support the `drop table` DDL command, i.e. DROP [TEMPORARY] TABLE [IF EXISTS] tbl_name
[jira] [Commented] (SPARK-1830) Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
[ https://issues.apache.org/jira/browse/SPARK-1830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14520918#comment-14520918 ] niranda perera commented on SPARK-1830:

I'm trying to implement a custom persistence engine and a leader agent in the Java environment. Vis-a-vis Scala, when I implement the PersistenceEngine trait in Java, I would have to implement methods such as readPersistedData, removeDriver, etc. together with the read, persist, and unpersist methods. But the issue here is that methods such as readPersistedData are 'final def's, and hence cannot be overridden in the Java environment. I am new to Scala, but is there any workaround to implement the above traits in Java? I look forward to hearing from you.

> Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
> -------------------------------------------------------------------
>
> Key: SPARK-1830
> URL: https://issues.apache.org/jira/browse/SPARK-1830
> Project: Spark
> Issue Type: New Feature
> Components: Deploy
> Reporter: Prashant Sharma
> Assignee: Prashant Sharma
> Fix For: 1.3.0
>
> With the current code base it is difficult to plug in an external, user-specified persistence engine or election agent. It would be good to expose this as a pluggable API.