Is there a way to clone a JavaRDD without persisting it

2014-11-11 Thread Steve Lewis
 In my problem I have a number of intermediate JavaRDDs and would like to
be able to look at their sizes without destroying the RDD for subsequent
processing. persist will do this, but these RDDs are big and persist seems
expensive, and I am unsure which StorageLevel is needed. Is there a way
to clone a JavaRDD, or does anyone have good ideas on how to do this?
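
For reference, a minimal sketch of one approach (Scala API shown; the JavaRDD calls are analogous, and the storage level is only a suggestion): count() is an action, so it does not consume the RDD, and persisting before counting lets the count and the downstream stages share one computation.

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Inspect the size of an intermediate RDD without losing it for later steps.
def inspectSize[T](rdd: RDD[T]): Long = {
  // Serialized in memory, spilling to disk if it does not fit.
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
  rdd.count()
}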


Re: Pyspark Error when broadcast numpy array

2014-11-11 Thread Davies Liu
Yes, your broadcast should be about 300M, much smaller than 2G, I
didn't read your post carefully.

The broadcast in Python had been improved much since 1.1, I think it
will work in 1.1 or upcoming 1.2 release, could you upgrade to 1.1?

Davies

On Tue, Nov 11, 2014 at 8:37 PM, bliuab bli...@cse.ust.hk wrote:
 Dear Liu:

 Thank you very much for your help. I will apply that patch. By the way, when
 I succeeded in broadcasting an array of size 30M, the log said that such an
 array takes around 230MB of memory. As a result, I think the numpy array that
 leads to the error is much smaller than 2G.

 On Wed, Nov 12, 2014 at 12:29 PM, Davies Liu-2 [via Apache Spark User List]
 [hidden email] wrote:

 This PR fix the problem: https://github.com/apache/spark/pull/2659

 cc @josh

 Davies

 On Tue, Nov 11, 2014 at 7:47 PM, bliuab [hidden email] wrote:

  In spark-1.0.2, I have come across an error when I try to broadcast a
  quite large numpy array (with 35M dimensions). The error information (the
  java.lang.NegativeArraySizeException error and its details) is listed below.
  Moreover, when broadcasting a relatively smaller numpy array (30M
  dimensions), everything works fine. And a 30M-dimension numpy array takes
  230M of memory, which, in my opinion, is not very large.
  As far as I have investigated, it seems related to py4j. However, I have
  no idea how to fix this. I would appreciate it if I could get some hints.
  
  py4j.protocol.Py4JError: An error occurred while calling o23.broadcast.
  Trace:
  java.lang.NegativeArraySizeException
  at py4j.Base64.decode(Base64.java:292)
  at py4j.Protocol.getBytes(Protocol.java:167)
  at py4j.Protocol.getObject(Protocol.java:276)
  at
  py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
  at py4j.commands.CallCommand.execute(CallCommand.java:77)
  at py4j.GatewayConnection.run(GatewayConnection.java:207)
  -
   And the test code is as follows:

   conf = SparkConf().setAppName('brodyliu_LR').setMaster('spark://10.231.131.87:5051')
   conf.set('spark.executor.memory', '4000m')
   conf.set('spark.akka.timeout', '10')
   conf.set('spark.ui.port','8081')
   conf.set('spark.cores.max','150')
   #conf.set('spark.rdd.compress', 'True')
   conf.set('spark.default.parallelism', '300')
   #configure the spark environment
   sc = SparkContext(conf=conf, batchSize=1)

   vec = np.random.rand(3500)
   a = sc.broadcast(vec)
 
 
 
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-Error-when-broadcast-numpy-array-tp18662.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: [hidden email]
  For additional commands, e-mail: [hidden email]
 

 -
 To unsubscribe, e-mail: [hidden email]
 For additional commands, e-mail: [hidden email]



 




 --
 My Homepage: www.cse.ust.hk/~bliuab
 MPhil student in Hong Kong University of Science and Technology.
 Clear Water Bay, Kowloon, Hong Kong.
 Profile at LinkedIn.

 
 View this message in context: Re: Pyspark Error when broadcast numpy array

 Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How did the RDD.union work

2014-11-11 Thread qiaou
Hi:
I have a problem with using the union method of RDD. Things go like this.
I have a function like

def hbaseQuery(area: String): RDD[Result] = ???

When I use hbaseQuery("aa").union(hbaseQuery("bb")).count(), it returns 0.
However, when I use sc.parallelize(hbaseQuery("aa").collect.toList :::
hbaseQuery("bb").collect.toList).count(), it returns the right value.
Obviously I have an action after my transformations, so why did it not work?
fyi

--  
qiaou
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)



Re: MLLIB usage: BLAS dependency warning

2014-11-11 Thread Xiangrui Meng
Could you try jar tf on the assembly jar and grep
netlib-native_system-linux-x86_64.so? -Xiangrui

On Tue, Nov 11, 2014 at 7:11 PM, jpl jlefe...@soe.ucsc.edu wrote:
 Hi,
 I am having trouble using the BLAS libs with the MLLib functions.  I am
 using org.apache.spark.mllib.clustering.KMeans (on a single machine) and
 running the Spark-shell with the kmeans example code (from
 https://spark.apache.org/docs/latest/mllib-clustering.html)  which runs
 successfully but I get the following warning in the log:

 WARN netlib.BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemBLAS
 WARN netlib.BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefBLAS

 I compiled spark 1.1.0 with mvn -Phadoop-2.4  -Dhadoop.version=2.4.0
 -Pnetlib-lgpl -DskipTests clean package

 If anyone could please clarify the steps to get the dependencies correctly
 installed and visible to spark (from
 https://spark.apache.org/docs/latest/mllib-guide.html), that would be
 greatly appreciated.  Using yum, I installed blas.x86_64, lapack.x86_64,
 gcc-gfortran.x86_64, libgfortran.x86_64 and then downloaded Breeze and built
 that successfully with Maven.  I verified that I do have
 /usr/lib/libblas.so.3 and /usr/lib/liblapack.so.3 present on the machine and
 ldconfig -p shows these listed.

 I also tried adding /usr/lib/ to spark.executor.extraLibraryPath and I
 verified it is present in the Spark webUI environment tab.   I downloaded
 and compiled jblas with mvn clean install, which creates
 jblas-1.2.4-SNAPSHOT.jar, and then also tried adding that to
 spark.executor.extraClassPath but I still get the same WARN message. Maybe
 there are a few simple steps that I am missing?  Thanks a lot.




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-usage-BLAS-dependency-warning-tp18660.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-shell exception while running in YARN mode

2014-11-11 Thread hmxxyy
The Pi example gives the same error in YARN mode.

HADOOP_CONF_DIR=/home/gs/conf/current ./spark-submit --class
org.apache.spark.examples.SparkPi --master yarn-client
../examples/target/spark-examples_2.10-1.2.0-SNAPSHOT.jar

What could be wrong here?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-exception-while-running-in-YARN-mode-tp18679p18688.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How did the RDD.union work

2014-11-11 Thread Shixiong Zhu
Could you provide the code of hbaseQuery? Maybe it doesn't support being
executed in parallel.

Best Regards,
Shixiong Zhu

2014-11-12 14:32 GMT+08:00 qiaou qiaou8...@gmail.com:

  Hi:
 I got a problem with using the union method of RDD
 things like this
 I get a function like
 def hbaseQuery(area:string):RDD[Result]= ???
 when i use hbaseQuery('aa').union(hbaseQuery(‘bb’)).count() it
 returns 0
 however when use like this  sc.parallize(hbaseQuery('aa’).collect.toList
 ::: hbaseQuery(’bb’).collect.toList).count() it return the right value
 obviously i have got an action after my transformation action ,but why it
 did not work
 fyi

 --
 qiaou
 已使用 Sparrow http://www.sparrowmailapp.com/?sig




Re: Imbalanced shuffle read

2014-11-11 Thread Akhil Das
When you call groupByKey(), try providing the number of partitions, like
groupByKey(100), depending on your data/cluster size.
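
For example, a minimal sketch (data and partition count are illustrative) of spreading the shuffle over more reduce partitions, and of using reduceByKey when only a per-key aggregate is needed, so that one hot key does not drag all of its values through a single reducer:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local[*]", "shuffle-example")  // illustrative context
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// Explicit partition count for the shuffle instead of the default.
val grouped = pairs.groupByKey(100)

// Map-side combining: usually far less shuffle data than groupByKey.
val sums = pairs.reduceByKey(_ + _, 100)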

Thanks
Best Regards

On Wed, Nov 12, 2014 at 6:45 AM, ankits ankitso...@gmail.com wrote:

 I'm running a job that uses groupByKey(), so it generates a lot of shuffle
 data. Then it processes this and writes files to HDFS in a forEachPartition
 block. Looking at the forEachPartition stage details in the web console,
 all
 but one executor is idle (SUCCESS in 50-60ms), and one is RUNNING with a
 huge shuffle read and takes a long time to finish.

 Can someone explain why the read is all on one node and how to parallelize
 this better?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Imbalanced-shuffle-read-tp18648.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How did the RDD.union work

2014-11-11 Thread qiaou
ok here is the code

def hbaseQuery: (String) => RDD[Result] = {
  val generateRdd = (area: String) => {
    val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}"
    val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}"
    println(s"startRowKey:${startRowKey}")
    println(s"stopRowKey :${stopRowKey}")

    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(startRowKey))
    scan.setStopRow(Bytes.toBytes(stopRowKey))
    val filterList: FilterList = new FilterList()
    if (appKey != null && !appKey.equals("_")) {
      val appKeyFilter: SingleColumnValueFilter =
        new SingleColumnValueFilter(Bytes.toBytes("clientInfo"),
          Bytes.toBytes("optKey"), CompareOp.EQUAL, Bytes.toBytes(appKey))
      filterList.addFilter(appKeyFilter)
    }
    if (imei != null && !imei.equals("_")) {
      val imeiFilter: SingleColumnValueFilter =
        new SingleColumnValueFilter(Bytes.toBytes("clientInfo"),
          Bytes.toBytes("optImei"), CompareOp.EQUAL, Bytes.toBytes(imei))
      filterList.addFilter(imeiFilter)
    }
    if (filterList.getFilters != null && filterList.getFilters.size() > 0) {
      scan.setFilter(filterList)
    }
    scan.setCaching(1)

    val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "asrLogFeedBack")
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    SparkUtil.getSingleSparkContext()
      .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
        classOf[ImmutableBytesWritable], classOf[Result]).map {
        case (_: ImmutableBytesWritable, result: Result) => result
      }
  }
  return generateRdd
}


--  
qiaou
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Wednesday, 12 November 2014, at 2:50 PM, Shixiong Zhu wrote:

 Could you provide the code of hbaseQuery? It maybe doesn't support to execute 
 in parallel.
  
 Best Regards,
 Shixiong Zhu
  
  
  
  
 2014-11-12 14:32 GMT+08:00 qiaou qiaou8...@gmail.com 
 (mailto:qiaou8...@gmail.com):
  Hi:  
  I got a problem with using the union method of RDD
  things like this
  I get a function like
  def hbaseQuery(area:string):RDD[Result]= ???
  when i use hbaseQuery('aa').union(hbaseQuery(‘bb’)).count() it 
  returns 0
  however when use like this  sc.parallize(hbaseQuery('aa’).collect.toList 
  ::: hbaseQuery(’bb’).collect.toList).count() it return the right value  
  obviously i have got an action after my transformation action ,but why it 
  did not work
  fyi
   
  --  
  qiaou
  已使用 Sparrow (http://www.sparrowmailapp.com/?sig)
   
  



spark sql - save to Parquet file - Unsupported datatype TimestampType

2014-11-11 Thread tridib
Hi Friends,
I am trying to save a JSON file to Parquet. I got the error "Unsupported
datatype TimestampType".
Does Parquet not support dates/timestamps? Which Parquet version does Spark
use? Is there any workaround?


Here is the stack trace:

java.lang.RuntimeException: Unsupported datatype TimestampType
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:343)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:292)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:291)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2$$anonfun$3.apply(ParquetTypes.scala:320)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2$$anonfun$3.apply(ParquetTypes.scala:320)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:319)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:292)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:291)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:363)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:362)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:361)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:407)
at
org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:151)
at
org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:130)
at
org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:204)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at
org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:76)
at
org.apache.spark.sql.api.java.JavaSchemaRDD.saveAsParquetFile(JavaSchemaRDD.scala:42)

Thanks & Regards
Tridib
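
As a possible workaround (a sketch only, assuming Spark 1.1's SQL API, that the SQL dialect in use supports CAST, and illustrative column and file names), the timestamp column can be cast to a type the Parquet writer does accept, such as a string or a long, before saving:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)               // sc: existing SparkContext
val events = sqlContext.jsonFile("events.json")   // illustrative input
events.registerTempTable("events")

// Cast the timestamp column so the Parquet writer no longer sees TimestampType.
val converted = sqlContext.sql(
  "SELECT id, CAST(eventTime AS STRING) AS eventTime FROM events")
converted.saveAsParquetFile("events.parquet")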




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-save-to-Parquet-file-Unsupported-datatype-TimestampType-tp18691.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupBy for DStream

2014-11-11 Thread Akhil Das
1. Use foreachRDD over the DStream, and on each RDD you can call groupBy().

2. DStream.count() returns a new DStream in which each RDD has a single
element generated by counting each RDD of this DStream.
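
A minimal sketch of both points (the streaming context and socket source are illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))      // sc: existing SparkContext
val lines = ssc.socketTextStream("localhost", 9999)  // illustrative source

// 1. groupBy on each underlying RDD via foreachRDD.
lines.foreachRDD { rdd =>
  val grouped = rdd.groupBy(line => line.take(1))    // group by first character
  grouped.collect().foreach(println)
}

// 2. count() yields a DStream[Long]; pull out the plain Long per batch.
lines.count().foreachRDD { rdd =>
  val n: Long = rdd.first()                          // single-element RDD
  println("batch size: " + n)
}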

Thanks
Best Regards

On Wed, Nov 12, 2014 at 2:49 AM, SK skrishna...@gmail.com wrote:


 Hi.

 1) I don't see a groupBy() method for a DStream object. Not sure why that is
 not supported. Currently I am using filter() to separate out the different
 groups. I would like to know if there is a way to convert a DStream object
 to a regular RDD so that I can apply the RDD methods like groupBy.


 2) The count() method for a DStream object returns a DStream[Long] instead
 of a simple Long (like RDD does). How can I extract the simple Long count
 value? I tried dstream(0) but got a compilation error that it does not take
 parameters. I also tried dstream[0], but that also resulted in a compilation
 error. I am not able to use the head() or take(0) method for DStream either.

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-for-DStream-tp18623.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Pyspark Error when broadcast numpy array

2014-11-11 Thread bliuab
Dear Liu:

Thank you for your reply. I will set up an experimental environment for
spark-1.1 and test it.

On Wed, Nov 12, 2014 at 2:30 PM, Davies Liu-2 [via Apache Spark User List] 
ml-node+s1001560n1868...@n3.nabble.com wrote:

 Yes, your broadcast should be about 300M, much smaller than 2G, I
 didn't read your post carefully.

 The broadcast in Python had been improved much since 1.1, I think it
 will work in 1.1 or upcoming 1.2 release, could you upgrade to 1.1?

 Davies

 On Tue, Nov 11, 2014 at 8:37 PM, bliuab [hidden email]
 http://user/SendEmail.jtp?type=nodenode=18684i=0 wrote:

  Dear Liu:
 
  Thank you very much for your help. I will update that patch. By the way,
 as
  I have succeed to broadcast an array of size(30M) the log said that such
  array takes around 230MB memory. As a result, I think the numpy array
 that
  leads to error is much smaller than 2G.
 
  On Wed, Nov 12, 2014 at 12:29 PM, Davies Liu-2 [via Apache Spark User
 List]
  [hidden email] wrote:
 
  This PR fix the problem: https://github.com/apache/spark/pull/2659
 
  cc @josh
 
  Davies
 
  On Tue, Nov 11, 2014 at 7:47 PM, bliuab [hidden email] wrote:
 
   In spark-1.0.2, I have come across an error when I try to broadcast a
   quite
   large numpy array(with 35M dimension). The error information except
 the
   java.lang.NegativeArraySizeException error and details is listed
 below.
   Moreover, when broadcast a relatively smaller numpy array(30M
   dimension),
   everything works fine. And 30M dimension numpy array takes 230M
 memory
   which, in my opinion, not very large.
   As far as I have surveyed, it seems related with py4j. However, I
 have
   no
   idea how to fix  this. I would be appreciated if I can get some hint.
   
   py4j.protocol.Py4JError: An error occurred while calling
 o23.broadcast.
   Trace:
   java.lang.NegativeArraySizeException
   at py4j.Base64.decode(Base64.java:292)
   at py4j.Protocol.getBytes(Protocol.java:167)
   at py4j.Protocol.getObject(Protocol.java:276)
   at
   py4j.commands.AbstractCommand.getArguments(AbstractCommand.java:81)
   at py4j.commands.CallCommand.execute(CallCommand.java:77)
   at py4j.GatewayConnection.run(GatewayConnection.java:207)
   -
   And the test code is a follows:
   conf =
  
   SparkConf().setAppName('brodyliu_LR').setMaster('spark://
 10.231.131.87:5051')
   conf.set('spark.executor.memory', '4000m')
   conf.set('spark.akka.timeout', '10')
   conf.set('spark.ui.port','8081')
   conf.set('spark.cores.max','150')
   #conf.set('spark.rdd.compress', 'True')
   conf.set('spark.default.parallelism', '300')
   #configure the spark environment
   sc = SparkContext(conf=conf, batchSize=1)
  
   vec = np.random.rand(3500)
   a = sc.broadcast(vec)
  
  
  
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-Error-when-broadcast-numpy-array-tp18662.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: [hidden email]
   For additional commands, e-mail: [hidden email]
  
 
  -
  To unsubscribe, e-mail: [hidden email]
  For additional commands, e-mail: [hidden email]
 
 
 
  
 
 
 
 
  --
  My Homepage: www.cse.ust.hk/~bliuab
  MPhil student in Hong Kong University of Science and Technology.
  Clear Water Bay, Kowloon, Hong Kong.
  Profile at LinkedIn.
 
  
  View this message in context: Re: Pyspark Error when broadcast numpy
 array
 
  Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: [hidden email]
 http://user/SendEmail.jtp?type=nodenode=18684i=1
 For additional commands, e-mail: [hidden email]
 http://user/SendEmail.jtp?type=nodenode=18684i=2



 --
 

Re: How did the RDD.union work

2014-11-11 Thread qiaou
This works!
But can you explain why it should be used like this?

--  
qiaou
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Wednesday, 12 November 2014, at 3:18 PM, Shixiong Zhu wrote:

 You need to create a new configuration for each RDD. Therefore, val 
 hbaseConf = HBaseConfigUtil.getHBaseConfiguration should be changed to val 
 hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)
  
 Best Regards,
 Shixiong Zhu
  
  
  
  
 2014-11-12 14:53 GMT+08:00 qiaou qiaou8...@gmail.com 
 (mailto:qiaou8...@gmail.com):
  ok here is the code
   
  def hbaseQuery:(String)=RDD[Result] = {
val generateRdd = (area:String)={
  val startRowKey = s$area${RowKeyUtils.convertToHex(startId, 
  10)}
  val stopRowKey = s$area${RowKeyUtils.convertToHex(endId, 
  10)}
  println(sstartRowKey:${startRowKey})
  println(sstopRowKey :${stopRowKey})
   
  val scan = new Scan()
  scan.setStartRow(Bytes.toBytes(startRowKey))
  scan.setStopRow(Bytes.toBytes(stopRowKey))
  val filterList: FilterList = new FilterList()
  if (appKey != null  !appKey.equals(_)) {
val appKeyFilter: SingleColumnValueFilter =
  new SingleColumnValueFilter(Bytes.toBytes(clientInfo), 
  Bytes.toBytes(optKey), CompareOp.EQUAL, Bytes.toBytes(appKey))
filterList.addFilter(appKeyFilter)
  }
  if (imei != null  !imei.equals(_)) {
val imeiFilter: SingleColumnValueFilter =
  new SingleColumnValueFilter(Bytes.toBytes(clientInfo), 
  Bytes.toBytes(optImei), CompareOp.EQUAL, Bytes.toBytes(imei))
filterList.addFilter(imeiFilter)
  }
  if (filterList.getFilters != null  filterList.getFilters.size()  
  0) {
scan.setFilter(filterList)
  }
  scan.setCaching(1)
   
  val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
  hbaseConf.set(TableInputFormat.INPUT_TABLE, asrLogFeedBack)
  hbaseConf.set(TableInputFormat.SCAN, 
  Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
   
  SparkUtil.getSingleSparkContext()
.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result]).map {
case (_: ImmutableBytesWritable, result: Result) = {
  result
}
  }
}
return generateRdd
  }
   
   
  --  
  qiaou
  已使用 Sparrow (http://www.sparrowmailapp.com/?sig)
   
   
  在 2014年11月12日 星期三,下午2:50,Shixiong Zhu 写道:
   
   Could you provide the code of hbaseQuery? It maybe doesn't support to 
   execute in parallel.

   Best Regards,
   Shixiong Zhu




   2014-11-12 14:32 GMT+08:00 qiaou qiaou8...@gmail.com 
   (mailto:qiaou8...@gmail.com):
Hi:  
I got a problem with using the union method of RDD
things like this
I get a function like
def hbaseQuery(area:string):RDD[Result]= ???
when i use hbaseQuery('aa').union(hbaseQuery(‘bb’)).count() it 
returns 0
however when use like this  
sc.parallize(hbaseQuery('aa’).collect.toList ::: 
hbaseQuery(’bb’).collect.toList).count() it return the right value  
obviously i have got an action after my transformation action ,but why 
it did not work
fyi
 
--  
qiaou
已使用 Sparrow (http://www.sparrowmailapp.com/?sig)
 

   
  



Re: ISpark class not found

2014-11-11 Thread MEETHU MATHEW
Hi,
I was also trying ISpark, but I couldn't even start the notebook. I am getting
the following error:
ERROR:tornado.access:500 POST /api/sessions (127.0.0.1) 10.15ms 
referer=http://localhost:/notebooks/Scala/Untitled0.ipynb
How did you start the notebook?
Thanks & Regards,
Meethu M

 On Wednesday, 12 November 2014 6:50 AM, Laird, Benjamin 
benjamin.la...@capitalone.com wrote:
   

 I've been experimenting with the ISpark extension to IScala 
(https://github.com/tribbloid/ISpark)
Objects created in the REPL are not being loaded correctly on worker nodes, 
leading to a ClassNotFound exception. This does work correctly in spark-shell.
I was curious if anyone has used ISpark and has encountered this issue. Thanks!

Simple example:
In [1]: case class Circle(rad:Float)
In [2]: val rdd = sc.parallelize(1 to 1).map(i => Circle(i.toFloat)).take(10)
14/11/11 13:03:35 ERROR TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: Unable to find class: [L$line5.$read$$iwC$$iwC$Circle;

Full trace in my gist: 
https://gist.github.com/benjaminlaird/3e543a9a89fb499a3a14

 The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.

   

About Join operator in PySpark

2014-11-11 Thread 夏俊鸾
Hi all

I have noticed that the "join" operator is implemented with the union and
groupByKey operators instead of the cogroup operator in PySpark; this change
will probably generate more shuffle stages. For example:

rdd1 = sc.makeRDD(...).partitionBy(2)
rdd2 = sc.makeRDD(...).partitionBy(2)
rdd3 = rdd1.join(rdd2).collect()

The above code implemented in Scala will generate 2 shuffles, but will
generate 3 shuffles with PySpark. What was the initial design motivation of
the join operator in PySpark? Any ideas on how to improve join performance in PySpark?

Andrew
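
For comparison, a sketch of the Scala behaviour being referred to (data is illustrative): join is built on cogroup, so when both inputs already share the same partitioner, the join stage itself does not shuffle them again.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

val part = new HashPartitioner(2)
val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(part)  // shuffle 1
val rdd2 = sc.parallelize(Seq((1, "x"), (2, "y"))).partitionBy(part)  // shuffle 2

// cogroup-based join: with a shared partitioner, no third shuffle is needed.
val joined = rdd1.join(rdd2).collect()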


Re: Spark and Play

2014-11-11 Thread John Meehan
You can also build a Play 2.2.x + Spark 1.1.0 fat jar with sbt-assembly,
e.g. for yarn-client support or for use with spark-shell when debugging:

play.Project.playScalaSettings

libraryDependencies ~= { _ map {
  case m if m.organization == "com.typesafe.play" =>
    m.exclude("commons-logging", "commons-logging")
  case m => m
}}

assemblySettings

test in assembly := {}

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case PathList("org", "slf4j", xs @ _*) => MergeStrategy.discard
    case "about.html" => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}

On Tue, Nov 11, 2014 at 3:04 PM, Mohammed Guller moham...@glassbeam.com
wrote:

 Actually, it is possible to integrate Spark 1.1.0 with Play 2.2.x

 Here is a sample build.sbt file:

 name := "xyz"

 version := "0.1"

 scalaVersion := "2.10.4"

 libraryDependencies ++= Seq(
   jdbc,
   anorm,
   cache,
   "org.apache.spark" %% "spark-core" % "1.1.0",
   "com.typesafe.akka" %% "akka-actor" % "2.2.3",
   "com.typesafe.akka" %% "akka-slf4j" % "2.2.3",
   "org.apache.spark" %% "spark-sql" % "1.1.0"
 )

 play.Project.playScalaSettings


 Mohammed

 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]
 Sent: Tuesday, November 11, 2014 2:06 PM
 To: Akshat Aranya
 Cc: user@spark.apache.org
 Subject: Re: Spark and Play

 Hi There,

 Because Akka versions are not binary compatible with one another, it might
 not be possible to integrate Play with Spark 1.1.0.

 - Patrick

 On Tue, Nov 11, 2014 at 8:21 AM, Akshat Aranya aara...@gmail.com wrote:
  Hi,
 
  Sorry if this has been asked before; I didn't find a satisfactory
  answer when searching.  How can I integrate a Play application with
  Spark?  I'm getting into issues of akka-actor versions.  Play 2.2.x
  uses akka-actor 2.0, whereas Play 2.3.x uses akka-actor 2.3.4, neither
  of which work fine with Spark 1.1.0.  Is there something I should do
  with libraryDependencies in my build.sbt to make it work?
 
  Thanks,
  Akshat

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How did the RDD.union work

2014-11-11 Thread Shixiong Zhu
The `conf` object will be sent to other nodes via Broadcast.

Here is the scaladoc of Broadcast:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast

In addition, the object v should not be modified after it is broadcast in
order to ensure that all nodes get the same value of the broadcast variable
(e.g. if the variable is shipped to a new node later).

Best Regards,
Shixiong Zhu
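
A minimal sketch of that advice (the helper name is hypothetical; TableInputFormat is the HBase class used in the original code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Give every HBase RDD its own Configuration copy: the conf travels with the
// RDD via Broadcast, so a shared, mutated instance would leak the last scan
// settings into both sides of the union.
def freshConf(base: Configuration, table: String, scanBase64: String): Configuration = {
  val conf = new Configuration(base)   // copy instead of reusing the shared instance
  conf.set(TableInputFormat.INPUT_TABLE, table)
  conf.set(TableInputFormat.SCAN, scanBase64)
  conf
}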

2014-11-12 15:20 GMT+08:00 qiaou qiaou8...@gmail.com:

  this work!
 but can you explain why should use like this?

 --
 qiaou
 已使用 Sparrow http://www.sparrowmailapp.com/?sig

 在 2014年11月12日 星期三,下午3:18,Shixiong Zhu 写道:

 You need to create a new configuration for each RDD. Therefore, val
 hbaseConf = HBaseConfigUtil.getHBaseConfiguration should be changed to val
 hbaseConf = new Configuration(HBaseConfigUtil.getHBaseConfiguration)

 Best Regards,
 Shixiong Zhu

 2014-11-12 14:53 GMT+08:00 qiaou qiaou8...@gmail.com:

  ok here is the code

 def hbaseQuery:(String)=RDD[Result] = {
   val generateRdd = (area:String)={
 val startRowKey = s$area${RowKeyUtils.convertToHex(startId,
 10)}
 val stopRowKey = s$area${RowKeyUtils.convertToHex(endId,
 10)}
 println(sstartRowKey:${startRowKey})
 println(sstopRowKey :${stopRowKey})

 val scan = new Scan()
 scan.setStartRow(Bytes.toBytes(startRowKey))
 scan.setStopRow(Bytes.toBytes(stopRowKey))
 val filterList: FilterList = new FilterList()
 if (appKey != null  !appKey.equals(_)) {
   val appKeyFilter: SingleColumnValueFilter =
 new SingleColumnValueFilter(Bytes.toBytes(clientInfo),
 Bytes.toBytes(optKey), CompareOp.EQUAL, Bytes.toBytes(appKey))
   filterList.addFilter(appKeyFilter)
 }
 if (imei != null  !imei.equals(_)) {
   val imeiFilter: SingleColumnValueFilter =
 new SingleColumnValueFilter(Bytes.toBytes(clientInfo),
 Bytes.toBytes(optImei), CompareOp.EQUAL, Bytes.toBytes(imei))
   filterList.addFilter(imeiFilter)
 }
 if (filterList.getFilters != null  filterList.getFilters.size()
  0) {
   scan.setFilter(filterList)
 }
 scan.setCaching(1)

 val hbaseConf = HBaseConfigUtil.getHBaseConfiguration
 hbaseConf.set(TableInputFormat.INPUT_TABLE, asrLogFeedBack)
 hbaseConf.set(TableInputFormat.SCAN,
 Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

 SparkUtil.getSingleSparkContext()
   .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
 classOf[ImmutableBytesWritable], classOf[Result]).map {
   case (_: ImmutableBytesWritable, result: Result) = {
 result
   }
 }
   }
   return generateRdd
 }

 --
 qiaou
 已使用 Sparrow http://www.sparrowmailapp.com/?sig

 在 2014年11月12日 星期三,下午2:50,Shixiong Zhu 写道:

 Could you provide the code of hbaseQuery? It maybe doesn't support to
 execute in parallel.

 Best Regards,
 Shixiong Zhu

 2014-11-12 14:32 GMT+08:00 qiaou qiaou8...@gmail.com:

  Hi:
 I got a problem with using the union method of RDD
 things like this
 I get a function like
 def hbaseQuery(area:string):RDD[Result]= ???
 when i use hbaseQuery('aa').union(hbaseQuery(‘bb’)).count() it
 returns 0
 however when use like this  sc.parallize(hbaseQuery('aa’).collect.toList
 ::: hbaseQuery(’bb’).collect.toList).count() it return the right value
 obviously i have got an action after my transformation action ,but why it
 did not work
 fyi

 --
 qiaou
 已使用 Sparrow http://www.sparrowmailapp.com/?sig








Re: Read a HDFS file from Spark source code

2014-11-11 Thread rapelly kartheek
Hi Sean,
I was following this link;

http://mund-consulting.com/Blog/Posts/file-operations-in-HDFS-using-java.aspx

But I was facing a FileSystem ambiguity error. I really don't have any idea
how to go about doing this.
Can you please help me get started with this?
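
For what it's worth, a minimal sketch (URI and path are illustrative) of reading an HDFS file with the Hadoop FileSystem API; if the ambiguity comes from another FileSystem type already imported in the file being modified, fully qualifying the Hadoop one avoids it:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Fully qualified to sidestep a clash with any other FileSystem in scope.
val conf = new Configuration()
val fs = org.apache.hadoop.fs.FileSystem.get(new URI("hdfs://localhost:9000"), conf)

val in = fs.open(new Path("/user/someuser/data"))   // illustrative path
val firstByte = in.read()
in.close()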


On Wed, Nov 12, 2014 at 11:26 AM, Samarth Mailinglist 
mailinglistsama...@gmail.com wrote:

 Instead of a file path, use a HDFS URI.
 For example: (In Python)



 data = sc.textFile("hdfs://localhost/user/someuser/data")

 ​

 On Wed, Nov 12, 2014 at 10:12 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi

 I am trying to access a file in HDFS from the Spark source code. Basically,
 I am tweaking the Spark source code and need to access a file in HDFS from
 within it. I really don't understand how to go about doing this.

 Can someone please help me out in this regard.
 Thank you!!
 Karthik




