Error when caching a partitioned Parquet table
Hi all, I hit the error below when I cache a partitioned Parquet table. It looks like Spark is trying to read the partition key from the Parquet file itself, where it does not exist, so the key is not found. Other queries run successfully, even ones that reference the partition key. Is this a bug in Spark SQL? Is there any workaround for it? Thank you!

java.util.NoSuchElementException: key not found: querydate
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
    at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:142)
    at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:127)
    at org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:247)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

--
郑旭东
ZHENG, Xu-dong
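One workaround worth trying (unverified against this particular failure; sqlContext, the table name "logs", the partition column "querydate", and the data columns below are placeholders) is to cache a projection that leaves the partition column out, instead of caching the partitioned table directly:

// Sketch only: whether this avoids the failing ParquetTableScan code path is not verified.
val projected = sqlContext.sql(
  "SELECT col_a, col_b FROM logs WHERE querydate = '2014-11-10'")
projected.registerTempTable("logs_20141110")
sqlContext.cacheTable("logs_20141110")

// Later queries read from the cached temp table instead of the partitioned table.
sqlContext.sql("SELECT col_a, COUNT(*) FROM logs_20141110 GROUP BY col_a").collect()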
Re: spark sql - save to Parquet file - Unsupported datatype TimestampType
I am hitting the same issue. Any solution?

On Wed, Nov 12, 2014 at 2:54 PM, tridib tridib.sama...@live.com wrote:

Hi Friends, I am trying to save a JSON file as Parquet and got the error "Unsupported datatype TimestampType". Does Parquet not support dates? Which Parquet version does Spark use? Is there any workaround? Here is the stack trace:

java.lang.RuntimeException: Unsupported datatype TimestampType
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:343)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:292)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:291)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2$$anonfun$3.apply(ParquetTypes.scala:320)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2$$anonfun$3.apply(ParquetTypes.scala:320)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:319)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:292)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:291)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:363)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:362)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:361)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:407)
    at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:151)
    at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:130)
    at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:204)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:76)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.saveAsParquetFile(JavaSchemaRDD.scala:42)

Thanks & Regards,
Tridib

--
郑旭东
ZHENG, Xu-dong
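A common workaround in this Spark version is to cast the timestamp column to a string (or a long) before writing, since the Parquet writer here rejects TimestampType. A minimal sketch, assuming the Spark 1.1-era Scala API; the schema, field names, and paths are placeholders (in practice the SchemaRDD would come from jsonFile, applySchema, or a bean-based JavaSchemaRDD):

import java.sql.Timestamp
import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)

// Placeholder schema containing a TimestampType column.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = false),
  StructField("ts", TimestampType, nullable = true)))
val rows = sc.parallelize(Seq(Row("a", new Timestamp(System.currentTimeMillis()))))
val events = sqlContext.applySchema(rows, schema)
events.registerTempTable("events")

// Saving `events` directly fails with "Unsupported datatype TimestampType";
// casting the column first produces a schema the Parquet writer accepts.
sqlContext.sql("SELECT id, CAST(ts AS STRING) AS ts FROM events")
  .saveAsParquetFile("hdfs:///tmp/events_parquet")

The string (or long) can be parsed back into a timestamp after reading the Parquet file, until a Spark version with native Parquet timestamp support is available.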
Re: Is there any way to control the parallelism in LogisticRegression
Update: I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which appears to control locality. The default value is 0.1 (a smaller value means lower locality). I changed it to 1.0 (full locality) and used approach #3, and then saw a big improvement (20%~40%). Although the Web UI still shows the task locality level as 'ANY' and the input as shuffle read, the real performance is much better than before changing this parameter. [inline image omitted]

I think the benefits are:

1. This approach keeps the physical partition size small but makes each task handle multiple partitions, so the memory needed for deserialization is reduced, which also reduces GC time. That is exactly what we observed in our job.

2. This approach does not hit the 2G limitation, because it does not change the partition size.

I also think Spark might want to change this default value, or at least expose this parameter to users (*CoalescedRDD* is a private class, and *RDD.coalesce* does not have a parameter to control it).

On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng men...@gmail.com wrote:

Sorry, I missed #2. My suggestion is the same as #2. You need to set a bigger numPartitions to avoid hitting the integer bound (the 2G limitation), at the cost of increased shuffle size per iteration. If you use a CombineInputFormat and then cache, it will try to give you roughly the same size per partition. There will be some remote fetches from HDFS, but that is still cheaper than calling RDD.repartition(). For coalesce without shuffle, I don't know how to set the right number of partitions either ... -Xiangrui

On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong dong...@gmail.com wrote:

Hi Xiangrui, Thanks for your reply! Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right? For CombineInputFormat, I haven't tried it, but it sounds like it would combine multiple partitions into one large partition once I cache it, so the same issues as #1? For coalesce, could you share some best practices on how to set the right number of partitions to avoid the locality problem? Thanks!

On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote:

Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you don't use the right number of partitions. -Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com wrote:

I think this has the same effect and issue as #1, right?

On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com wrote:

How about increasing the HDFS file extent (block) size? For example, if the current value is 128M, make it 512M or bigger.

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com wrote:

Hi all, We are trying to use Spark MLlib to train on very large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib creates a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks adds a lot of network overhead to transfer the weight vector. So we want to reduce the number of tasks, and we tried the approaches below.

1. Coalesce partitions without shuffling, then cache:

data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when the data grows and numPartitions is fixed, the size of a single partition becomes large. This introduces two issues: first, the larger partitions need larger objects and more memory at runtime, and trigger GC more frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a single partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache:

data.coalesce(numPartitions, true).cache()

This mitigates the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.

3. Cache the data first, then coalesce partitions:

data.cache().coalesce(numPartitions)

This way, the number of cached partitions does not change, but each task reads data from multiple partitions. However, I find that tasks lose locality this way: I see a lot of 'ANY' tasks, which means tasks read data from other nodes and become slower than tasks that read from local memory.

I think the best approach should be like #3, but preserving locality as much as possible. Is there any way to do that? Any suggestions? Thanks!

--
ZHENG
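For reference, a minimal sketch of the CombineInputFormat route mentioned above (the input path, split size, record layout, and feature count are placeholders, and the exact Hadoop configuration key may vary between Hadoop versions):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Pack many small HDFS blocks into splits of up to ~1 GB each, so the number of
// partitions (and therefore tasks per iteration) drops without a shuffle.
val conf = new Configuration(sc.hadoopConfiguration)
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 1024L * 1024 * 1024)

val lines = sc.newAPIHadoopFile(
  "hdfs:///data/training",          // placeholder path
  classOf[CombineTextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf).map(_._2.toString)

// Parse each line into a LabeledPoint; the "label,idx:value idx:value ..." layout and
// the 100M feature dimension are assumptions, not details from this thread.
val numFeatures = 100000000
val points = lines.map { line =>
  val Array(label, feats) = line.split(",", 2)
  val (indices, values) = feats.trim.split(" ").map { kv =>
    val Array(i, v) = kv.split(":")
    (i.toInt, v.toDouble)
  }.unzip
  LabeledPoint(label.toDouble, Vectors.sparse(numFeatures, indices, values))
}.cache()

With the split size raised, `points` should have far fewer partitions than the ~26K HDFS blocks, so no coalesce is needed afterwards and each cached partition still fits comfortably under the 2G limit.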
Re: Is there any way to control the parallelism in LogisticRegression
Hi Xiangrui, Thanks for your reply! Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right? For CombineInputFormat, I haven't tried it, but it sounds like it would combine multiple partitions into one large partition once I cache it, so the same issues as #1? For coalesce, could you share some best practices on how to set the right number of partitions to avoid the locality problem? Thanks!

On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote:

Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you don't use the right number of partitions. -Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com wrote:

I think this has the same effect and issue as #1, right?

On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com wrote:

How about increasing the HDFS file extent (block) size? For example, if the current value is 128M, make it 512M or bigger.

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com wrote:

Hi all, We are trying to use Spark MLlib to train on very large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib creates a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks adds a lot of network overhead to transfer the weight vector. So we want to reduce the number of tasks, and we tried the approaches below.

1. Coalesce partitions without shuffling, then cache:

data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when the data grows and numPartitions is fixed, the size of a single partition becomes large. This introduces two issues: first, the larger partitions need larger objects and more memory at runtime, and trigger GC more frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a single partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache:

data.coalesce(numPartitions, true).cache()

This mitigates the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.

3. Cache the data first, then coalesce partitions:

data.cache().coalesce(numPartitions)

This way, the number of cached partitions does not change, but each task reads data from multiple partitions. However, I find that tasks lose locality this way: I see a lot of 'ANY' tasks, which means tasks read data from other nodes and become slower than tasks that read from local memory.

I think the best approach should be like #3, but preserving locality as much as possible. Is there any way to do that? Any suggestions? Thanks!

--
ZHENG, Xu-dong

--
郑旭东
ZHENG, Xu-dong

--
郑旭东
ZHENG, Xu-dong
Re: Spark SQL JDBC
compiled correctly. I can access data in spark-shell on YARN from my Hive installation; cached tables, etc. all work. When I execute ./sbin/start-thriftserver.sh I get the error below. Shouldn't it just read my spark-env? I guess I am lost on how to make this work. Thanks!

$ ./start-thriftserver.sh
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:311)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

--
郑旭东
ZHENG, Xu-dong
Re: Spark SQL JDBC
Just found that this is because the lines below in make-distribution.sh don't take effect:

if [ $SPARK_HIVE == true ]; then
  cp $FWDIR/lib_managed/jars/datanucleus*.jar $DISTDIR/lib/
fi

There is no definition of $SPARK_HIVE in make-distribution.sh, so I have to set it explicitly (e.g. SPARK_HIVE=true) before running the script.

On Wed, Aug 13, 2014 at 1:10 PM, ZHENG, Xu-dong dong...@gmail.com wrote:

Hi Cheng, I also hit some issues when I try to start the ThriftServer from a build of the master branch (I could successfully run it from the branch-1.0-jdbc branch). Below is my build command:

./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn -Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver

And below are the printed errors:

ERROR CompositeService: Error starting services HiveServer2
org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
    at org.apache.hive.service.cli.CLIService.start(CLIService.java:85)
    at org.apache.hive.service.CompositeService.start(CompositeService.java:70)
    at org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:73)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:71)
    at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
    at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
    at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
    at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.hive.metastore.RetryingRawStore.init(RetryingRawStore.java:64)
    at org.apache.hadoop.hive.metastore.RetryingRawStore.getProxy(RetryingRawStore.java:73)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:415)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:402)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:441)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:326)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:286)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4060)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:121)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:104)
    at org.apache.hive.service.cli.CLIService.start(CLIService.java:82)
    ... 11 more
Caused by: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018)
    at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016)
    at java.security.AccessController.doPrivileged(Native Method
Is there any way to control the parallelism in LogisticRegression
Hi all, We are trying to use Spark MLlib to train on very large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib creates a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks adds a lot of network overhead to transfer the weight vector. So we want to reduce the number of tasks, and we tried the approaches below.

1. Coalesce partitions without shuffling, then cache:

data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when the data grows and numPartitions is fixed, the size of a single partition becomes large. This introduces two issues: first, the larger partitions need larger objects and more memory at runtime, and trigger GC more frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a single partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache:

data.coalesce(numPartitions, true).cache()

This mitigates the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.

3. Cache the data first, then coalesce partitions:

data.cache().coalesce(numPartitions)

This way, the number of cached partitions does not change, but each task reads data from multiple partitions. However, I find that tasks lose locality this way: I see a lot of 'ANY' tasks, which means tasks read data from other nodes and become slower than tasks that read from local memory.

I think the best approach should be like #3, but preserving locality as much as possible. Is there any way to do that? Any suggestions? Thanks!

--
ZHENG, Xu-dong
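For concreteness, a minimal sketch of how the three options look end to end (the libsvm input format, path, target partition count, and the LogisticRegressionWithSGD call are placeholders, not details from this thread):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.util.MLUtils

// Placeholder input: ~26K HDFS partitions of labeled points.
val raw = MLUtils.loadLibSVMFile(sc, "hdfs:///data/training.libsvm")
val numPartitions = 2000  // placeholder target

// #1: coalesce without shuffle, then cache -> fewer, larger cached partitions.
val v1 = raw.coalesce(numPartitions).cache()

// #2: coalesce with shuffle, then cache -> more even partition sizes,
// but pays one full shuffle of the input.
val v2 = raw.coalesce(numPartitions, shuffle = true).cache()

// #3: cache first, then coalesce -> cached partitions stay small and each task
// reads several of them, but those tasks may lose locality ('ANY').
val v3 = raw.cache().coalesce(numPartitions)

// One task per partition of the training RDD is launched per iteration.
val model = LogisticRegressionWithSGD.train(v3, 100)

Whichever variant is used, the number of tasks per iteration equals the number of partitions of the RDD passed to train(), which is what the three options trade off against memory, shuffle cost, and locality.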
Re: Is there any way to control the parallelism in LogisticRegression
I think this has the same effect and issue as #1, right?

On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com wrote:

How about increasing the HDFS file extent (block) size? For example, if the current value is 128M, make it 512M or bigger.

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com wrote:

Hi all, We are trying to use Spark MLlib to train on very large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib creates a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks adds a lot of network overhead to transfer the weight vector. So we want to reduce the number of tasks, and we tried the approaches below.

1. Coalesce partitions without shuffling, then cache:

data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when the data grows and numPartitions is fixed, the size of a single partition becomes large. This introduces two issues: first, the larger partitions need larger objects and more memory at runtime, and trigger GC more frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a single partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache:

data.coalesce(numPartitions, true).cache()

This mitigates the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.

3. Cache the data first, then coalesce partitions:

data.cache().coalesce(numPartitions)

This way, the number of cached partitions does not change, but each task reads data from multiple partitions. However, I find that tasks lose locality this way: I see a lot of 'ANY' tasks, which means tasks read data from other nodes and become slower than tasks that read from local memory.

I think the best approach should be like #3, but preserving locality as much as possible. Is there any way to do that? Any suggestions? Thanks!

--
ZHENG, Xu-dong

--
郑旭东
ZHENG, Xu-dong