Error when cache partitioned Parquet table

2015-01-26 Thread ZHENG, Xu-dong
Hi all,

I hit the error below when I cache a partitioned Parquet table. It seems that
Spark is trying to read the partition key from the Parquet files themselves,
where it does not exist, so the key is not found. Other queries run
successfully, even ones that reference the partition key. Is this a bug in
Spark SQL? Is there any workaround? Thank you!
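
For reference, here is roughly what I am doing, as a sketch (the table and
column names are illustrative, not my real ones):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc) // sc: an existing SparkContext

// "logs" stands for a Hive table stored as Parquet and partitioned by querydate.
hiveContext.cacheTable("logs")

// The same query works when the table is not cached, but after cacheTable
// the Parquet scan fails with the NoSuchElementException below.
hiveContext.sql(
  "SELECT count(*) FROM logs WHERE querydate = '2015-01-01'").collect()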

java.util.NoSuchElementException: key not found: querydate
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$3.apply(ParquetTableOperations.scala:142)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:142)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4.apply(ParquetTableOperations.scala:127)
at org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:247)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

-- 
郑旭东
ZHENG, Xu-dong


Re: spark sql - save to Parquet file - Unsupported datatype TimestampType

2014-12-08 Thread ZHENG, Xu-dong
I am hitting the same issue. Is there any solution?
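
Not a real fix, but the workaround I am considering is to keep TimestampType
out of the data that reaches the Parquet writer, since the error shows it is
unsupported there. A rough sketch, assuming a SchemaRDD with a timestamp
column named eventTime (the column names and path are illustrative, and I
have only verified the CAST syntax with a HiveContext):

// events: SchemaRDD whose schema contains eventTime: TimestampType
events.registerTempTable("events")

// Store the timestamp as a string (or a long epoch value) so the Parquet
// writer never sees TimestampType; convert it back after reading.
val writable = sqlContext.sql(
  "SELECT id, CAST(eventTime AS STRING) AS eventTime FROM events")
writable.saveAsParquetFile("hdfs:///tmp/events.parquet")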

On Wed, Nov 12, 2014 at 2:54 PM, tridib tridib.sama...@live.com wrote:

 Hi Friends,
 I am trying to save a JSON file to Parquet and got the error "Unsupported
 datatype TimestampType".
 Does Parquet not support dates? Which Parquet version does Spark use? Is
 there any workaround?


 Here is the stack trace:

 java.lang.RuntimeException: Unsupported datatype TimestampType
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:343)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:292)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:291)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2$$anonfun$3.apply(ParquetTypes.scala:320)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2$$anonfun$3.apply(ParquetTypes.scala:320)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:319)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:292)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:291)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:363)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:362)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:361)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:407)
 at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:151)
 at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:130)
 at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:204)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
 at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
 at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
 at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
 at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
 at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
 at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
 at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:76)
 at org.apache.spark.sql.api.java.JavaSchemaRDD.saveAsParquetFile(JavaSchemaRDD.scala:42)

 Thanks & Regards
 Tridib




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-save-to-Parquet-file-Unsupported-datatype-TimestampType-tp18691.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
郑旭东
ZHENG, Xu-dong


Re: Is there any way to control the parallelism in LogisticRegression

2014-08-21 Thread ZHENG, Xu-dong
Update.

I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which seems
to control the locality. The default value is 0.1 (a smaller value means
lower locality). I changed it to 1.0 (full locality) and used approach #3,
and saw a big improvement (20%~40%). Although the Web UI still shows the
task type as 'ANY' and the input as shuffle read, the real performance is
much better than before changing this parameter.
[image: Inline image 1]

I think the benefits include:

1. This approach keeps the physical partition size small but makes each task
handle multiple partitions, so the memory needed for deserialization is
reduced, which also reduces GC time. That is exactly what we observed in our
job.

2. This approach does not hit the 2G limitation, because it does not change
the partition size.

I also think that Spark may want to change this default value, or at least
expose this parameter to users (*CoalescedRDD* is a private class, and
*RDD.coalesce* does not have a parameter to control it).
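
For reference, a minimal sketch of one way to apply it. This assumes the
Spark 1.x constructor CoalescedRDD(prev, maxPartitions, balanceSlack);
because *CoalescedRDD* is private[spark], the helper has to be compiled
inside an org.apache.spark package, and the object name below is mine, not a
Spark API:

package org.apache.spark.rdd

import scala.reflect.ClassTag

// Hypothetical helper: coalesce without shuffle, but with full locality slack.
object FullSlackCoalesce {
  def apply[T: ClassTag](prev: RDD[T], numPartitions: Int): RDD[T] =
    new CoalescedRDD(prev, numPartitions, balanceSlack = 1.0)
}

With that, approach #3 becomes FullSlackCoalesce(data.cache(), numPartitions)
instead of data.cache().coalesce(numPartitions).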

On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng men...@gmail.com wrote:

 Sorry, I missed #2. My suggestion is the same as #2. You need to set a
 bigger numPartitions to avoid hitting integer bound or 2G limitation,
 at the cost of increased shuffle size per iteration. If you use a
 CombineInputFormat and then cache, it will try to give you roughly the
 same size per partition. There will be some remote fetches from HDFS
 but still cheaper than calling RDD.repartition().

 For coalesce without shuffle, I don't know how to set the right number
 of partitions either ...

 -Xiangrui
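
In case it helps anyone following the thread, here is my rough understanding
of the CombineInputFormat idea as a sketch (Hadoop 2 new-API classes; the
path and the split size are illustrative, and I have not benchmarked this
exact snippet):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.mllib.regression.LabeledPoint

val conf = new Configuration(sc.hadoopConfiguration)
// Cap each combined split at ~1GB so one task covers many small HDFS blocks.
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 1024L * 1024 * 1024)

val points = sc.newAPIHadoopFile(
    "hdfs:///data/training",            // illustrative path
    classOf[CombineTextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    conf)
  .map { case (_, line) => LabeledPoint.parse(line.toString) }
  .cache()

This reduces the number of partitions (and therefore tasks per iteration) at
read time, without a separate repartition or coalesce step.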

 On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong dong...@gmail.com wrote:
  Hi Xiangrui,
 
  Thanks for your reply!
 
  Yes, our data is very sparse, but RDD.repartition invokes
  RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has
  the same effect as #2, right?
 
  For CombineInputFormat, I haven't tried it, but it sounds like it will
  combine multiple partitions into one large partition if I cache it, so the
  same issues as #1?
 
  For coalesce, could you share some best practices on how to set the right
  number of partitions to avoid the locality problem?
 
  Thanks!
 
 
 
  On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote:
 
  Assuming that your data is very sparse, I would recommend
  RDD.repartition. But if it is not the case and you don't want to
  shuffle the data, you can try a CombineInputFormat and then parse the
  lines into labeled points. Coalesce may cause locality problems if you
  didn't use the right number of partitions. -Xiangrui
 
  On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com
  wrote:
   I think this has the same effect and issues as #1, right?
  
  
   On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen 
 chenjiush...@gmail.com
   wrote:
  
   How about increasing the HDFS file extent size? E.g. the current value is
   128M; we could make it 512M or bigger.
  
  
   On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com
   wrote:
  
   Hi all,
  
   We are trying to use Spark MLlib to train on very large data (100M
   features and 5B rows). The input data in HDFS has ~26K partitions. By
   default, MLlib will create a task for every partition at each iteration.
   But because our dimensionality is also very high, such a large number of
   tasks adds a lot of network overhead to transfer the weight vector. So we
   want to reduce the number of tasks; we tried the approaches below:
  
   1. Coalesce partitions without shuffling, then cache.
  
   data.coalesce(numPartitions).cache()
  
   This works fine for relatively small data, but when the data grows and
   numPartitions is fixed, each partition becomes large. This introduces two
   issues: first, the larger partitions need larger objects and more memory
   at runtime, and trigger GC more frequently; second, we hit a 'Size exceeds
   Integer.MAX_VALUE' error, which seems to be caused by a single partition
   being larger than 2G
   (https://issues.apache.org/jira/browse/SPARK-1391).
  
   2. Coalesce partitions with shuffling, then cache.
  
   data.coalesce(numPartitions, true).cache()
  
   It mitigates the second issue in #1 to some degree, but the first issue
   is still there, and it also introduces a large amount of shuffling.
  
   3. Cache data first, and coalesce partitions.
  
   data.cache().coalesce(numPartitions)
  
   In this way, the number of cached partitions does not change, but each
   task reads data from multiple partitions. However, I find the tasks lose
   locality this way. I see a lot of 'ANY' tasks, which means tasks read
   data from other nodes and become slower than tasks that read from local
   memory.
  
   I think the best way should be like #3, but leveraging locality as much
   as possible. Is there any way to do that? Any suggestions?
  
   Thanks!
  
   --
   ZHENG

Re: Is there any way to control the parallelism in LogisticRegression

2014-08-12 Thread ZHENG, Xu-dong
Hi Xiangrui,

Thanks for your reply!

Yes, our data is very sparse, but RDD.repartition invokes
RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has
the same effect as #2, right?

For CombineInputFormat, I haven't tried it, but it sounds like it will
combine multiple partitions into one large partition if I cache it, so the
same issues as #1?

For coalesce, could you share some best practices on how to set the right
number of partitions to avoid the locality problem?

Thanks!



On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote:

 Assuming that your data is very sparse, I would recommend
 RDD.repartition. But if it is not the case and you don't want to
 shuffle the data, you can try a CombineInputFormat and then parse the
 lines into labeled points. Coalesce may cause locality problems if you
 didn't use the right number of partitions. -Xiangrui

 On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com
 wrote:
  I think this has the same effect and issues as #1, right?
 
 
  On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com
  wrote:
 
  How about increasing the HDFS file extent size? E.g. the current value is
  128M; we could make it 512M or bigger.
 
 
  On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com
  wrote:
 
  Hi all,
 
  We are trying to use Spark MLlib to train on very large data (100M features
  and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib
  will create a task for every partition at each iteration. But because our
  dimensionality is also very high, such a large number of tasks adds a lot
  of network overhead to transfer the weight vector. So we want to reduce the
  number of tasks; we tried the approaches below:
 
  1. Coalesce partitions without shuffling, then cache.
 
  data.coalesce(numPartitions).cache()
 
  This works fine for relatively small data, but when the data grows and
  numPartitions is fixed, each partition becomes large. This introduces two
  issues: first, the larger partitions need larger objects and more memory at
  runtime, and trigger GC more frequently; second, we hit a 'Size exceeds
  Integer.MAX_VALUE' error, which seems to be caused by a single partition
  being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).
 
  2. Coalesce partitions with shuffling, then cache.
 
  data.coalesce(numPartitions, true).cache()
 
  It mitigates the second issue in #1 to some degree, but the first issue is
  still there, and it also introduces a large amount of shuffling.
 
  3. Cache data first, and coalesce partitions.
 
  data.cache().coalesce(numPartitions)
 
  In this way, the number of cached partitions does not change, but each task
  reads data from multiple partitions. However, I find the tasks lose
  locality this way. I see a lot of 'ANY' tasks, which means tasks read data
  from other nodes and become slower than tasks that read from local memory.
 
  I think the best way should be like #3, but leveraging locality as much as
  possible. Is there any way to do that? Any suggestions?
 
  Thanks!
 
  --
  ZHENG, Xu-dong
 
 
 
 
 
  --
  郑旭东
  ZHENG, Xu-dong
 




-- 
郑旭东
ZHENG, Xu-dong


Re: Spark SQL JDBC

2014-08-12 Thread ZHENG, Xu-dong
 compiled correctly, I can access
 data in spark-shell on yarn from my hive installation. Cached tables, etc
 all work.

 When I execute ./sbin/start-thriftserver.sh

 I get the error below. Shouldn't it just read my spark-env? I guess I am
 lost on how to make this work.

 Thanks!

 $ ./start-thriftserver.sh


 Spark assembly has been built with Hive, including Datanucleus jars on classpath

 Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:311)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)





-- 
郑旭东
ZHENG, Xu-dong


Re: Spark SQL JDBC

2014-08-12 Thread ZHENG, Xu-dong
Just found that this is because the lines below in make_distribution.sh don't work:

if [ $SPARK_HIVE == true ]; then
  cp $FWDIR/lib_managed/jars/datanucleus*.jar $DISTDIR/lib/
fi

There is no definition of $SPARK_HIVE in make_distribution.sh, so I have to
set it explicitly.



On Wed, Aug 13, 2014 at 1:10 PM, ZHENG, Xu-dong dong...@gmail.com wrote:

 Hi Cheng,

 I also hit some issues when I try to start the ThriftServer based on a build
 from the master branch (I could successfully run it from the branch-1.0-jdbc
 branch). Below is my build command:

 ./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn
 -Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver

 And below are the printed errors:

 ERROR CompositeService: Error starting services HiveServer2
 org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
 at org.apache.hive.service.cli.CLIService.start(CLIService.java:85)
 at org.apache.hive.service.CompositeService.start(CompositeService.java:70)
 at org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:73)
 at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:71)
 at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
 NestedThrowables:
 java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
 at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
 at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
 at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
 at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
 at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
 at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
 at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
 at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
 at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
 at org.apache.hadoop.hive.metastore.RetryingRawStore.init(RetryingRawStore.java:64)
 at org.apache.hadoop.hive.metastore.RetryingRawStore.getProxy(RetryingRawStore.java:73)
 at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:415)
 at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:402)
 at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:441)
 at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:326)
 at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:286)
 at org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54)
 at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
 at org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4060)
 at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:121)
 at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:104)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018)
 at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016)
 at java.security.AccessController.doPrivileged(Native Method

Is there any way to control the parallelism in LogisticRegression

2014-08-11 Thread ZHENG, Xu-dong
Hi all,

We are trying to use Spark MLlib to train on very large data (100M features
and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib
will create a task for every partition at each iteration. But because our
dimensionality is also very high, such a large number of tasks adds a lot of
network overhead to transfer the weight vector. So we want to reduce the
number of tasks; we tried the approaches below:

1. Coalesce partitions without shuffling, then cache.

data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when the data grows and
numPartitions is fixed, each partition becomes large. This introduces two
issues: first, the larger partitions need larger objects and more memory at
runtime, and trigger GC more frequently; second, we hit a 'Size exceeds
Integer.MAX_VALUE' error, which seems to be caused by a single partition
being larger than 2G (
https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache.

data.coalesce(numPartitions, true).cache()

It mitigates the second issue in #1 to some degree, but the first issue is
still there, and it also introduces a large amount of shuffling.

3. Cache data first, and coalesce partitions.

data.cache().coalesce(numPartitions)

In this way, the number of cached partitions does not change, but each task
reads data from multiple partitions. However, I find the tasks lose locality
this way. I see a lot of 'ANY' tasks, which means tasks read data from other
nodes and become slower than tasks that read from local memory.

I think the best way should be like #3, but leveraging locality as much as
possible. Is there any way to do that? Any suggestions?

Thanks!

-- 
ZHENG, Xu-dong


Re: Is there any way to control the parallelism in LogisticRegression

2014-08-11 Thread ZHENG, Xu-dong
I think this has the same effect and issues as #1, right?


On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com
wrote:

 How about increasing the HDFS file extent size? E.g. the current value is
 128M; we could make it 512M or bigger.


 On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com
 wrote:

 Hi all,

 We are trying to use Spark MLlib to train on very large data (100M features
 and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib
 will create a task for every partition at each iteration. But because our
 dimensionality is also very high, such a large number of tasks adds a lot of
 network overhead to transfer the weight vector. So we want to reduce the
 number of tasks; we tried the approaches below:

 1. Coalesce partitions without shuffling, then cache.

 data.coalesce(numPartitions).cache()

 This works fine for relatively small data, but when the data grows and
 numPartitions is fixed, each partition becomes large. This introduces two
 issues: first, the larger partitions need larger objects and more memory at
 runtime, and trigger GC more frequently; second, we hit a 'Size exceeds
 Integer.MAX_VALUE' error, which seems to be caused by a single partition
 being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).

 2. Coalesce partitions with shuffling, then cache.

 data.coalesce(numPartitions, true).cache()

 It mitigates the second issue in #1 to some degree, but the first issue is
 still there, and it also introduces a large amount of shuffling.

 3. Cache data first, and coalesce partitions.

 data.cache().coalesce(numPartitions)

 In this way, the number of cached partitions does not change, but each task
 reads data from multiple partitions. However, I find the tasks lose locality
 this way. I see a lot of 'ANY' tasks, which means tasks read data from other
 nodes and become slower than tasks that read from local memory.

 I think the best way should be like #3, but leveraging locality as much as
 possible. Is there any way to do that? Any suggestions?

 Thanks!

 --
 ZHENG, Xu-dong





-- 
郑旭东
ZHENG, Xu-dong