Re: Dynamic resource allocation to Spark on Mesos

2017-02-08 Thread Sun Rui
Michael,
No. We directly launch the external shuffle service by specifying a larger heap 
size than default on each worker node. It is observed that the processes are 
quite stable.
> On Feb 9, 2017, at 05:21, Michael Gummelt <mgumm...@mesosphere.io> wrote:
> 
> Sun, are you using marathon to run the shuffle service?
> 
> On Tue, Feb 7, 2017 at 7:36 PM, Sun Rui <sunrise_...@163.com 
> <mailto:sunrise_...@163.com>> wrote:
> Yi Jan,
> 
> We have been using Spark on Mesos with dynamic allocation enabled, which 
> works and improves the overall cluster utilization.
> 
> In terms of job, do you mean jobs inside a Spark application or jobs among 
> different applications? Maybe you can read 
> http://spark.apache.org/docs/latest/job-scheduling.html 
> <http://spark.apache.org/docs/latest/job-scheduling.html> for help.
> 
>> On Jan 31, 2017, at 03:34, Michael Gummelt <mgumm...@mesosphere.io 
>> <mailto:mgumm...@mesosphere.io>> wrote:
>> 
>> 
>> 
>> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan <ji...@drive.ai 
>> <mailto:ji...@drive.ai>> wrote:
>> Tasks begin scheduling as soon as the first executor comes up
>> 
>> Thanks all for the clarification. Is this the default behavior of Spark on 
>> Mesos today? I think this is what we are looking for because sometimes a job 
>> can take up lots of resources and later jobs could not get all the resources 
>> that it asks for. If a Spark job starts with only a subset of resources that 
>> it asks for, does it know to expand its resources later when more resources 
>> become available?
>> 
>> Yes.
>>  
>> 
>> Launch each executor with at least 1GB RAM, but if mesos offers 2GB at some 
>> moment, then launch an executor with 2GB RAM
>> 
>> This is less useful in our use case. But I am also quite interested in cases 
>> in which this could be helpful. I think this will also help with overall 
>> resource utilization on the cluster if when another job starts up that has a 
>> hard requirement on resources, the extra resources to the first job can be 
>> flexibly re-allocated to the second job. 
>> 
>> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt <mgumm...@mesosphere.io 
>> <mailto:mgumm...@mesosphere.io>> wrote:
>> We've talked about that, but it hasn't become a priority because we haven't 
>> had a driving use case.  If anyone has a good argument for "variable" 
>> resource allocation like this, please let me know.
>> 
>> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin <linshuai2...@gmail.com 
>> <mailto:linshuai2...@gmail.com>> wrote:
>> An alternative behavior is to launch the job with the best resource offer 
>> Mesos is able to give
>> 
>> Michael has just made an excellent explanation about dynamic allocation 
>> support in mesos. But IIUC, what you want to achieve is something like 
>> (using RAM as an example) : "Launch each executor with at least 1GB RAM, but 
>> if mesos offers 2GB at some moment, then launch an executor with 2GB RAM".
>> 
>> I wonder what's benefit of that? To reduce the "resource fragmentation"?
>> 
>> Anyway, that is not supported at this moment. In all the supported cluster 
>> managers of spark (mesos, yarn, standalone, and the up-to-coming spark on 
>> kubernetes), you have to specify the cores and memory of each executor.
>> 
>> It may not be supported in the future, because only mesos has the concepts 
>> of offers because of its two-level scheduling model.
>> 
>> 
>> On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan <ji...@drive.ai 
>> <mailto:ji...@drive.ai>> wrote:
>> Dear Spark Users,
>> 
>> Currently is there a way to dynamically allocate resources to Spark on 
>> Mesos? Within Spark we can specify the CPU cores, memory before running job. 
>> The way I understand is that the Spark job will not run if the CPU/Mem 
>> requirement is not met. This may lead to decrease in overall utilization of 
>> the cluster. An alternative behavior is to launch the job with the best 
>> resource offer Mesos is able to give. Is this possible with the current 
>> implementation?
>> 
>> Thanks
>> Ji
>> 
>> The information in this email is confidential and may be legally privileged. 
>> It is intended solely for the addressee. Access to this email by anyone else 
>> is unauthorized. If you are not the intended recipient, any disclosure, 
>> copying, distribution or any action taken or omitted to be taken in reliance 
>> on it, is prohibited and may be unlawful.
>> 
>> 
>> 
>> 
>> 
>> -- 
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
>> 
>> 
>> The information in this email is confidential and may be legally privileged. 
>> It is intended solely for the addressee. Access to this email by anyone else 
>> is unauthorized. If you are not the intended recipient, any disclosure, 
>> copying, distribution or any action taken or omitted to be taken in reliance 
>> on it, is prohibited and may be unlawful.
>> 
>> 
>> 
>> 
>> -- 
>> Michael Gummelt
>> Software Engineer
>> Mesosphere
> 
> 
> 
> 
> -- 
> Michael Gummelt
> Software Engineer
> Mesosphere



Re: Dynamic resource allocation to Spark on Mesos

2017-02-07 Thread Sun Rui
Yi Jan,

We have been using Spark on Mesos with dynamic allocation enabled, which works 
and improves the overall cluster utilization.

In terms of job, do you mean jobs inside a Spark application or jobs among 
different applications? Maybe you can read 
http://spark.apache.org/docs/latest/job-scheduling.html 
 for help.

> On Jan 31, 2017, at 03:34, Michael Gummelt  wrote:
> 
> 
> 
> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  > wrote:
> Tasks begin scheduling as soon as the first executor comes up
> 
> Thanks all for the clarification. Is this the default behavior of Spark on 
> Mesos today? I think this is what we are looking for because sometimes a job 
> can take up lots of resources and later jobs could not get all the resources 
> that it asks for. If a Spark job starts with only a subset of resources that 
> it asks for, does it know to expand its resources later when more resources 
> become available?
> 
> Yes.
>  
> 
> Launch each executor with at least 1GB RAM, but if mesos offers 2GB at some 
> moment, then launch an executor with 2GB RAM
> 
> This is less useful in our use case. But I am also quite interested in cases 
> in which this could be helpful. I think this will also help with overall 
> resource utilization on the cluster if when another job starts up that has a 
> hard requirement on resources, the extra resources to the first job can be 
> flexibly re-allocated to the second job. 
> 
> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt  > wrote:
> We've talked about that, but it hasn't become a priority because we haven't 
> had a driving use case.  If anyone has a good argument for "variable" 
> resource allocation like this, please let me know.
> 
> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin  > wrote:
> An alternative behavior is to launch the job with the best resource offer 
> Mesos is able to give
> 
> Michael has just made an excellent explanation about dynamic allocation 
> support in mesos. But IIUC, what you want to achieve is something like (using 
> RAM as an example) : "Launch each executor with at least 1GB RAM, but if 
> mesos offers 2GB at some moment, then launch an executor with 2GB RAM".
> 
> I wonder what's benefit of that? To reduce the "resource fragmentation"?
> 
> Anyway, that is not supported at this moment. In all the supported cluster 
> managers of spark (mesos, yarn, standalone, and the up-to-coming spark on 
> kubernetes), you have to specify the cores and memory of each executor.
> 
> It may not be supported in the future, because only mesos has the concepts of 
> offers because of its two-level scheduling model.
> 
> 
> On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan  > wrote:
> Dear Spark Users,
> 
> Currently is there a way to dynamically allocate resources to Spark on Mesos? 
> Within Spark we can specify the CPU cores, memory before running job. The way 
> I understand is that the Spark job will not run if the CPU/Mem requirement is 
> not met. This may lead to decrease in overall utilization of the cluster. An 
> alternative behavior is to launch the job with the best resource offer Mesos 
> is able to give. Is this possible with the current implementation?
> 
> Thanks
> Ji
> 
> The information in this email is confidential and may be legally privileged. 
> It is intended solely for the addressee. Access to this email by anyone else 
> is unauthorized. If you are not the intended recipient, any disclosure, 
> copying, distribution or any action taken or omitted to be taken in reliance 
> on it, is prohibited and may be unlawful.
> 
> 
> 
> 
> 
> -- 
> Michael Gummelt
> Software Engineer
> Mesosphere
> 
> 
> The information in this email is confidential and may be legally privileged. 
> It is intended solely for the addressee. Access to this email by anyone else 
> is unauthorized. If you are not the intended recipient, any disclosure, 
> copying, distribution or any action taken or omitted to be taken in reliance 
> on it, is prohibited and may be unlawful.
> 
> 
> 
> 
> -- 
> Michael Gummelt
> Software Engineer
> Mesosphere



Re: RDD Location

2016-12-30 Thread Sun Rui
You can’t call runJob inside getPreferredLocations().
You can take a look at the source  code of HadoopRDD to help you implement 
getPreferredLocations() appropriately.
> On Dec 31, 2016, at 09:48, Fei Hu <hufe...@gmail.com> wrote:
> 
> That is a good idea.
> 
> I tried add the following code to get getPreferredLocations() function:
> 
> val results: Array[Array[DataChunkPartition]] = context.runJob(
>   partitionsRDD, (context: TaskContext, partIter: 
> Iterator[DataChunkPartition]) => partIter.toArray, dd, allowLocal = true)
> 
> But it seems to be suspended when executing this function. But if I move the 
> code to other places, like the main() function, it runs well.
> 
> What is the reason for it?
> 
> Thanks,
> Fei
> 
> On Fri, Dec 30, 2016 at 2:38 AM, Sun Rui <sunrise_...@163.com 
> <mailto:sunrise_...@163.com>> wrote:
> Maybe you can create your own subclass of RDD and override the 
> getPreferredLocations() to implement the logic of dynamic changing of the 
> locations.
> > On Dec 30, 2016, at 12:06, Fei Hu <hufe...@gmail.com 
> > <mailto:hufe...@gmail.com>> wrote:
> >
> > Dear all,
> >
> > Is there any way to change the host location for a certain partition of RDD?
> >
> > "protected def getPreferredLocations(split: Partition)" can be used to 
> > initialize the location, but how to change it after the initialization?
> >
> >
> > Thanks,
> > Fei
> >
> >
> 
> 
> 



Re: RDD Location

2016-12-29 Thread Sun Rui
Maybe you can create your own subclass of RDD and override the 
getPreferredLocations() to implement the logic of dynamic changing of the 
locations.
> On Dec 30, 2016, at 12:06, Fei Hu  wrote:
> 
> Dear all,
> 
> Is there any way to change the host location for a certain partition of RDD?
> 
> "protected def getPreferredLocations(split: Partition)" can be used to 
> initialize the location, but how to change it after the initialization?
> 
> 
> Thanks,
> Fei
> 
> 



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark 2.0.2 HDFS]: no data locality

2016-12-27 Thread Sun Rui
Although the Spark task scheduler is aware of rack-level data locality, it 
seems that only YARN implements the support for it. However, node-level 
locality can still work for Standalone.

It is not necessary to copy the hadoop config files into the Spark CONF 
directory. Set HADOOP_CONF_DIR to point to the conf directory of your Hadoop.

Data Locality involves in both task data locality and executor data locality. 
Executor data locality is only supported on YARN with executor dynamic 
allocation enabled. For standalone, by default, a Spark application will 
acquire all available cores in the cluster, generally meaning there is at least 
one executor on each node, in which case task data locality can work because a 
task can be dispatched to an executor on any of the preferred nodes of the task 
for execution.

for your case, have you set spark.cores.max to limit the cores to acquire, 
which means executors are available on a subset of the cluster nodes?

> On Dec 27, 2016, at 01:39, Karamba  wrote:
> 
> Hi,
> 
> I am running a couple of docker hosts, each with an HDFS and a spark
> worker in a spark standalone cluster.
> In order to get data locality awareness, I would like to configure Racks
> for each host, so that a spark worker container knows from which hdfs
> node container it should load its data. Does this make sense?
> 
> I configured HDFS container nodes via the core-site.xml in
> $HADOOP_HOME/etc and this works. hdfs dfsadmin -printTopology shows my
> setup.
> 
> I configured SPARK the same way. I placed core-site.xml and
> hdfs-site.xml in the SPARK_CONF_DIR ... BUT this has no effect.
> 
> Submitting a spark job via spark-submit to the spark-master that loads
> from HDFS just has Data locality ANY.
> 
> It would be great if anybody would help me getting the right configuration!
> 
> Thanks and best regards,
> on
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Sun Rui
Before 2.0, Spark has built-in support for caching RDD data on 
Tachyon(Alluxio), but that support is removed since 2.0. In either case, Spark 
does not support writing shuffle data to Tachyon.

Since Alluxio has experimental support for FUSE 
(http://www.alluxio.org/docs/master/en/Mounting-Alluxio-FS-with-FUSE.html 
<http://www.alluxio.org/docs/master/en/Mounting-Alluxio-FS-with-FUSE.html>), 
you can try it and set spark.local.dir to point to the directory of Alluxio 
FUSE.

There is also on-going effort trying to take advantage of SSD to improve 
shuffle performance, see https://issues.apache.org/jira/browse/SPARK-12196 
<https://issues.apache.org/jira/browse/SPARK-12196>. The PR is ready, but not 
get merged. You may give it a try by yourself.

> On Aug 24, 2016, at 22:30, tony@tendcloud.com wrote:
> 
> Hi, Saisai and Rui,
> Thanks a lot for your answer.  Alluxio tried to work as the middle layer 
> between storage and Spark, so is it possible to use Alluxio to resolve the 
> issue? We want to have 1 SSD for every datanode and use Alluxio to manage 
> mem,ssd and hdd. 
> 
> Thanks and Regards,
> Tony
> 
> tony@tendcloud.com <mailto:tony@tendcloud.com>
>  
> From: Sun Rui <mailto:sunrise_...@163.com>
> Date: 2016-08-24 22:17
> To: Saisai Shao <mailto:sai.sai.s...@gmail.com>
> CC: tony@tendcloud.com <mailto:tony@tendcloud.com>; user 
> <mailto:user@spark.apache.org>
> Subject: Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?
> Yes, I also tried FUSE before, it is not stable and I don’t recommend it
>> On Aug 24, 2016, at 22:15, Saisai Shao <sai.sai.s...@gmail.com 
>> <mailto:sai.sai.s...@gmail.com>> wrote:
>> 
>> Also fuse is another candidate (https://wiki.apache.org/hadoop/MountableHDFS 
>> <https://wiki.apache.org/hadoop/MountableHDFS>), but not so stable as I 
>> tried before.
>> 
>> On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui <sunrise_...@163.com 
>> <mailto:sunrise_...@163.com>> wrote:
>> For HDFS, maybe you can try mount HDFS as NFS. But not sure about the 
>> stability, and also there is additional overhead of network I/O and replica 
>> of HDFS files.
>> 
>>> On Aug 24, 2016, at 21:02, Saisai Shao <sai.sai.s...@gmail.com 
>>> <mailto:sai.sai.s...@gmail.com>> wrote:
>>> 
>>> Spark Shuffle uses Java File related API to create local dirs and R/W data, 
>>> so it can only be worked with OS supported FS. It doesn't leverage Hadoop 
>>> FileSystem API, so writing to Hadoop compatible FS is not worked.
>>> 
>>> Also it is not suitable to write temporary shuffle data into distributed 
>>> FS, this will bring unnecessary overhead. In you case if you have large 
>>> memory on each node, you could use ramfs instead to store shuffle data.
>>> 
>>> Thanks
>>> Saisai
>>> 
>>> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com 
>>> <mailto:tony@tendcloud.com> <tony@tendcloud.com 
>>> <mailto:tony@tendcloud.com>> wrote:
>>> Hi, All,
>>> When we run Spark on very large data, spark will do shuffle and the shuffle 
>>> data will write to local disk. Because we have limited capacity at local 
>>> disk, the shuffled data will occupied all of the local disk and then will 
>>> be failed.  So is there a way we can write the shuffle spill data to HDFS? 
>>> Or if we introduce alluxio in our system, can the shuffled data write to 
>>> alluxio?
>>> 
>>> Thanks and Regards,
>>> 
>>> 阎志涛(Tony)
>>> 
>>> 北京腾云天下科技有限公司
>>> 
>>> 邮箱:tony@tendcloud.com <mailto:tony@tendcloud.com>
>>> 电话:13911815695
>>> 微信: zhitao_yan
>>> QQ : 4707059
>>> 地址:北京市东城区东直门外大街39号院2号楼航空服务大厦602室
>>> 邮编:100027
>>> 
>>> TalkingData.com <http://talkingdata.com/> - 让数据说话



Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Sun Rui
Yes, I also tried FUSE before, it is not stable and I don’t recommend it
> On Aug 24, 2016, at 22:15, Saisai Shao <sai.sai.s...@gmail.com> wrote:
> 
> Also fuse is another candidate (https://wiki.apache.org/hadoop/MountableHDFS 
> <https://wiki.apache.org/hadoop/MountableHDFS>), but not so stable as I tried 
> before.
> 
> On Wed, Aug 24, 2016 at 10:09 PM, Sun Rui <sunrise_...@163.com 
> <mailto:sunrise_...@163.com>> wrote:
> For HDFS, maybe you can try mount HDFS as NFS. But not sure about the 
> stability, and also there is additional overhead of network I/O and replica 
> of HDFS files.
> 
>> On Aug 24, 2016, at 21:02, Saisai Shao <sai.sai.s...@gmail.com 
>> <mailto:sai.sai.s...@gmail.com>> wrote:
>> 
>> Spark Shuffle uses Java File related API to create local dirs and R/W data, 
>> so it can only be worked with OS supported FS. It doesn't leverage Hadoop 
>> FileSystem API, so writing to Hadoop compatible FS is not worked.
>> 
>> Also it is not suitable to write temporary shuffle data into distributed FS, 
>> this will bring unnecessary overhead. In you case if you have large memory 
>> on each node, you could use ramfs instead to store shuffle data.
>> 
>> Thanks
>> Saisai
>> 
>> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com 
>> <mailto:tony@tendcloud.com> <tony@tendcloud.com 
>> <mailto:tony@tendcloud.com>> wrote:
>> Hi, All,
>> When we run Spark on very large data, spark will do shuffle and the shuffle 
>> data will write to local disk. Because we have limited capacity at local 
>> disk, the shuffled data will occupied all of the local disk and then will be 
>> failed.  So is there a way we can write the shuffle spill data to HDFS? Or 
>> if we introduce alluxio in our system, can the shuffled data write to 
>> alluxio?
>> 
>> Thanks and Regards,
>> 
>> 阎志涛(Tony)
>> 
>> 北京腾云天下科技有限公司
>> 
>> 邮箱:tony@tendcloud.com <mailto:tony@tendcloud.com>
>> 电话:13911815695
>> 微信: zhitao_yan
>> QQ : 4707059
>> 地址:北京市东城区东直门外大街39号院2号楼航空服务大厦602室
>> 邮编:100027
>> 
>> TalkingData.com <http://talkingdata.com/> - 让数据说话
>> 
> 
> 



Re: Can we redirect Spark shuffle spill data to HDFS or Alluxio?

2016-08-24 Thread Sun Rui
For HDFS, maybe you can try mount HDFS as NFS. But not sure about the 
stability, and also there is additional overhead of network I/O and replica of 
HDFS files.
> On Aug 24, 2016, at 21:02, Saisai Shao  wrote:
> 
> Spark Shuffle uses Java File related API to create local dirs and R/W data, 
> so it can only be worked with OS supported FS. It doesn't leverage Hadoop 
> FileSystem API, so writing to Hadoop compatible FS is not worked.
> 
> Also it is not suitable to write temporary shuffle data into distributed FS, 
> this will bring unnecessary overhead. In you case if you have large memory on 
> each node, you could use ramfs instead to store shuffle data.
> 
> Thanks
> Saisai
> 
> On Wed, Aug 24, 2016 at 8:11 PM, tony@tendcloud.com 
>   > wrote:
> Hi, All,
> When we run Spark on very large data, spark will do shuffle and the shuffle 
> data will write to local disk. Because we have limited capacity at local 
> disk, the shuffled data will occupied all of the local disk and then will be 
> failed.  So is there a way we can write the shuffle spill data to HDFS? Or if 
> we introduce alluxio in our system, can the shuffled data write to alluxio?
> 
> Thanks and Regards,
> 
> 阎志涛(Tony)
> 
> 北京腾云天下科技有限公司
> 
> 邮箱:tony@tendcloud.com 
> 电话:13911815695
> 微信: zhitao_yan
> QQ : 4707059
> 地址:北京市东城区东直门外大街39号院2号楼航空服务大厦602室
> 邮编:100027
> 
> TalkingData.com  - 让数据说话
> 



Re: SparkR error when repartition is called

2016-08-09 Thread Sun Rui
I can’t reproduce your issue with len=1 in local mode.
Could you give more environment information?
> On Aug 9, 2016, at 11:35, Shane Lee  wrote:
> 
> Hi All,
> 
> I am trying out SparkR 2.0 and have run into an issue with repartition. 
> 
> Here is the R code (essentially a port of the pi-calculating scala example in 
> the spark package) that can reproduce the behavior:
> 
> schema <- structType(structField("input", "integer"), 
> structField("output", "integer"))
> 
> library(magrittr)
> 
> len = 3000
> data.frame(n = 1:len) %>%
> as.DataFrame %>%
> SparkR:::repartition(10L) %>%
>   dapply(., function (df)
>   {
>   library(plyr)
>   ddply(df, .(n), function (y)
>   {
>   data.frame(z = 
>   {
>   x1 = runif(1) * 2 - 1
>   y1 = runif(1) * 2 - 1
>   z = x1 * x1 + y1 * y1
>   if (z < 1)
>   {
>   1L
>   }
>   else
>   {
>   0L
>   }
>   })
>   })
>   }
>   , schema
>   ) %>% 
>   SparkR:::summarize(total = sum(.$output)) %>% collect * 4 / len
> 
> For me it runs fine as long as len is less than 5000, otherwise it errors out 
> with the following message:
> 
> Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
>   org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 
> in stage 56.0 failed 4 times, most recent failure: Lost task 6.3 in stage 
> 56.0 (TID 899, LARBGDV-VM02): org.apache.spark.SparkException: R computation 
> failed with
>  Error in readBin(con, raw(), stringLen, endian = "big") : 
>   invalid 'n' argument
> Calls:  -> readBin
> Execution halted
>   at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
>   at 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:59)
>   at 
> org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29)
>   at 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178)
>   at 
> org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$
> 
> If the repartition call is removed, it runs fine again, even with very large 
> len.
> 
> After looking through the documentations and searching the web, I can't seem 
> to find any clues how to fix this. Anybody has seen similary problem?
> 
> Thanks in advance for your help.
> 
> Shane
> 



Re: how to run local[k] threads on a single core

2016-08-04 Thread Sun Rui
I don’t think it possible as Spark does not support thread to CPU affinity.
> On Aug 4, 2016, at 14:27, sujeet jog  wrote:
> 
> Is there a way we can run multiple tasks concurrently on a single core in 
> local mode.
> 
> for ex :- i have 5 partition ~ 5 tasks, and only a single core , i want these 
> tasks to run concurrently, and specifiy them to use /run on a single core. 
> 
> The machine itself is say 4 core, but i want to utilize only 1 core out of 
> it,. 
> 
> Is it possible ?
> 
> Thanks, 
> Sujeet
> 



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-08-03 Thread Sun Rui
--num-executors does not work for Standalone mode. Try --total-executor-cores
> On Jul 26, 2016, at 00:17, Mich Talebzadeh  wrote:
> 
> Hi,
> 
> 
> I am doing some tests
> 
> I have started Spark in Standalone mode.
> 
> For simplicity I am using one node only with 8 works and I have 12 cores
> 
> In spark-env.sh I set this
> 
> # Options for the daemons used in the standalone deploy mode
> export SPARK_WORKER_CORES=1 ##, total number of cores to be used by executors 
> by each worker
> export SPARK_WORKER_MEMORY=1g ##, to set how much total memory workers have 
> to give executors (e.g. 1000m, 2g)
> the worker
> export SPARK_WORKER_INSTANCES=8 ##, to set the number of worker processes per 
> node
> 
> So it is pretty straight forward with 8 works and each worker assigned one 
> core
> 
> jps|grep Worker
> 15297 Worker
> 14794 Worker
> 15374 Worker
> 14998 Worker
> 15198 Worker
> 15465 Worker
> 14897 Worker
> 15099 Worker
> 
> I start Spark Thrift Server with the following parameters (using standalone 
> mode)
> 
> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> --master spark://50.140.197.217:7077 
>  \
> --hiveconf hive.server2.thrift.port=10055 \
> --driver-memory 1G \
> --num-executors 1 \
> --executor-cores 1 \
> --executor-memory 1G \
> --conf "spark.scheduler.mode=FIFO" \
> 
> With one executor allocated 1 core
> 
> However, I can see both in the OS and UI that it starts with 8 executors, the 
> same number of workers on this node!
> 
> jps|egrep 'SparkSubmit|CoarseGrainedExecutorBackend'|sort
> 32711 SparkSubmit
> 369 CoarseGrainedExecutorBackend
> 370 CoarseGrainedExecutorBackend
> 371 CoarseGrainedExecutorBackend
> 376 CoarseGrainedExecutorBackend
> 387 CoarseGrainedExecutorBackend
> 395 CoarseGrainedExecutorBackend
> 419 CoarseGrainedExecutorBackend
> 420 CoarseGrainedExecutorBackend
> 
> 
> I fail to see why this is happening. Nothing else is running Spark wise. The 
> cause?
> 
>  How can I stop STS going and using all available workers?
> 
> Thanks
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  



Re: How to partition a SparkDataFrame using all distinct column values in sparkR

2016-08-03 Thread Sun Rui
SparkDataFrame.repartition() uses hash partitioning, it can guarantee that all 
rows of the same column value go to the same partition, but it does not 
guarantee that each partition contain only single column value.

Fortunately, Spark 2.0 comes with gapply() in SparkR. You can apply an R 
function to all groups grouped by the column.

> On Jul 26, 2016, at 06:46, Neil Chang  wrote:
> 
> Hi,
>   This is a question regarding SparkR in spark 2.0.
> 
> Given that I have a SparkDataFrame and I want to partition it using one 
> column's values. Each value corresponds to a partition, all rows that having 
> the same column value shall go to the same partition, no more no less. 
> 
>Seems the function repartition() doesn't do this, I have 394 unique 
> values, it just partitions my DataFrame into 200. If I specify the 
> numPartitions to 394, some mismatch happens.
> 
> Is it possible to do what I described in sparkR?
> GroupBy doesn't work with udf at all.
> 
> Or can we split the DataFrame into list of small ones first, if so, what can 
> I use?
> 
> Thanks,
> Neil



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [2.0.0] mapPartitions on DataFrame unable to find encoder

2016-08-02 Thread Sun Rui
import org.apache.spark.sql.catalyst.encoders.RowEncoder
implicit val encoder = RowEncoder(df.schema)
df.mapPartitions(_.take(1))

> On Aug 3, 2016, at 04:55, Dragisa Krsmanovic  wrote:
> 
> I am trying to use mapPartitions on DataFrame.
> 
> Example:
> 
> import spark.implicits._
> val df: DataFrame = Seq((1,"one"), (2, "two")).toDF("id", "name")
> df.mapPartitions(_.take(1))
> 
> I am getting:
> 
> Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
> String, etc) and Product types (case classes) are supported by importing 
> spark.implicits._  Support for serializing other types will be added in 
> future releases.
> 
> Since DataFrame is Dataset[Row], I was expecting encoder for Row to be there.
> 
> What's wrong with my code ?
>  
> 
> -- 
> Dragiša Krsmanović | Platform Engineer | Ticketfly
> 
> dragi...@ticketfly.com 
> @ticketfly  | ticketfly.com/blog 
>  | facebook.com/ticketfly 
> 


Re: Application not showing in Spark History

2016-08-02 Thread Sun Rui
bin/spark-submit will set some env variable, like SPARK_HOME, that Spark later 
will use to locate the spark-defaults.conf from which default settings for 
Spark will be loaded.

I would guess that some configuration option like spark.eventLog.enabled in the 
spark-defaults.conf is skipped by directly using the SparkSubmit class instead 
of “bin/spark-submit”.

The formal way to launch a Spark application within Java is to use 
SparkLauncher. Remember to call SparkLaunch.setSparkHome() to set the Spark 
Home directory.

> On Aug 2, 2016, at 16:53, Rychnovsky, Dusan 
>  wrote:
> 
> Hi,
> 
> I am trying to launch my Spark application from within my Java application 
> via the SparkSubmit class, like this:
> 
> 
> List args = new ArrayList<>();
> 
> args.add("--verbose");
> args.add("--deploy-mode=cluster");
> args.add("--master=yarn");
> ...
> 
> SparkSubmit.main(args.toArray(new String[args.size()]));
> 
> 
> This works fine, with one catch - the application does not appear in Spark 
> History after it's finished.
> 
> If, however, I run the application using `spark-submit.sh`, like this:
> 
> 
> spark-submit \
>   --verbose \
>   --deploy-mode=cluster \
>   --master=yarn \
>   ...
> 
> 
> the application appears in Spark History correctly.
> 
> What am I missing?
> 
> Also, is this a good way to launch a Spark application from within a Java 
> application or is there a better way?
> 
> Thanks,
> Dusan



Re: SPARK Exception thrown in awaitResult

2016-07-28 Thread Sun Rui
Are you using Mesos? if not , https://issues.apache.org/jira/browse/SPARK-16522 
  is not relevant

You may describe more information about your Spark environment, and the full 
stack trace.
> On Jul 28, 2016, at 17:44, Carlo.Allocca  wrote:
> 
> Hi All, 
> 
> I am running SPARK locally, and when running d3=join(d1,d2) and d5=(d3, d4) 
> am getting  the following exception "org.apache.spark.SparkException: 
> Exception thrown in awaitResult”. 
> Googling for it, I found that the closed is the answer reported 
> https://issues.apache.org/jira/browse/SPARK-16522 
>  which mention that it is 
> bug of the SPARK 2.0.0. 
> 
> Is it correct or am I missing anything? 
> 
> Many Thanks for your answer and help. 
> 
> Best Regards,
> Carlo
> 
> -- The Open University is incorporated by Royal Charter (RC 000391), an 
> exempt charity in England & Wales and a charity registered in Scotland (SC 
> 038302). The Open University is authorised and regulated by the Financial 
> Conduct Authority.



Re: Spark 2.0 on YARN - Dynamic Resource Allocation Behavior change?

2016-07-28 Thread Sun Rui
Yes, this is a change in Spark 2.0.  you can take a look at 
https://issues.apache.org/jira/browse/SPARK-13723 


In the latest Spark On Yarn documentation 
 for Spark 2.0, there 
is updated description for --num-executors:
> spark.executor.instances  2   The number of executors for static 
> allocation. Withspark.dynamicAllocation.enabled, the initial set of executors 
> will be at least this large.
You can disable the dynamic allocation for an application by specifying “--conf 
spark.dynamicAllocation.enabled=false” in the command line.

> On Jul 28, 2016, at 15:44, LONG WANG  wrote:
> 
> Hi Spark Experts,
> 
>   Today I tried Spark 2.0 on YARN and also enabled 
> Dynamic Resource Allocation feature, I just find that no matter I specify 
> --num-executor in spark-submit command or not, the Dynamic Resource 
> Allocation is used, but I remember when I specify --num-executor option in 
> spark-submit command in Spark 1.6, the Dynamic Resource Allocation feature 
> will not be used/effect for that job. And I can see below log in Spark 1.6 .
> 
> <截图1.png>
>
>   Is this a behavior change in Spark 2.0? And How can 
> I disable Dynamic Resource Allocation for a specific job submission 
> temporarily as before? 
> 
> 
>  
>  邮件带有附件预览链接,若您转发或回复此邮件时不希望对方预览附件,建议您手动删除链接。
> 共有 1 个附件
> 截图1.png(23K)
> 极速下载 
> 
>  在线预览 
> 


Re: Spark 2.0 SparkSession, SparkConf, SparkContext

2016-07-27 Thread Sun Rui
If you want to keep using RDD API, then you still need to create SparkContext 
first.

If you want to use just Dataset/DataFrame/SQL API, then you can directly create 
a SparkSession. Generally the SparkContext is hidden although it is internally 
created and held within the SparkSession. Anytime you need the SparkContext, 
you can get it from SparkSession.sparkContext.   while SparkConf is accepted 
when creating a SparkSession, the formal way to set/get configurations for a 
SparkSession is through SparkSession.conf.set()/get()
> On Jul 27, 2016, at 21:02, Jestin Ma  wrote:
> 
> I know that Sparksession is replacing the SQL and HiveContexts, but what 
> about SparkConf and SparkContext? Are those still relevant in our programs?
> 
> Thank you!
> Jestin



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.0 SparkSession, SparkConf, SparkContext

2016-07-27 Thread Sun Rui
If you want to keep using RDD API, then you still need to create SparkContext 
first.

If you want to use just Dataset/DataFrame/SQL API, then you can directly create 
a SparkSession. Generally the SparkContext is hidden although it is internally 
created and held within the SparkSession. Anytime you need the SparkContext, 
you can get it from SparkSession.sparkContext.   while SparkConf is accepted 
when creating a SparkSession, the formal way to set/get configurations for a 
SparkSession is through SparkSession.conf.set()/get()
> On Jul 27, 2016, at 21:02, Jestin Ma  wrote:
> 
> I know that Sparksession is replacing the SQL and HiveContexts, but what 
> about SparkConf and SparkContext? Are those still relevant in our programs?
> 
> Thank you!
> Jestin



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using flatMap on Dataframes with Spark 2.0

2016-07-23 Thread Sun Rui
You should use :
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val df = spark.read.parquet(fileName)

implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)

val df1 = df.flatMap { x => List(x) }
> On Jul 23, 2016, at 22:01, Julien Nauroy <julien.nau...@u-psud.fr> wrote:
> 
> Thanks for your quick reply.
> 
> I've tried with this encoder:
> implicit def RowEncoder: org.apache.spark.sql.Encoder[Row] = 
> org.apache.spark.sql.Encoders.kryo[Row]
> Using a suggestion from 
> http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset-in-spark-1-6
>  
> <http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset-in-spark-1-6>
> 
> How did you setup your encoder?
> 
> 
> De: "Sun Rui" <sunrise_...@163.com>
> À: "Julien Nauroy" <julien.nau...@u-psud.fr>
> Cc: user@spark.apache.org
> Envoyé: Samedi 23 Juillet 2016 15:55:21
> Objet: Re: Using flatMap on Dataframes with Spark 2.0
> 
> I did a try. the schema after flatMap is the same, which is expected.
> 
> What’s your Row encoder?
> On Jul 23, 2016, at 20:36, Julien Nauroy <julien.nau...@u-psud.fr 
> <mailto:julien.nau...@u-psud.fr>> wrote:
> 
> Hi,
> 
> I'm trying to call flatMap on a Dataframe with Spark 2.0 (rc5).
> The code is the following:
> var data = spark.read.parquet(fileName).flatMap(x => List(x))
> 
> Of course it's an overly simplified example, but the result is the same.
> The dataframe schema goes from this:
> root
> |-- field1: double (nullable = true)
> |-- field2: integer (nullable = true)
> (etc)
> 
> to this:
> root
> |-- value: binary (nullable = true)
> 
> Plus I have to provide an encoder for Row.
> I expect to get the same schema after calling flatMap.
> Any idea what I could be doing wrong?
> 
> 
> Best regards,
> Julien
> 
> 
> 
> 



Re: Using flatMap on Dataframes with Spark 2.0

2016-07-23 Thread Sun Rui
I did a try. the schema after flatMap is the same, which is expected.

What’s your Row encoder?
> On Jul 23, 2016, at 20:36, Julien Nauroy  wrote:
> 
> Hi,
> 
> I'm trying to call flatMap on a Dataframe with Spark 2.0 (rc5).
> The code is the following:
> var data = spark.read.parquet(fileName).flatMap(x => List(x))
> 
> Of course it's an overly simplified example, but the result is the same.
> The dataframe schema goes from this:
> root
> |-- field1: double (nullable = true)
> |-- field2: integer (nullable = true)
> (etc)
> 
> to this:
> root
> |-- value: binary (nullable = true)
> 
> Plus I have to provide an encoder for Row.
> I expect to get the same schema after calling flatMap.
> Any idea what I could be doing wrong?
> 
> 
> Best regards,
> Julien
> 
> 



Re: How to convert from DataFrame to Dataset[Row]?

2016-07-16 Thread Sun Rui
For Spark 1.6.x, a DataFrame can't be directly converted to a Dataset[Row], but 
can done indirectly as follows:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
// assume df is a DataFrame
implicit val encoder: ExpressionEncoder[Row]  = RowEncoder(df.schema)
val ds = df.as[Row]

However, it may be more convenient to convert a DataFrame to a Dataset of Tuple 
or case class corresponding to the row schema. 

> On Jul 16, 2016, at 03:21, Daniel Barclay  wrote:
> 
> In Spark 1.6.1, how can I convert a DataFrame to a Dataset[Row]?
> 
> Is there a direct conversion?  (Trying .as[Row] doesn't work,
> even after importing  .implicits._ .)
> 
> Is there some way to map the Rows from the Dataframe into the Dataset[Row]?
> (DataFrame.map would just make another Dataframe, right?)
> 
> 
> Thanks,
> Daniel
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: Saving data frames on Spark Master/Driver

2016-07-14 Thread Sun Rui
You can simply save the join result distributedly, for example, as a HDFS file, 
and then copy the HDFS file to a local file.

There is an alternative memory-efficient way to collect distributed data back 
to driver other than collect(), that is toLocalIterator. The iterator will 
consume as much memory as the largest partition in your dataset.

You can use DataFrame.rdd.toLocalIterator() with Spark versions prior to 2.0. 
You can use Dataset.toLocalIterator() with Spark 2.0. 

For details, refer to https://issues.apache.org/jira/browse/SPARK-14334 


> On Jul 15, 2016, at 09:05, Pedro Rodriguez  wrote:
> 
> Out of curiosity, is there a way to pull all the data back to the driver to 
> save without collect()? That is, stream the data in chunks back to the driver 
> so that maximum memory used comparable to a single node’s data, but all the 
> data is saved on one node.
> 
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
> 
> pedrorodriguez.io  | 909-353-4423
> github.com/EntilZha  | LinkedIn 
> 
> On July 14, 2016 at 6:02:12 PM, Jacek Laskowski (ja...@japila.pl 
> ) wrote:
> 
>> Hi, 
>> 
>> Please re-consider your wish since it is going to move all the 
>> distributed dataset to the single machine of the driver and may lead 
>> to OOME. It's more pro to save your result to HDFS or S3 or any other 
>> distributed filesystem (that is accessible by the driver and 
>> executors). 
>> 
>> If you insist... 
>> 
>> Use collect() after select() and work with Array[T]. 
>> 
>> Pozdrawiam, 
>> Jacek Laskowski 
>>  
>> https://medium.com/@jaceklaskowski/ 
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark 
>> Follow me at https://twitter.com/jaceklaskowski 
>> 
>> 
>> On Fri, Jul 15, 2016 at 12:15 AM, vr.n. nachiappan 
>>  wrote: 
>> > Hello, 
>> > 
>> > I am using data frames to join two cassandra tables. 
>> > 
>> > Currently when i invoke save on data frames as shown below it is saving 
>> > the 
>> > join results on executor nodes. 
>> > 
>> > joineddataframe.select(,  
>> > ...).format("com.databricks.spark.csv").option("header", 
>> > "true").save() 
>> > 
>> > I would like to persist the results of the join on Spark Master/Driver 
>> > node. 
>> > Is it possible to save the results on Spark Master/Driver and how to do 
>> > it. 
>> > 
>> > I appreciate your help. 
>> > 
>> > Nachi 
>> > 
>> 
>> - 
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Issue in spark job. Remote rpc client dissociated

2016-07-14 Thread Sun Rui
Where is argsList defined? is Launcher.main() thread-safe? Note that if 
multiple folders are processed in a node, multiple threads may concurrently run 
in the executor, each processing a folder.

> On Jul 14, 2016, at 12:28, Balachandar R.A.  wrote:
> 
> Hello Ted, 
> 
> 
> Thanks for the response. Here is the additional information.
>  
> I am using spark 1.6.1  (spark-1.6.1-bin-hadoop2.6)
>  
> Here is the code snippet
>  
>  
> JavaRDD add = jsc.parallelize(listFolders, listFolders.size());
> JavaRDD test = add.map(new Function() {
> @Override
> public Integer call(File file) throws Exception {
> String folder = file.getName();
> System.out.println("[x] Processing dataset from the 
> directory " + folder);
> int status = 0;
>argsList[3] = argsList[3] + "/"+ folder;   // full path of 
> the input folder. Input folder is in shared file system that every worker 
> node has access to it. Something like (“/home/user/software/data/”) and 
> folder name will be like (“20161307”)
> argsList[7] = argsList[7] + "/" + folder + ".csv"; // 
> full path of the output.
> try{
> Launcher.main(argsList);  // Launcher class is a 
> black box. It process the input folder and create a csv file which in the 
> output location (argsList[7]). This is also in a shared file system
> status = 0;
> }
> catch(Exception e)
> {
> System.out.println("[x] Execution of import tool for 
> the directory " + folder + "failed");
> status = 0;
> }
> accum.add(1);
> return status;
> }
> });
>  
>  
> Here is the spark-env.sh
>  
> export SPARK_WORKER_INSTANCES=1
> export JAVA_HOME=/home/work_IW1/opt/jdk1.8.0_77/
> export HADOOP_CONF_DIR=/home/work_IW1/opt/hadoop-2.7.2/etc/hadoop
>  
> Here is the spark-defaults.conf
>  
>  
>   spark.master spark:// master:7077
>   spark.eventLog.enabled   true
>   spark.eventLog.dir   hdfs://master:9000/sparkEvent
>   spark.serializer org.apache.spark.serializer.KryoSerializer
>   spark.driver.memory  4g
>  
> 
> 
> Hope it helps. 



Re: Enforcing shuffle hash join

2016-07-04 Thread Sun Rui
You can try set “spark.sql.join.preferSortMergeJoin” cons option to false.

For detailed join strategies, take a look at the source code of 
SparkStrategies.scala:
/**
 * Select the proper physical plan for join based on joining keys and size of 
logical plan.
 *
 * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at 
least some of the
 * predicates can be evaluated by matching join keys. If found,  Join 
implementations are chosen
 * with the following precedence:
 *
 * - Broadcast: if one side of the join has an estimated physical size that is 
smaller than the
 * user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold
 * or if that side has an explicit broadcast hint (e.g. the user applied the
 * [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), 
then that side
 * of the join will be broadcasted and the other side will be streamed, 
with no shuffling
 * performed. If both sides of the join are eligible to be broadcasted then 
the
 * - Shuffle hash join: if the average size of a single partition is small 
enough to build a hash
 * table.
 * - Sort merge: if the matching join keys are sortable.
 *
 * If there is no joining keys, Join implementations are chosen with the 
following precedence:
 * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
 * - CartesianProduct: for Inner join
 * - BroadcastNestedLoopJoin
 */


> On Jul 5, 2016, at 13:28, Lalitha MV  wrote:
> 
> It picks sort merge join, when spark.sql.autoBroadcastJoinThreshold is set to 
> -1, or when the size of the small table is more than 
> spark.sql.spark.sql.autoBroadcastJoinThreshold.
> 
> On Mon, Jul 4, 2016 at 10:17 PM, Takeshi Yamamuro  > wrote:
> The join selection can be described in 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L92
>  
> .
> If you have join keys, you can set -1 at 
> `spark.sql.autoBroadcastJoinThreshold` to disable broadcast joins. Then, hash 
> joins are used in queries.
> 
> // maropu 
> 
> On Tue, Jul 5, 2016 at 4:23 AM, Lalitha MV  > wrote:
> Hi maropu, 
> 
> Thanks for your reply. 
> 
> Would it be possible to write a rule for this, to make it always pick shuffle 
> hash join, over other join implementations(i.e. sort merge and broadcast)? 
> 
> Is there any documentation demonstrating rule based transformation for 
> physical plan trees? 
> 
> Thanks,
> Lalitha
> 
> On Sat, Jul 2, 2016 at 12:58 AM, Takeshi Yamamuro  > wrote:
> Hi,
> 
> No, spark has no hint for the hash join.
> 
> // maropu
> 
> On Fri, Jul 1, 2016 at 4:56 PM, Lalitha MV  > wrote:
> Hi, 
> 
> In order to force broadcast hash join, we can set the 
> spark.sql.autoBroadcastJoinThreshold config. Is there a way to enforce 
> shuffle hash join in spark sql? 
> 
> 
> Thanks,
> Lalitha
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro
> 
> 
> 
> -- 
> Regards,
> Lalitha
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro
> 
> 
> 
> -- 
> Regards,
> Lalitha



Re: One map per folder in spark or Hadoop

2016-06-30 Thread Sun Rui
Say you have got all of your folder paths into a val folders: Seq[String]

val add = sc.parallelize(folders, folders.size).mapPartitions { iter =>
  val folder = iter.next
  val status: Int = 
  Seq(status).toIterator
}

> On Jun 30, 2016, at 16:42, Balachandar R.A.  wrote:
> 
> Hello,
> 
> I have some 100 folders. Each folder contains 5 files. I have an executable 
> that process one folder. The executable is a black box and hence it cannot be 
> modified.I would like to process 100 folders in parallel using Apache spark 
> so that I should be able to span a map task per folder. Can anyone give me an 
> idea? I have came across similar questions but with Hadoop and answer was to 
> use combineFileInputFormat and pathFilter. However, as I said, I want to use 
> Apache spark. Any idea?
> 
> Regards 
> Bala
> 



Re: Using R code as part of a Spark Application

2016-06-30 Thread Sun Rui
I would guess that the technology behind Azure R Server is about Revolution 
Enterprise DistributedR/ScaleR. I don’t know the details, but the statement in 
the “Step 6. Install R packages” section in the given documentation page.
However, if you need to install R packages on the worker nodes of the 
cluster, you must use a Script Action.

That implies that R should be installed on each worker node.

> On Jun 30, 2016, at 05:53, John Aherne  > wrote:
> 
> I don't think R server requires R on the executor nodes. I originally set up 
> a SparkR cluster for our Data Scientist on Azure which required that I 
> install R on each node, but for the R Server set up, there is an extra edge 
> node with R server that they connect to. From what little research I was able 
> to do, it seems that there are some special functions in R Server that can 
> distribute the work to the cluster. 
> 
> Documentation is light, and hard to find but I found this helpful:
> https://blogs.msdn.microsoft.com/uk_faculty_connection/2016/05/10/r-server-for-hdinsight-running-on-microsoft-azure-cloud-data-science-challenges/
>  
> 
> 
> 
> 
> On Wed, Jun 29, 2016 at 3:29 PM, Sean Owen  > wrote:
> Oh, interesting: does this really mean the return of distributing R
> code from driver to executors and running it remotely, or do I
> misunderstand? this would require having R on the executor nodes like
> it used to?
> 
> On Wed, Jun 29, 2016 at 5:53 PM, Xinh Huynh  > wrote:
> > There is some new SparkR functionality coming in Spark 2.0, such as
> > "dapply". You could use SparkR to load a Parquet file and then run "dapply"
> > to apply a function to each partition of a DataFrame.
> >
> > Info about loading Parquet file:
> > http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/sparkr.html#from-data-sources
> >  
> > 
> >
> > API doc for "dapply":
> > http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/api/R/index.html
> >  
> > 
> >
> > Xinh
> >
> > On Wed, Jun 29, 2016 at 6:54 AM, sujeet jog  > > wrote:
> >>
> >> try Spark pipeRDD's , you can invoke the R script from pipe , push  the
> >> stuff you want to do on the Rscript stdin,  p
> >>
> >>
> >> On Wed, Jun 29, 2016 at 7:10 PM, Gilad Landau  >> >
> >> wrote:
> >>>
> >>> Hello,
> >>>
> >>>
> >>>
> >>> I want to use R code as part of spark application (the same way I would
> >>> do with Scala/Python).  I want to be able to run an R syntax as a map
> >>> function on a big Spark dataframe loaded from a parquet file.
> >>>
> >>> Is this even possible or the only way to use R is as part of RStudio
> >>> orchestration of our Spark  cluster?
> >>>
> >>>
> >>>
> >>> Thanks for the help!
> >>>
> >>>
> >>>
> >>> Gilad
> >>>
> >>>
> >>
> >>
> >
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> John Aherne
> Big Data and SQL Developer
> 
> 
> Cell:
> Email:
> Skype:
> Web:
> 
> +1 (303) 809-9718
> john.ahe...@justenough.com 
> john.aherne.je 
> www.justenough.com 
> 
> Confidentiality Note: The information contained in this email and document(s) 
> attached are for the exclusive use of the addressee and may contain 
> confidential, privileged and non-disclosable information. If the recipient of 
> this email is not the addressee, such recipient is strictly prohibited from 
> reading, photocopying, distribution or otherwise using this email or its 
> contents in any way.
> 



Re: Using R code as part of a Spark Application

2016-06-29 Thread Sun Rui
Hi, Gilad,

You can try the dapply() and gapply() function in SparkR in Spark 2.0. Yes, it 
is required that R installed on each worker node.

However, if your Spark application is Scala/Java based, it is not supported for 
now to run R code in DataFrames. There is closed lira 
https://issues.apache.org/jira/browse/SPARK-14746 which remains discussion 
purpose. You have to convert DataFrames to RDDs, and use pipe() on RDDs to 
launch external R processes and run R code.

> On Jun 30, 2016, at 07:08, Xinh Huynh  wrote:
> 
> It looks like it. "DataFrame UDFs in R" is resolved in Spark 2.0: 
> https://issues.apache.org/jira/browse/SPARK-6817 
> 
> 
> Here's some of the code:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
>  
> 
> 
> /**
>  * A function wrapper that applies the given R function to each partition.
>  */
> private[sql] case class MapPartitionsRWrapper(
> func: Array[Byte],
> packageNames: Array[Byte],
> broadcastVars: Array[Broadcast[Object]],
> inputSchema: StructType,
> outputSchema: StructType) extends (Iterator[Any] => Iterator[Any]) 
> 
> Xinh
> 
> On Wed, Jun 29, 2016 at 2:59 PM, Sean Owen  > wrote:
> Here we (or certainly I) am not talking about R Server, but plain vanilla R, 
> as used with Spark and SparkR. Currently, SparkR doesn't distribute R code at 
> all (it used to, sort of), so I'm wondering if that is changing back.
> 
> On Wed, Jun 29, 2016 at 10:53 PM, John Aherne  > wrote:
> I don't think R server requires R on the executor nodes. I originally set up 
> a SparkR cluster for our Data Scientist on Azure which required that I 
> install R on each node, but for the R Server set up, there is an extra edge 
> node with R server that they connect to. From what little research I was able 
> to do, it seems that there are some special functions in R Server that can 
> distribute the work to the cluster. 
> 
> Documentation is light, and hard to find but I found this helpful:
> https://blogs.msdn.microsoft.com/uk_faculty_connection/2016/05/10/r-server-for-hdinsight-running-on-microsoft-azure-cloud-data-science-challenges/
>  
> 
> 
> 
> 
> On Wed, Jun 29, 2016 at 3:29 PM, Sean Owen  > wrote:
> Oh, interesting: does this really mean the return of distributing R
> code from driver to executors and running it remotely, or do I
> misunderstand? this would require having R on the executor nodes like
> it used to?
> 
> On Wed, Jun 29, 2016 at 5:53 PM, Xinh Huynh  > wrote:
> > There is some new SparkR functionality coming in Spark 2.0, such as
> > "dapply". You could use SparkR to load a Parquet file and then run "dapply"
> > to apply a function to each partition of a DataFrame.
> >
> > Info about loading Parquet file:
> > http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/sparkr.html#from-data-sources
> >  
> > 
> >
> > API doc for "dapply":
> > http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/api/R/index.html
> >  
> > 
> >
> > Xinh
> >
> > On Wed, Jun 29, 2016 at 6:54 AM, sujeet jog  > > wrote:
> >>
> >> try Spark pipeRDD's , you can invoke the R script from pipe , push  the
> >> stuff you want to do on the Rscript stdin,  p
> >>
> >>
> >> On Wed, Jun 29, 2016 at 7:10 PM, Gilad Landau  >> >
> >> wrote:
> >>>
> >>> Hello,
> >>>
> >>>
> >>>
> >>> I want to use R code as part of spark application (the same way I would
> >>> do with Scala/Python).  I want to be able to run an R syntax as a map
> >>> function on a big Spark dataframe loaded from a parquet file.
> >>>
> >>> Is this even possible or the only way to use R is as part of RStudio
> >>> orchestration of our Spark  cluster?
> >>>
> >>>
> >>>
> >>> Thanks for the help!
> >>>
> >>>
> >>>
> >>> Gilad
> >>>
> >>>
> >>
> >>
> >
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> John Aherne
> Big Data and SQL Developer
> 
> 
> Cell:
> Email:
> Skype:
> Web:
> 
> +1 (303) 809-9718 

Re: sparkR.init() can not load sparkPackages.

2016-06-19 Thread Sun Rui
Hi, Joseph,

This is a known issue but not a bug.

This issue does not occur when you use interactive SparkR session, while it 
does occur when you execute an R file.

The reason behind this is that in case you execute an R file, the R backend 
launches before the R interpreter, so there is no opportunity for packages 
specified with ‘sparkPackages’ to be processed.

For now, if you want to execute an R file with additional spark packages, 
please use the “--packages” command line option.

> On Jun 17, 2016, at 10:46, Joseph  wrote:
> 
> Hi all,
> 
> I find an issue in sparkR, maybe it's a bug:
> 
> When I read csv file, it's normal to use the following way:
> ${SPARK_HOME}/bin/spark-submit  --packages 
> com.databricks:spark-csv_2.11:1.4.0   example.R 
> 
> But using the following way will give an error:
> sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.4.0")
> 
> 16/06/17 09:54:12 ERROR RBackendHandler: loadDF on 
> org.apache.spark.sql.api.r.SQLUtils failed
> Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
>   java.lang.ClassNotFoundException: Failed to find data source: csv. Please 
> find packages at http://spark-packages.org 
>   at 
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:77)
> 
> It is obvious that the sparkR.init() does not load the specified package!
> -
> 
> Appendix:
> The complete code for example.R:
> 
> if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
>   Sys.setenv(SPARK_HOME = "/home/hadoop/spark-1.6.1-bin-hadoop2.6")
> }
> 
> library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
> 
> sc <- sparkR.init(master = "local[2]", sparkEnvir = 
> list(spark.driver.memory="1g"), 
> sparkPackages="com.databricks:spark-csv_2.11:1.4.0")
> 
> sqlContext <- sparkRSQL.init(sc)
> people <- read.df(sqlContext, 
> "file:/home/hadoop/spark-1.6.1-bin-hadoop2.6/data/mllib/sample_tree_data.csv",
>  "csv")
> registerTempTable(people, "people")
> teenagers <- sql(sqlContext, "SELECT * FROM people")
> head(teenagers)
> 
> Joseph



Re: Unable to execute sparkr jobs through Chronos

2016-06-16 Thread Sun Rui
I saw in the job definition an Env Var: SPARKR_MASTER. What is that for? I 
don’t think SparkR uses it.
> On Jun 17, 2016, at 10:08, Sun Rui <sunrise_...@163.com> wrote:
> 
> It seems that spark master URL is not correct. What is it?
>> On Jun 16, 2016, at 18:57, Rodrick Brown <rodr...@orchard-app.com 
>> <mailto:rodr...@orchard-app.com>> wrote:
>> 
>> Master must start with yarn, spark, mesos, or local
> 



Re: Unable to execute sparkr jobs through Chronos

2016-06-16 Thread Sun Rui
It seems that spark master URL is not correct. What is it?
> On Jun 16, 2016, at 18:57, Rodrick Brown  wrote:
> 
> Master must start with yarn, spark, mesos, or local



Re: Adding h5 files in a zip to use with PySpark

2016-06-15 Thread Sun Rui
have you tried
--files ?
> On Jun 15, 2016, at 18:50, ar7  wrote:
> 
> I am using PySpark 1.6.1 for my spark application. I have additional modules
> which I am loading using the argument --py-files. I also have a h5 file
> which I need to access from one of the modules for initializing the
> ApolloNet.
> 
> Is there any way I could access those files from the modules if I put them
> in the same archive? I tried this approach but it was throwing an error
> because the files are not there in every worker. I can think of one solution
> which is copying the file to each of the workers but I want to know if there
> are better ways to do it?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Adding-h5-files-in-a-zip-to-use-with-PySpark-tp27173.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR : glm model

2016-06-11 Thread Sun Rui
You were looking at some old code.
poisson family is supported in latest master branch.
You can try spark 2.0 preview release from 
http://spark.apache.org/news/spark-2.0.0-preview.html 


> On Jun 10, 2016, at 12:14, april_ZMQ  wrote:
> 
> Hi all,
> 
> I'm a student who are working on a data analysis project with sparkR.
> 
> I found out that GLM (generalized linear model) only supports two types of
> distribution,  "gaussian" and  "binomial". 
> However, our project is requiring the "poisson" distribution. Meanwhile, I
> found out that sparkR was supporting "poisson"before but now this function
> is closed. https://issues.apache.org/jira/browse/SPARK-12566
>   
> 
> Is there any approaches that I can use the previous official package of
> poisson distribution in SparkR instead?
> 
> Thank you very much!
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-glm-model-tp27134.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



Re: Slow collecting of large Spark Data Frames into R

2016-06-11 Thread Sun Rui
Hi, Jonathan,

Thanks for reporting. This is a known issue that the community would like to 
address later.

Please refer to https://issues.apache.org/jira/browse/SPARK-14037. It would be 
better that you can profile your use case using the method  discussed in the 
JIRA issue and paste the metrics information into it? This would be helpful for 
addressing the issue.

> On Jun 11, 2016, at 08:31, Jonathan Mortensen  wrote:
> 
> 16BG



Re: SparkR interaction with R libraries (currently 1.5.2)

2016-06-07 Thread Sun Rui
Hi, Ian,
You should not use the Spark DataFrame a_df in your closure.
For an R function for lapplyPartition, the parameter is a list of lists, 
representing the rows in the corresponding partition.
In Spark 2.0, SparkR provides a new public API called dapply, which can apply 
an R function to each partition of a Spark DataFrame. The input of the R 
function is a data.frame corresponds to the partition data, and the output is 
also a data.frame.
you may download the Spark 2.0 preview release and give it a try.
> On Jun 8, 2016, at 01:58, rachmaninovquartet  
> wrote:
> 
> Hi,
> I'm trying to figure out how to work with R libraries in spark, properly.
> I've googled and done some trial and error. The main error, I've been
> running into is "cannot coerce class "structure("DataFrame", package =
> "SparkR")" to a data.frame". I'm wondering if there is a way to use the R
> dataframe functionality on worker nodes or if there is a way to "hack" the R
> function in order to make it accept Spark dataframes. Here is an example of
> what I'm trying to do, with a_df being a spark dataframe:
> 
> ***DISTRIBUTED***
> #0 filter out nulls
> a_df <- filter(a_df, isNotNull(a_df$Ozone))
> 
> #1 make closure
> treeParty <- function(x) {
># Use sparseMatrix function from the Matrix package
>air.ct <- ctree(Ozone ~ ., data = a_df)
> }
> 
> #2  put package in context
> SparkR:::includePackage(sc, partykit)
> 
> #3 apply to all partitions
> partied <- SparkR:::lapplyPartition(a_df, treeParty)
> 
> 
> **LOCAL***
> Here is R code that works with a local dataframe, local_df:
> local_df <- subset(airquality, !is.na(Ozone))
> air.ct <- ctree(Ozone ~ ., data = local_df)
> 
> Any advice would be greatly appreciated!
> 
> Thanks,
> 
> Ian
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-interaction-with-R-libraries-currently-1-5-2-tp27107.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: --driver-cores for Standalone and YARN only?! What about Mesos?

2016-06-02 Thread Sun Rui
yes, I think you can fire a JIRA issue for this.
But why removing the default value. Seems the default core is 1 according to 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala#L110


On Jun 2, 2016, at 05:18, Jacek Laskowski  wrote:


Hi,

I'm reviewing the code of spark-submit and can see that although
--driver-cores is said to be for Standalone and YARN only, it is
applicable for Mesos [1].

➜  spark git:(master) ✗ ./bin/spark-shell --help
Usage: ./bin/spark-shell [options]
...
Spark standalone with cluster deploy mode only:
 --driver-cores NUM  Cores for driver (Default: 1).
...
YARN-only:
 --driver-cores NUM  Number of cores used by the driver, only
in cluster mode
 (Default: 1).

I think Mesos has been overlooked (as it's not even included in the
--help). I also can't find that the default number of cores for the
driver for the option is 1.

I can see few things to fix:

1. Have --driver-cores in the "main" help with no separation for
standalone and YARN.
2. Add note that it works only for cluster deploy mode.
3. Remove (Default: 1)

Please confirm (or fix) my understanding before I file a JIRA issue. Thanks!

[1] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L475-L476

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: get and append file name in record being reading

2016-06-02 Thread Sun Rui
You can use RDD.wholeTextFiles().

For example, suppose all your files are under /tmp/ABC_input/,

val rdd  = sc.wholeTextFiles("file:///tmp/ABC_input”)
val rdd1 = rdd.flatMap { case (path, content) => 
  val fileName = new java.io.File(path).getName
  content.split("\n").map { line => (line, fileName) }
}
val df = sqlContext.createDataFrame(rdd1).toDF("line", "file")
> On Jun 2, 2016, at 03:13, Vikash Kumar  wrote:
> 
> 100,abc,299
> 200,xyz,499



Re: Windows Rstudio to Linux spakR

2016-06-01 Thread Sun Rui
Selvam,

First, deploy the Spark distribution on your Windows machine, which is of the 
same version of Spark in your Linux cluster

Second, follow the instructions at 
https://github.com/apache/spark/tree/master/R#using-sparkr-from-rstudio. 
Specify the Spark master URL for your Linux Spark cluster when calling 
sparkR.init(). Don’t know your Spark cluster deployment mode. If it is YARN, 
you may have to copy YARN conf files from your cluster and set YARN_CONF_DIR 
environment variable to point to it.

These steps are my personal understanding, I have not tested in this scenario. 
Please report if you have any problem.

> On Jun 1, 2016, at 16:55, Selvam Raman  wrote:
> 
> Hi ,
> 
> How to connect to sparkR (which is available in Linux env) using 
> Rstudio(Windows env).
> 
> Please help me.
> 
> -- 
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"



Re: Can we use existing R model in Spark

2016-05-30 Thread Sun Rui
I mean train random forest model (not using R) and use it for prediction 
together using Spark ML.
> On May 30, 2016, at 20:15, Neha Mehta <nehamehta...@gmail.com> wrote:
> 
> Thanks Sujeet.. will try it out.
> 
> Hi Sun,
> 
> Can you please tell me what did you mean by "Maybe you can try using the 
> existing random forest model" ? did you mean creating the model again using 
> Spark MLLIB?
> 
> Thanks,
> Neha
> 
> 
> 
> 
> From: sujeet jog <sujeet@gmail.com <mailto:sujeet@gmail.com>>
> Date: Mon, May 30, 2016 at 4:52 PM
> Subject: Re: Can we use existing R model in Spark
> To: Sun Rui <sunrise_...@163.com <mailto:sunrise_...@163.com>>
> Cc: Neha Mehta <nehamehta...@gmail.com <mailto:nehamehta...@gmail.com>>, user 
> <user@spark.apache.org <mailto:user@spark.apache.org>>
> 
> 
> Try to invoke a R script from Spark using rdd pipe method , get the work done 
> & and receive the model back in RDD. 
> 
> 
> for ex :-
> .   rdd.pipe("")
> 
> 
> On Mon, May 30, 2016 at 3:57 PM, Sun Rui <sunrise_...@163.com 
> <mailto:sunrise_...@163.com>> wrote:
> Unfortunately no. Spark does not support loading external modes (for 
> examples, PMML) for now.
> Maybe you can try using the existing random forest model in Spark.
> 
>> On May 30, 2016, at 18:21, Neha Mehta <nehamehta...@gmail.com 
>> <mailto:nehamehta...@gmail.com>> wrote:
>> 
>> Hi,
>> 
>> I have an existing random forest model created using R. I want to use that 
>> to predict values on Spark. Is it possible to do the same? If yes, then how?
>> 
>> Thanks & Regards,
>> Neha
>> 
> 
> 
> 
> 



Re: Can we use existing R model in Spark

2016-05-30 Thread Sun Rui
Unfortunately no. Spark does not support loading external modes (for examples, 
PMML) for now.
Maybe you can try using the existing random forest model in Spark.

> On May 30, 2016, at 18:21, Neha Mehta  wrote:
> 
> Hi,
> 
> I have an existing random forest model created using R. I want to use that to 
> predict values on Spark. Is it possible to do the same? If yes, then how?
> 
> Thanks & Regards,
> Neha
> 



Re: Splitting RDD by partition

2016-05-20 Thread Sun Rui
I think the latter approach is better, which can avoid un-necessary 
computations by filtering out un-needed partitions.
It is better to cache the previous RDD so that it won’t be computed twice
> On May 20, 2016, at 16:59, shlomi  wrote:
> 
> Another approach I found:
> 
> First, I make a PartitionsRDD class which only takes a certain range of
> partitions
> - 
> case class PartitionsRDDPartition(val index:Int, val origSplit:Partition)
> extends Partition {}
> 
> class PartitionsRDD[U: ClassTag](var prev: RDD[U], drop:Int,take:Int)
> extends RDD[U](prev) {
>  override def getPartitions: Array[Partition] =
> prev.partitions.drop(drop).take(take).zipWithIndex.map{case (split,
> idx)=>{new PartitionsRDDPartition(idx,
> split)}}.asInstanceOf[Array[Partition]]
>  override def compute(split: Partition, context: TaskContext): Iterator[U]
> =
> prev.iterator(partitions(split.index).asInstanceOf[PartitionsRDDPartition].origSplit,
> context)
> }
> - 
> 
> And then I can create my two RDD's using the following:
> - 
> def splitByPartition[T:ClassTag](rdd: RDD[T], hotPartitions:Int): (RDD[T],
> RDD[T]) = {
>   val left  = new PartitionsRDD[T](rdd, 0, hotPartitions);
>   val right = new PartitionsRDD[T](rdd, hotPartitions,
> rdd.numPartitions-hotPartitions);
>   (left, right)
> }
> - 
> 
> This approach saves a few minutes when compared to the one in the previous
> post (at least on a local test.. I still need to test this on a real
> cluster).
> 
> Any thought about this? Is this the right thing to do or am I missing
> something important?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-by-partition-tp26983p26985.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does spark support Apache Arrow

2016-05-19 Thread Sun Rui
1. I don’t think so
2. Arrow is for in-memory columnar execution. While cache is for in-memory 
columnar storage
> On May 20, 2016, at 10:16, Todd  wrote:
> 
> From the official site http://arrow.apache.org/, Apache Arrow is used for 
> Columnar In-Memory storage. I have two quick questions:
> 1. Does spark support Apache Arrow?
> 2. When dataframe is cached in memory, the data are saved in columnar 
> in-memory style. What is the relationship between this feature and Apache 
> Arrow,that is,
> when the data is in Apache Arrow format,does spark still need the effort to 
> cache the dataframe in columnar in-memory?



Re: Tar File: On Spark

2016-05-19 Thread Sun Rui
Sure. You can try pySpark, which is the Python API of Spark.
> On May 20, 2016, at 06:20, ayan guha <guha.a...@gmail.com> wrote:
> 
> Hi
> 
> Thanks for the input. Can it be possible to write it in python? I think I can 
> use FileUti.untar from hdfs jar. But can I do it from python?
> 
> On 19 May 2016 16:57, "Sun Rui" <sunrise_...@163.com 
> <mailto:sunrise_...@163.com>> wrote:
> 1. create a temp dir on HDFS, say “/tmp”
> 2. write a script to create in the temp dir one file for each tar file. Each 
> file has only one line:
> 
> 3. Write a spark application. It is like:
>   val rdd = sc.textFile ()
>   rdd.map { line =>
>construct an untar command using the path information in “line” and 
> launches the command
>   }
> 
> > On May 19, 2016, at 14:42, ayan guha <guha.a...@gmail.com 
> > <mailto:guha.a...@gmail.com>> wrote:
> >
> > Hi
> >
> > I have few tar files in HDFS in a single folder. each file has multiple 
> > files in it.
> >
> > tar1:
> >   - f1.txt
> >   - f2.txt
> > tar2:
> >   - f1.txt
> >   - f2.txt
> >
> > (each tar file will have exact same number of files, same name)
> >
> > I am trying to find a way (spark or pig) to extract them to their own 
> > folders.
> >
> > f1
> >   - tar1_f1.txt
> >   - tar2_f1.txt
> > f2:
> >- tar1_f2.txt
> >- tar1_f2.txt
> >
> > Any help?
> >
> >
> >
> > --
> > Best Regards,
> > Ayan Guha
> 
> 



Re: dataframe stat corr for multiple columns

2016-05-19 Thread Sun Rui
There is an existing JIRA issue for it: 
https://issues.apache.org/jira/browse/SPARK-11057 

Also there is an PR. Maybe we should help to review and merge it with a higher 
priority.
> On May 20, 2016, at 00:09, Xiangrui Meng  wrote:
> 
> This is nice to have. Please create a JIRA for it. Right now, you can merge 
> all columns into a vector column using RFormula or VectorAssembler, then 
> convert it into an RDD and call corr from MLlib.
> 
> 
> On Tue, May 17, 2016, 7:09 AM Ankur Jain  > wrote:
> Hello Team,
> 
>  
> 
> In my current usecase I am loading data from CSV using spark-csv and trying 
> to correlate all variables.
> 
>  
> 
> As of now if we want to correlate 2 column in a dataframe df.stat.corr works 
> great but if we want to correlate multiple columns this won’t work.
> 
> In case of R we can use corrplot and correlate all numeric columns in a 
> single line of code. Can you guide me how to achieve the same with dataframe 
> or sql?
> 
>  
> 
> There seems a way in spark-mllib
> 
> http://spark.apache.org/docs/latest/mllib-statistics.html 
> 
>  
> 
> 
> 
>  
> 
> But it seems that it don’t take input as dataframe…
> 
>  
> 
> Regards,
> 
> Ankur
> 
> Information transmitted by this e-mail is proprietary to YASH Technologies 
> and/ or its Customers and is intended for use only by the individual or 
> entity to which it is addressed, and may contain information that is 
> privileged, confidential or exempt from disclosure under applicable law. If 
> you are not the intended recipient or it appears that this mail has been 
> forwarded to you without proper authority, you are notified that any use or 
> dissemination of this information in any manner is strictly prohibited. In 
> such cases, please notify us immediately at i...@yash.com 
>  and delete this mail from your records.
>  邮件带有附件预览链接,若您转发或回复此邮件时不希望对方预览附件,建议您手动删除链接。
> 共有 2 个附件
> image001.png(10K)
> 极速下载 
> 
>  在线预览 
> 
> image001.png(10K)
> 极速下载 
> 
>  在线预览 
> 



Re: Tar File: On Spark

2016-05-19 Thread Sun Rui
1. create a temp dir on HDFS, say “/tmp”
2. write a script to create in the temp dir one file for each tar file. Each 
file has only one line:

3. Write a spark application. It is like:
  val rdd = sc.textFile ()
  rdd.map { line =>
   construct an untar command using the path information in “line” and 
launches the command
  }

> On May 19, 2016, at 14:42, ayan guha  wrote:
> 
> Hi
> 
> I have few tar files in HDFS in a single folder. each file has multiple files 
> in it. 
> 
> tar1:
>   - f1.txt
>   - f2.txt
> tar2:
>   - f1.txt
>   - f2.txt
> 
> (each tar file will have exact same number of files, same name)
> 
> I am trying to find a way (spark or pig) to extract them to their own 
> folders. 
> 
> f1
>   - tar1_f1.txt
>   - tar2_f1.txt
> f2:
>- tar1_f2.txt
>- tar1_f2.txt
> 
> Any help? 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR query

2016-05-17 Thread Sun Rui
I guess that you are using an old version of Spark, 1.4.

please try Spark version 1.5+

> On May 17, 2016, at 18:42, Mike Lewis <mle...@nephilaadvisors.co.uk> wrote:
> 
> Thanks, I’m just using RStudio. Running locally is fine, just issue with 
> having cluster in Linux and workers looking for Windows path,
> Which must be being passed through by the driver I guess. I checked the 
> spark-env.sh on each node and the appropriate SPARK_HOME is set
> correctly….
>  
>  
> From: Sun Rui [mailto:sunrise_...@163.com] 
> Sent: 17 May 2016 11:32
> To: Mike Lewis
> Cc: user@spark.apache.org
> Subject: Re: SparkR query
>  
> Lewis,
> 1. Could you check the values of “SPARK_HOME” environment on all of your 
> worker nodes?
> 2. How did you start your SparkR shell?
>  
> On May 17, 2016, at 18:07, Mike Lewis <mle...@nephilaadvisors.co.uk 
> <mailto:mle...@nephilaadvisors.co.uk>> wrote:
>  
> Hi,
>  
> I have a SparkR driver process that connects to a master running on Linux,
> I’ve tried to do a simple test, e.g.
>  
> sc <- sparkR.init(master="spark://my-linux-host.dev.local:7077 
> ",  
> sparkEnvir=list(spark.cores.max="4"))
> x <- SparkR:::parallelize(sc,1:100,2)
> y <- count(x)
>  
> But I can see that the worker nodes are failing, they are looking for the 
> Windows (rather than linux path) to
> Daemon.R
>  
> 16/05/17 06:52:33 INFO BufferedStreamThread: Fatal error: cannot open file 
> 'E:/Dev/spark/spark-1.4.0-bin-hadoop2.6/R/lib/SparkR/worker/daemon.R': No 
> such file or directory
>  
> Is this a configuration setting that I’m missing, the worker nodes (linux) 
> shouldn’t be looking in the spark home of the driver (windows) ?
> If so, I’d appreciate someone letting me know what I need to change/set.
> 
> Thanks,
> Mike Lewis
>  
> 
> --
>  
> This email has been sent to you on behalf of Nephila Advisors LLC 
> (“Advisors”). Advisors provides consultancy services to Nephila Capital Ltd. 
> (“Capital”), an investment advisor managed and carrying on business in 
> Bermuda. Advisors and its employees do not act as agents for Capital or the 
> funds it advises and do not have the authority to bind Capital or such funds 
> to any transaction or agreement. 
> 
> The information in this e-mail, and any attachment therein, is confidential 
> and for use by the addressee only. Any use, disclosure, reproduction, 
> modification or distribution of the contents of this e-mail, or any part 
> thereof, other than by the intended recipient, is strictly prohibited. If you 
> are not the intended recipient, please return the e-mail to the sender and 
> delete it from your computer. This email is for information purposes only, 
> nothing contained herein constitutes an offer to sell or buy securities, as 
> such an offer may only be made from a properly authorized offering document. 
> Although Nephila attempts to sweep e-mail and attachments for viruses, it 
> does not guarantee that either are virus-free and accepts no liability for 
> any damage sustained as a result of viruses. 
> --
>  
> 
> --
>  
> This email has been sent to you on behalf of Nephila Advisors UK (“Advisors 
> UK”). Advisors UK provides consultancy services to Nephila Capital Ltd. 
> (“Capital”), an investment advisor managed and carrying on business in 
> Bermuda. Advisors UK and its employees do not act as agents for Capital or 
> the funds it advises and do not have the authority to bind Capital or such 
> funds to any transaction or agreement. 
> 
> The information in this e-mail, and any attachment therein, is confidential 
> and for use by the addressee only. Any use, disclosure, reproduction, 
> modification or distribution of the contents of this e-mail, or any part 
> thereof, other than by the intended recipient, is strictly prohibited. If you 
> are not the intended recipient, please return the e-mail to the sender and 
> delete it from your computer. This email is for information purposes only, 
> nothing contained herein constitutes an offer to sell or buy securities, as 
> such an offer may only be made from a properly authorized offering document. 
> Although Nephila attempts to sweep e-mail and attachments for viruses, it 
> does not guarantee that either are virus-free and accepts no liability for 
> any damage sustained as a result of viruses. 
> ---

Re: SparkR query

2016-05-17 Thread Sun Rui
Lewis,
1. Could you check the values of “SPARK_HOME” environment on all of your worker 
nodes?
2. How did you start your SparkR shell?

> On May 17, 2016, at 18:07, Mike Lewis  wrote:
> 
> Hi,
>  
> I have a SparkR driver process that connects to a master running on Linux,
> I’ve tried to do a simple test, e.g.
>  
> sc <- sparkR.init(master="spark://my-linux-host.dev.local:7077 
> ",  
> sparkEnvir=list(spark.cores.max="4"))
> x <- SparkR:::parallelize(sc,1:100,2)
> y <- count(x)
>  
> But I can see that the worker nodes are failing, they are looking for the 
> Windows (rather than linux path) to
> Daemon.R
>  
> 16/05/17 06:52:33 INFO BufferedStreamThread: Fatal error: cannot open file 
> 'E:/Dev/spark/spark-1.4.0-bin-hadoop2.6/R/lib/SparkR/worker/daemon.R': No 
> such file or directory
>  
> Is this a configuration setting that I’m missing, the worker nodes (linux) 
> shouldn’t be looking in the spark home of the driver (windows) ?
> If so, I’d appreciate someone letting me know what I need to change/set.
> 
> Thanks,
> Mike Lewis
>  
> 
> --
>  
> This email has been sent to you on behalf of Nephila Advisors LLC 
> (“Advisors”). Advisors provides consultancy services to Nephila Capital Ltd. 
> (“Capital”), an investment advisor managed and carrying on business in 
> Bermuda. Advisors and its employees do not act as agents for Capital or the 
> funds it advises and do not have the authority to bind Capital or such funds 
> to any transaction or agreement. 
> 
> The information in this e-mail, and any attachment therein, is confidential 
> and for use by the addressee only. Any use, disclosure, reproduction, 
> modification or distribution of the contents of this e-mail, or any part 
> thereof, other than by the intended recipient, is strictly prohibited. If you 
> are not the intended recipient, please return the e-mail to the sender and 
> delete it from your computer. This email is for information purposes only, 
> nothing contained herein constitutes an offer to sell or buy securities, as 
> such an offer may only be made from a properly authorized offering document. 
> Although Nephila attempts to sweep e-mail and attachments for viruses, it 
> does not guarantee that either are virus-free and accepts no liability for 
> any damage sustained as a result of viruses. 
> --
>  
> 
> --
>  
> This email has been sent to you on behalf of Nephila Advisors UK (“Advisors 
> UK”). Advisors UK provides consultancy services to Nephila Capital Ltd. 
> (“Capital”), an investment advisor managed and carrying on business in 
> Bermuda. Advisors UK and its employees do not act as agents for Capital or 
> the funds it advises and do not have the authority to bind Capital or such 
> funds to any transaction or agreement. 
> 
> The information in this e-mail, and any attachment therein, is confidential 
> and for use by the addressee only. Any use, disclosure, reproduction, 
> modification or distribution of the contents of this e-mail, or any part 
> thereof, other than by the intended recipient, is strictly prohibited. If you 
> are not the intended recipient, please return the e-mail to the sender and 
> delete it from your computer. This email is for information purposes only, 
> nothing contained herein constitutes an offer to sell or buy securities, as 
> such an offer may only be made from a properly authorized offering document. 
> Although Nephila attempts to sweep e-mail and attachments for viruses, it 
> does not guarantee that either are virus-free and accepts no liability for 
> any damage sustained as a result of viruses. 
> --



Re: Spark 1.6.0: substring on df.select

2016-05-12 Thread Sun Rui
Alternatively, you may try the built-in function:
regexp_extract

> On May 12, 2016, at 20:27, Ewan Leith  wrote:
> 
> You could use a UDF pretty easily, something like this should work, the 
> lastElement function could be changed to do pretty much any string 
> manipulation you want.
>  
> import org.apache.spark.sql.functions.udf
>  
> def lastElement(input: String) = input.split("/").last
>  
> val lastElementUdf = udf(lastElement(_:String))
>  
> df.select(lastElementUdf ($"col1")).show()
>  
> Ewan
>  
>  
> From: Bharathi Raja [mailto:raja...@yahoo.com.INVALID] 
> Sent: 12 May 2016 11:40
> To: Raghavendra Pandey ; Bharathi Raja 
> 
> Cc: User 
> Subject: RE: Spark 1.6.0: substring on df.select
>  
> Thanks Raghav. 
>  
> I have 5+ million records. I feel creating multiple come is not an optimal 
> way.
>  
> Please suggest any other alternate solution.
> Can’t we do any string operation in DF.Select?
>  
> Regards,
> Raja
>  
> From: Raghavendra Pandey 
> Sent: 11 May 2016 09:04 PM
> To: Bharathi Raja 
> Cc: User 
> Subject: Re: Spark 1.6.0: substring on df.select
>  
> You can create a column with count of /.  Then take max of it and create that 
> many columns for every row with null fillers.
> 
> Raghav 
> 
> On 11 May 2016 20:37, "Bharathi Raja"  > wrote:
> Hi,
>  
> I have a dataframe column col1 with values something like 
> “/client/service/version/method”. The number of “/” are not constant.
> Could you please help me to extract all methods from the column col1?
>  
> In Pig i used SUBSTRING with LAST_INDEX_OF(“/”).
>  
> Thanks in advance.
> Regards,
> Raja



RE: How does spark-submit handle Python scripts (and how to repeat it)?

2016-04-13 Thread Sun, Rui
In SparkSubmit, there is less work for yarn-client than that for yarn-cluster. 
Basically prepare some spark configurations into system prop , for example, 
information on additional resources required by the application that need to be 
distributed to the cluster. These configurations will be used in SparkContext 
initialization later.

So generally for yarn-client, maybe you can skip spark-submit and directly 
launching the spark application with some configurations setup before new 
SparkContext.

Not sure about your error, have you setup YARN_CONF_DIR?

From: Andrei [mailto:faithlessfri...@gmail.com]
Sent: Thursday, April 14, 2016 5:45 AM
To: Sun, Rui <rui@intel.com>
Cc: user <user@spark.apache.org>
Subject: Re: How does spark-submit handle Python scripts (and how to repeat it)?

Julia can pick the env var, and set the system properties or directly fill the 
configurations into a SparkConf, and then create a SparkContext

That's the point - just setting master to "yarn-client" doesn't work, even in 
Java/Scala. E.g. following code in Scala:

val conf = new SparkConf().setAppName("My App").setMaster("yarn-client")
val sc = new SparkContext(conf)
sc.parallelize(1 to 10).collect()
sc.stop()

results in an error:

Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8032<http://0.0.0.0/0.0.0.0:8032>

I think for now we can even put Julia aside and concentrate the following 
question: how does submitting application via `spark-submit` with "yarn-client" 
mode differ from setting the same mode directly in `SparkConf`?



On Wed, Apr 13, 2016 at 5:06 AM, Sun, Rui 
<rui@intel.com<mailto:rui@intel.com>> wrote:
Spark configurations specified at the command line for spark-submit should be 
passed to the JVM inside Julia process. You can refer to 
https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L267
 and 
https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L295
Generally,
spark-submit JVM -> JuliaRunner -> Env var like 
“JULIA_SUBMIT_ARGS” -> julia process -> new JVM with SparkContext
  Julia can pick the env var, and set the system properties or directly fill 
the configurations into a SparkConf, and then create a SparkContext

Yes, you are right, `spark-submit` creates new Python/R process that connects 
back to that same JVM and creates SparkContext in it.
Refer to 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L47
 and
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/RRunner.scala#L65


From: Andrei 
[mailto:faithlessfri...@gmail.com<mailto:faithlessfri...@gmail.com>]
Sent: Wednesday, April 13, 2016 4:32 AM
To: Sun, Rui <rui@intel.com<mailto:rui@intel.com>>
Cc: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: How does spark-submit handle Python scripts (and how to repeat it)?

One part is passing the command line options, like “--master”, from the JVM 
launched by spark-submit to the JVM where SparkContext resides

Since I have full control over both - JVM and Julia parts - I can pass whatever 
options to both. But what exactly should be passed? Currently pipeline looks 
like this:

spark-submit JVM -> JuliaRunner -> julia process -> new JVM with SparkContext

 I want to make the last JVM's SparkContext to understand that it should run on 
YARN. Obviously, I can't pass `--master yarn` option to JVM itself. Instead, I 
can pass system property "spark.master" = "yarn-client", but this results in an 
error:

Retrying connect to server: 0.0.0.0/0.0.0.0:8032<http://0.0.0.0/0.0.0.0:8032>


So it's definitely not enough. I tried to set manually all system properties 
that `spark-submit` adds to the JVM (including "spark-submit=true", 
"spark.submit.deployMode=client", etc.), but it didn't help too. Source code is 
always good, but for a stranger like me it's a little bit hard to grasp control 
flow in SparkSubmit class.


For pySpark & SparkR, when running scripts in client deployment modes 
(standalone client and yarn client), the JVM is the same (py4j/RBackend running 
as a thread in the JVM launched by spark-submit)

Can you elaborate on this? Does it mean that `spark-submit` creates new 
Python/R process that connects back to that same JVM and creates SparkContext 
in it?


On Tue, Apr 12, 2016 at 2:04 PM, Sun, Rui 
<rui@intel.com<mailto:rui@intel.com>> wrote:
There is much deployment preparation work handling different deployment modes 
for pyspark and SparkR in SparkSubmit. It is difficult to summarize it briefly, 
you had better refer to the source code.

Supporting running Julia scripts in SparkSubmit is more than i

RE: How does spark-submit handle Python scripts (and how to repeat it)?

2016-04-12 Thread Sun, Rui
Spark configurations specified at the command line for spark-submit should be 
passed to the JVM inside Julia process. You can refer to 
https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L267
 and 
https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L295
Generally,
spark-submit JVM -> JuliaRunner -> Env var like 
“JULIA_SUBMIT_ARGS” -> julia process -> new JVM with SparkContext
  Julia can pick the env var, and set the system properties or directly fill 
the configurations into a SparkConf, and then create a SparkContext

Yes, you are right, `spark-submit` creates new Python/R process that connects 
back to that same JVM and creates SparkContext in it.
Refer to 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L47
 and
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/RRunner.scala#L65


From: Andrei [mailto:faithlessfri...@gmail.com]
Sent: Wednesday, April 13, 2016 4:32 AM
To: Sun, Rui <rui@intel.com>
Cc: user <user@spark.apache.org>
Subject: Re: How does spark-submit handle Python scripts (and how to repeat it)?

One part is passing the command line options, like “--master”, from the JVM 
launched by spark-submit to the JVM where SparkContext resides

Since I have full control over both - JVM and Julia parts - I can pass whatever 
options to both. But what exactly should be passed? Currently pipeline looks 
like this:

spark-submit JVM -> JuliaRunner -> julia process -> new JVM with SparkContext

 I want to make the last JVM's SparkContext to understand that it should run on 
YARN. Obviously, I can't pass `--master yarn` option to JVM itself. Instead, I 
can pass system property "spark.master" = "yarn-client", but this results in an 
error:

Retrying connect to server: 0.0.0.0/0.0.0.0:8032<http://0.0.0.0/0.0.0.0:8032>


So it's definitely not enough. I tried to set manually all system properties 
that `spark-submit` adds to the JVM (including "spark-submit=true", 
"spark.submit.deployMode=client", etc.), but it didn't help too. Source code is 
always good, but for a stranger like me it's a little bit hard to grasp control 
flow in SparkSubmit class.


For pySpark & SparkR, when running scripts in client deployment modes 
(standalone client and yarn client), the JVM is the same (py4j/RBackend running 
as a thread in the JVM launched by spark-submit)

Can you elaborate on this? Does it mean that `spark-submit` creates new 
Python/R process that connects back to that same JVM and creates SparkContext 
in it?


On Tue, Apr 12, 2016 at 2:04 PM, Sun, Rui 
<rui@intel.com<mailto:rui@intel.com>> wrote:
There is much deployment preparation work handling different deployment modes 
for pyspark and SparkR in SparkSubmit. It is difficult to summarize it briefly, 
you had better refer to the source code.

Supporting running Julia scripts in SparkSubmit is more than implementing a 
‘JuliaRunner’. One part is passing the command line options, like “--master”, 
from the JVM launched by spark-submit to the JVM where SparkContext resides, in 
the case that the two JVMs are not the same. For pySpark & SparkR, when running 
scripts in client deployment modes (standalone client and yarn client), the JVM 
is the same (py4j/RBackend running as a thread in the JVM launched by 
spark-submit) , so no need to pass the command line options around. However, in 
your case, Julia interpreter launches an in-process JVM for SparkContext, which 
is a separate JVM from the one launched by spark-submit. So you need a way, 
typically an environment environment variable, like “SPARKR_SUBMIT_ARGS” for 
SparkR or “PYSPARK_SUBMIT_ARGS” for pyspark, to pass command line args to the 
in-process JVM in the Julia interpreter so that SparkConf can pick the options.

From: Andrei 
[mailto:faithlessfri...@gmail.com<mailto:faithlessfri...@gmail.com>]
Sent: Tuesday, April 12, 2016 3:48 AM
To: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: How does spark-submit handle Python scripts (and how to repeat it)?

I'm working on a wrapper [1] around Spark for the Julia programming language 
[2] similar to PySpark. I've got it working with Spark Standalone server by 
creating local JVM and setting master programmatically. However, this approach 
doesn't work with YARN (and probably Mesos), which require running via 
`spark-submit`.

In `SparkSubmit` class I see that for Python a special class `PythonRunner` is 
launched, so I tried to do similar `JuliaRunner`, which essentially does the 
following:

val pb = new ProcessBuilder(Seq("julia", juliaScript))
val process = pb.start()
process.waitFor()

where `juliaScript` itself creates new JV

RE: Can i have a hive context and sql context in the same app ?

2016-04-12 Thread Sun, Rui
  val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
defaultValue = Some(true),
doc = "When set to true, creating multiple SQLContexts/HiveContexts is 
allowed." +
  "When set to false, only one SQLContext/HiveContext is allowed to be 
created " +
  "through the constructor (new SQLContexts/HiveContexts created through 
newSession " +
  "method is allowed). Please note that this conf needs to be set in Spark 
Conf. Once" +
  "a SQLContext/HiveContext has been created, changing the value of this 
conf will not" +
  "have effect.",
isPublic = true)

I don’t think there is any performance pernalties of doing so.
From: Natu Lauchande [mailto:nlaucha...@gmail.com]
Sent: Tuesday, April 12, 2016 4:49 PM
To: user@spark.apache.org
Subject: Can i have a hive context and sql context in the same app ?

Hi,
Is it possible to have both a sqlContext and a hiveContext in the same 
application ?
If yes would there be any performance pernalties of doing so.

Regards,
Natu


RE: Run a self-contained Spark app on a Spark standalone cluster

2016-04-12 Thread Sun, Rui
Which py file is your main file (primary py file)? Zip the other two py files. 
Leave the main py file alone. Don't copy them to S3 because it seems that only 
local primary and additional py files are supported.

./bin/spark-submit --master spark://... --py-files  

-Original Message-
From: kevllino [mailto:kevin.e...@mail.dcu.ie] 
Sent: Tuesday, April 12, 2016 5:07 PM
To: user@spark.apache.org
Subject: Run a self-contained Spark app on a Spark standalone cluster

Hi, 

I need to know how to run a self-contained Spark app  (3 python files) in a 
Spark standalone cluster. Can I move the .py files to the cluster, or should I 
store them locally, on HDFS or S3? I tried the following locally and on S3 with 
a zip of my .py files as suggested  here 
  : 

./bin/spark-submit --master
spark://ec2-54-51-23-172.eu-west-1.compute.amazonaws.com:5080--py-files
s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@mubucket//weather_predict.zip

But get: “Error: Must specify a primary resource (JAR or Python file)”

Best,
Kevin 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Run-a-self-contained-Spark-app-on-a-Spark-standalone-cluster-tp26753.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How does spark-submit handle Python scripts (and how to repeat it)?

2016-04-12 Thread Sun, Rui
There is much deployment preparation work handling different deployment modes 
for pyspark and SparkR in SparkSubmit. It is difficult to summarize it briefly, 
you had better refer to the source code.

Supporting running Julia scripts in SparkSubmit is more than implementing a 
‘JuliaRunner’. One part is passing the command line options, like “--master”, 
from the JVM launched by spark-submit to the JVM where SparkContext resides, in 
the case that the two JVMs are not the same. For pySpark & SparkR, when running 
scripts in client deployment modes (standalone client and yarn client), the JVM 
is the same (py4j/RBackend running as a thread in the JVM launched by 
spark-submit) , so no need to pass the command line options around. However, in 
your case, Julia interpreter launches an in-process JVM for SparkContext, which 
is a separate JVM from the one launched by spark-submit. So you need a way, 
typically an environment environment variable, like “SPARKR_SUBMIT_ARGS” for 
SparkR or “PYSPARK_SUBMIT_ARGS” for pyspark, to pass command line args to the 
in-process JVM in the Julia interpreter so that SparkConf can pick the options.

From: Andrei [mailto:faithlessfri...@gmail.com]
Sent: Tuesday, April 12, 2016 3:48 AM
To: user 
Subject: How does spark-submit handle Python scripts (and how to repeat it)?

I'm working on a wrapper [1] around Spark for the Julia programming language 
[2] similar to PySpark. I've got it working with Spark Standalone server by 
creating local JVM and setting master programmatically. However, this approach 
doesn't work with YARN (and probably Mesos), which require running via 
`spark-submit`.

In `SparkSubmit` class I see that for Python a special class `PythonRunner` is 
launched, so I tried to do similar `JuliaRunner`, which essentially does the 
following:

val pb = new ProcessBuilder(Seq("julia", juliaScript))
val process = pb.start()
process.waitFor()

where `juliaScript` itself creates new JVM and `SparkContext` inside it WITHOUT 
setting master URL. I then tried to launch this class using

spark-submit --master yarn \
  --class o.a.s.a.j.JuliaRunner \
  project.jar my_script.jl

I expected that `spark-submit` would set environment variables or something 
that SparkContext would then read and connect to appropriate master. This 
didn't happen, however, and process failed while trying to instantiate 
`SparkContext`, saying that master is not specified.

So what am I missing? How can use `spark-submit` to run driver in a non-JVM 
language?


[1]: https://github.com/dfdx/Sparta.jl
[2]: http://julialang.org/


RE: How to process one partition at a time?

2016-04-06 Thread Sun, Rui
Maybe you can try SparkContext.submitJob:
def submitJob[T, U, R](rdd: 
RDD[T],
 processPartition: (Iterator[T]) ⇒ U, partitions: Seq[Int], resultHandler: 
(Int, U) ⇒ Unit, resultFunc: ⇒ R): 
SimpleFutureAction[R]


From: Hemant Bhanawat [mailto:hemant9...@gmail.com]
Sent: Wednesday, April 6, 2016 7:16 PM
To: Andrei 
Cc: user 
Subject: Re: How to process one partition at a time?

Instead of doing it in compute, you could rather override getPartitions method 
of your RDD and return only the target partitions. This way tasks for only 
target partitions will be created. Currently in your case, tasks for all the 
partitions are getting created.
I hope it helps. I would like to hear if you take some other approach.

Hemant Bhanawat
www.snappydata.io

On Wed, Apr 6, 2016 at 3:49 PM, Andrei 
> wrote:
I'm writing a kind of sampler which in most cases will require only 1 
partition, sometimes 2 and very rarely more. So it doesn't make sense to 
process all partitions in parallel. What is the easiest way to limit 
computations to one partition only?

So far the best idea I came to is to create a custom partition whose `compute` 
method looks something like:

def compute(split: Partition, context: TaskContext) = {
if (split.index == targetPartition) {
// do computation
} else {
   // return empty iterator
}
}


But it's quite ugly and I'm unlikely to be the first person with such a need. 
Is there easier way to do it?





RE: What's the benifit of RDD checkpoint against RDD save

2016-03-24 Thread Sun, Rui
As Mark said, checkpoint() can be called before calling any action on the RDD.

The choice between checkpoint and saveXXX depends. If you just want to cut the 
long RDD lineage, and the data won’t be re-used later, then use checkpoint, 
because it is simple and the checkpoint data will be cleaned automatically. 
Note that reliable checkpoint has a little performance penalty as it will 
re-compute the RDD. So you can either call RDD.cache before checkpoint or you 
can choose localCheckpoint.

If you want to reuse the data in another application, use SaveXXX(), because 
you can re-create an RDD from the saved data. On the contrary, you have no way 
to create an RDD from checkpoint data (maybe possible in Spark Streaming, but 
not sure).

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Friday, March 25, 2016 5:34 AM
To: Mark Hamstra 
Cc: Todd ; user@spark.apache.org
Subject: Re: What's the benifit of RDD checkpoint against RDD save

Thanks, Mark.

Since checkpoint may get cleaned up later on, it seems option #2 (saveXXX) is 
viable.

On Wed, Mar 23, 2016 at 8:01 PM, Mark Hamstra 
> wrote:
Yes, the terminology is being used sloppily/non-standardly in this thread -- 
"the last RDD" after a series of transformation is the RDD at the beginning of 
the chain, just now with an attached chain of "to be done" transformations when 
an action is eventually run.  If the saveXXX action is the only action being 
performed on the RDD, the rest of the chain being purely transformations, then 
checkpointing instead of saving still wouldn't execute any action on the RDD -- 
it would just mark the point at which checkpointing should be done when an 
action is eventually run.

On Wed, Mar 23, 2016 at 7:38 PM, Ted Yu 
> wrote:
bq. when I get the last RDD
If I read Todd's first email correctly, the computation has been done.
I could be wrong.

On Wed, Mar 23, 2016 at 7:34 PM, Mark Hamstra 
> wrote:
Neither of you is making any sense to me.  If you just have an RDD for which 
you have specified a series of transformations but you haven't run any actions, 
then neither checkpointing nor saving makes sense -- you haven't computed 
anything yet, you've only written out the recipe for how the computation should 
be done when it is needed.  Neither does the "called before any job" comment 
pose any restriction in this case since no jobs have yet been executed on the 
RDD.

On Wed, Mar 23, 2016 at 7:18 PM, Ted Yu 
> wrote:
See the doc for checkpoint:

   * Mark this RDD for checkpointing. It will be saved to a file inside the 
checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to 
its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is 
persisted in
   * memory, otherwise saving it on a file will require recomputation.

From the above description, you should not call it at the end of 
transformations.

Cheers

On Wed, Mar 23, 2016 at 7:14 PM, Todd > 
wrote:
Hi,

I have a long computing chain, when I get the last RDD after a series of 
transformation. I have two choices to do with this last RDD

1. Call checkpoint on RDD to materialize it to disk
2. Call RDD.saveXXX to save it to HDFS, and read it back for further processing

I would ask which choice is better? It looks to me that is not much difference 
between the two choices.
Thanks!








RE: Run External R script from Spark

2016-03-21 Thread Sun, Rui
It’s a possible approach. It actually leverages Spark’s parallel execution.  
PipeRDD’s  launching of external processes is just like that in pySpark and 
SparkR for RDD API.

The concern is pipeRDD relies on text based serialization/deserialization. 
Whether the performance is acceptable actually depends on your workload and 
cluster configurations. You can do some profiling to evaluate it.

From: sujeet jog [mailto:sujeet@gmail.com]
Sent: Monday, March 21, 2016 2:10 PM
To: user@spark.apache.org
Subject: Run External R script from Spark

Hi,

I have been working on a POC on some time series related stuff, i'm using 
python since i need spark streaming and sparkR is yet to have a spark streaming 
front end,  couple of algorithms i want to use are not yet present in Spark-TS 
package, so I'm thinking of invoking a external R script for the Algorithm part 
& pass the data from Spark to the R script via pipeRdd,


What i wanted to understand is can something like this be used in a production 
deployment,  since passing the data via R script would mean lot of serializing 
and would actually not use the power of spark for parallel execution,

Has anyone used this kind of workaround  Spark -> pipeRdd-> R script.


Thanks,
Sujeet


RE: Error in "java.io.IOException: No input paths specified in job"

2016-03-19 Thread Sun, Rui
It complains about the file path  "./examples/src/main/resources/people.json"
You can try to use absolute path instead of relative path, and make sure the 
absolute path is correct.
If that still does not work, you can prefix the path with "file://" in case the 
default file schema for Hadoop is HDFS.

-Original Message-
From: tinyocean [mailto:haiyiz...@gmail.com] 
Sent: Thursday, March 17, 2016 9:22 PM
To: user@spark.apache.org
Subject: Error in "java.io.IOException: No input paths specified in job"

Hello,

I am learning sparkR by myself and have little computer background.

I am following the examples on
http://spark.apache.org/docs/latest/sparkr.html
and running 

/sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

people <- read.df(sqlContext, "./examples/src/main/resources/people.json",
"json")
head(people)/

But got
/Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
  java.io.IOException: No input paths specified in job
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfu/

Could you please tell me where I got it wrong and how to fix it?
Thanks.

Regards,
Amy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-java-io-IOException-No-input-paths-specified-in-job-tp26528.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: sparkR issues ?

2016-03-18 Thread Sun, Rui
Sorry. I am wrong. The issue is not related to as.data.frame(). It seems to be 
related to DataFrame naming conflict between s4vectors and SparkR.
Refer to https://issues.apache.org/jira/browse/SPARK-12148


From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, March 16, 2016 9:33 AM
To: Alex Kozlov <ale...@gmail.com>; roni <roni.epi...@gmail.com>
Cc: user@spark.apache.org
Subject: RE: sparkR issues ?

I have submitted https://issues.apache.org/jira/browse/SPARK-13905 and a PR for 
it.

From: Alex Kozlov [mailto:ale...@gmail.com]
Sent: Wednesday, March 16, 2016 12:52 AM
To: roni <roni.epi...@gmail.com<mailto:roni.epi...@gmail.com>>
Cc: Sun, Rui <rui@intel.com<mailto:rui@intel.com>>; 
user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: sparkR issues ?

Hi Roni, you can probably rename the as.data.frame in 
$SPARK_HOME/R/pkg/R/DataFrame.R and re-install SparkR by running install-dev.sh

On Tue, Mar 15, 2016 at 8:46 AM, roni 
<roni.epi...@gmail.com<mailto:roni.epi...@gmail.com>> wrote:
Hi ,
 Is there a work around for this?
 Do i need to file a bug for this?
Thanks
-R

On Tue, Mar 15, 2016 at 12:28 AM, Sun, Rui 
<rui@intel.com<mailto:rui@intel.com>> wrote:
It seems as.data.frame() defined in SparkR convers the versions in R base 
package.
We can try to see if we can change the implementation of as.data.frame() in 
SparkR to avoid such covering.

From: Alex Kozlov [mailto:ale...@gmail.com<mailto:ale...@gmail.com>]
Sent: Tuesday, March 15, 2016 2:59 PM
To: roni <roni.epi...@gmail.com<mailto:roni.epi...@gmail.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: sparkR issues ?

This seems to be a very unfortunate name collision.  SparkR defines it's own 
DataFrame class which shadows what seems to be your own definition.

Is DataFrame something you define?  Can you rename it?

On Mon, Mar 14, 2016 at 10:44 PM, roni 
<roni.epi...@gmail.com<mailto:roni.epi...@gmail.com>> wrote:
Hi,
 I am working with bioinformatics and trying to convert some scripts to sparkR 
to fit into other spark jobs.

I tries a simple example from a bioinf lib and as soon as I start sparkR 
environment it does not work.

code as follows -
countData <- matrix(1:100,ncol=4)
condition <- factor(c("A","A","B","B"))
dds <- DESeqDataSetFromMatrix(countData, DataFrame(condition), ~ condition)

Works if i dont initialize the sparkR environment.
 if I do library(SparkR) and sqlContext <- sparkRSQL.init(sc)  it gives 
following error

> dds <- DESeqDataSetFromMatrix(countData, as.data.frame(condition), ~ 
> condition)
Error in DataFrame(colData, row.names = rownames(colData)) :
  cannot coerce class "data.frame" to a DataFrame

I am really stumped. I am not using any spark function , so i would expect it 
to work as a simple R code.
why it does not work?

Appreciate  the help
-R




--
Alex Kozlov
(408) 507-4987<tel:%28408%29%20507-4987>
(650) 887-2135<tel:%28650%29%20887-2135> efax
ale...@gmail.com<mailto:ale...@gmail.com>




--
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com<mailto:ale...@gmail.com>


RE: sparkR issues ?

2016-03-15 Thread Sun, Rui
I have submitted https://issues.apache.org/jira/browse/SPARK-13905 and a PR for 
it.

From: Alex Kozlov [mailto:ale...@gmail.com]
Sent: Wednesday, March 16, 2016 12:52 AM
To: roni <roni.epi...@gmail.com>
Cc: Sun, Rui <rui@intel.com>; user@spark.apache.org
Subject: Re: sparkR issues ?

Hi Roni, you can probably rename the as.data.frame in 
$SPARK_HOME/R/pkg/R/DataFrame.R and re-install SparkR by running install-dev.sh

On Tue, Mar 15, 2016 at 8:46 AM, roni 
<roni.epi...@gmail.com<mailto:roni.epi...@gmail.com>> wrote:
Hi ,
 Is there a work around for this?
 Do i need to file a bug for this?
Thanks
-R

On Tue, Mar 15, 2016 at 12:28 AM, Sun, Rui 
<rui@intel.com<mailto:rui@intel.com>> wrote:
It seems as.data.frame() defined in SparkR convers the versions in R base 
package.
We can try to see if we can change the implementation of as.data.frame() in 
SparkR to avoid such covering.

From: Alex Kozlov [mailto:ale...@gmail.com<mailto:ale...@gmail.com>]
Sent: Tuesday, March 15, 2016 2:59 PM
To: roni <roni.epi...@gmail.com<mailto:roni.epi...@gmail.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: sparkR issues ?

This seems to be a very unfortunate name collision.  SparkR defines it's own 
DataFrame class which shadows what seems to be your own definition.

Is DataFrame something you define?  Can you rename it?

On Mon, Mar 14, 2016 at 10:44 PM, roni 
<roni.epi...@gmail.com<mailto:roni.epi...@gmail.com>> wrote:
Hi,
 I am working with bioinformatics and trying to convert some scripts to sparkR 
to fit into other spark jobs.

I tries a simple example from a bioinf lib and as soon as I start sparkR 
environment it does not work.

code as follows -
countData <- matrix(1:100,ncol=4)
condition <- factor(c("A","A","B","B"))
dds <- DESeqDataSetFromMatrix(countData, DataFrame(condition), ~ condition)

Works if i dont initialize the sparkR environment.
 if I do library(SparkR) and sqlContext <- sparkRSQL.init(sc)  it gives 
following error

> dds <- DESeqDataSetFromMatrix(countData, as.data.frame(condition), ~ 
> condition)
Error in DataFrame(colData, row.names = rownames(colData)) :
  cannot coerce class "data.frame" to a DataFrame

I am really stumped. I am not using any spark function , so i would expect it 
to work as a simple R code.
why it does not work?

Appreciate  the help
-R




--
Alex Kozlov
(408) 507-4987<tel:%28408%29%20507-4987>
(650) 887-2135<tel:%28650%29%20887-2135> efax
ale...@gmail.com<mailto:ale...@gmail.com>




--
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com<mailto:ale...@gmail.com>


RE: sparkR issues ?

2016-03-15 Thread Sun, Rui
It seems as.data.frame() defined in SparkR convers the versions in R base 
package.
We can try to see if we can change the implementation of as.data.frame() in 
SparkR to avoid such covering.

From: Alex Kozlov [mailto:ale...@gmail.com]
Sent: Tuesday, March 15, 2016 2:59 PM
To: roni 
Cc: user@spark.apache.org
Subject: Re: sparkR issues ?

This seems to be a very unfortunate name collision.  SparkR defines it's own 
DataFrame class which shadows what seems to be your own definition.

Is DataFrame something you define?  Can you rename it?

On Mon, Mar 14, 2016 at 10:44 PM, roni 
> wrote:
Hi,
 I am working with bioinformatics and trying to convert some scripts to sparkR 
to fit into other spark jobs.

I tries a simple example from a bioinf lib and as soon as I start sparkR 
environment it does not work.

code as follows -
countData <- matrix(1:100,ncol=4)
condition <- factor(c("A","A","B","B"))
dds <- DESeqDataSetFromMatrix(countData, DataFrame(condition), ~ condition)

Works if i dont initialize the sparkR environment.
 if I do library(SparkR) and sqlContext <- sparkRSQL.init(sc)  it gives 
following error

> dds <- DESeqDataSetFromMatrix(countData, as.data.frame(condition), ~ 
> condition)
Error in DataFrame(colData, row.names = rownames(colData)) :
  cannot coerce class "data.frame" to a DataFrame

I am really stumped. I am not using any spark function , so i would expect it 
to work as a simple R code.
why it does not work?

Appreciate  the help
-R




--
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com


RE: lint-r checks failing

2016-03-10 Thread Sun, Rui
This is probably because the installed lintr package get updated. After update, 
lintr can detect errors that are skipped before

I will submit a PR for this issue

-Original Message-
From: Gayathri Murali [mailto:gayathri.m.sof...@gmail.com] 
Sent: Friday, March 11, 2016 12:48 PM
To: user@spark.apache.org
Subject: lint-r checks failing

Hi All,

I recently tried to run ./dev/run-tests on a freshly clones spark repository 
and I get lint-r check failed error. I have run these tests multiple times 
before and never had this issue. I have copied part of the issue here. Please 
note that I haven’t modified any of these files. Am I missing some setup for 
lint-r?

/sparkR.R:156:25: style: Put spaces around all infix operators.
f <- file(path, open="rb")
   ~^~
R/sparkR.R:188:5: style: Place a space before left parenthesis, except in a 
function call.
  if(is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
^
R/sparkR.R:190:34: style: Commas should always have a space after.
  paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
 ^
R/utils.R:161:32: style: Commas should always have a space after.
  vec <- c(bitwShiftL(val, c(4,3,2,1,0)), addVal)
   ^
R/utils.R:161:34: style: Commas should always have a space after.
  vec <- c(bitwShiftL(val, c(4,3,2,1,0)), addVal)
 ^
R/utils.R:161:36: style: Commas should always have a space after.
  vec <- c(bitwShiftL(val, c(4,3,2,1,0)), addVal)
   ^
R/utils.R:161:38: style: Commas should always have a space after.
  vec <- c(bitwShiftL(val, c(4,3,2,1,0)), addVal)
 ^
R/utils.R:205:5: style: Place a space before left parenthesis, except in a 
function call.
  if(acc$counter == acc$size) {
^
lintr checks failed.
[error] running /Users/gayathri/spark/dev/lint-r ; received return code 1

Thanks
Gayathri



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: SparkR Count vs Take performance

2016-03-02 Thread Sun, Rui
This is nothing to do with object serialization/deserialization. It is expected 
behavior that take(1) most likely runs slower than count() on an empty RDD.

This is all about the algorithm with which take() is implemented. Take()  
1. Reads one partition to get the elements
2. If the fetched elements do not satisfy the limit, it will estimate the 
number of additional partitions and fetch elements in them.
Take() repeats the step 2 until it get the desired number of elements or it 
will go through all partitions.

So take(1) on an empty RDD will go through all partitions in a sequential way.

Comparing with take(), Count() also computes all partition, but the computation 
is parallel on all partitions at once.

Take() implementation in SparkR is less optimized than that in Scala as SparkR 
won't estimate the number of additional partitions but will read just one 
partition in each fetch.

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, March 2, 2016 3:37 AM
To: Dirceu Semighini Filho 
Cc: user 
Subject: Re: SparkR Count vs Take performance

Yeah one surprising result is that you can't call isEmpty on an RDD of 
nonserializable objects. You can't do much with an RDD of nonserializable 
objects anyway, but they can exist as an intermediate stage.

We could fix that pretty easily with a little copy and paste of the
take() code; right now isEmpty is simple but has this drawback.

On Tue, Mar 1, 2016 at 7:18 PM, Dirceu Semighini Filho 
 wrote:
> Great, I didn't noticed this isEmpty method.
> Well serialization is been a problem in this project, we have noticed 
> a lot of time been spent in serializing and deserializing things to 
> send and get from the cluster.
>
> 2016-03-01 15:47 GMT-03:00 Sean Owen :
>>
>> There is an "isEmpty" method that basically does exactly what your 
>> second version does.
>>
>> I have seen it be unusually slow at times because it must copy 1 
>> element to the driver, and it's possible that's slow. It still 
>> shouldn't be slow in general, and I'd be surprised if it's slower 
>> than a count in all but pathological cases.
>>
>>
>>
>> On Tue, Mar 1, 2016 at 6:03 PM, Dirceu Semighini Filho 
>>  wrote:
>> > Hello all.
>> > I have a script that create a dataframe from this operation:
>> >
>> > mytable <- sql(sqlContext,("SELECT ID_PRODUCT, ... FROM mytable"))
>> >
>> > rSparkDf <- createPartitionedDataFrame(sqlContext,myRdataframe)
>> > dFrame <- 
>> > join(mytable,rSparkDf,mytable$ID_PRODUCT==rSparkDf$ID_PRODUCT)
>> >
>> > After filtering this dFrame with this:
>> >
>> >
>> > I tried to execute the following
>> > filteredDF <- filterRDD(toRDD(dFrame),function (row) {row['COLUMN'] 
>> > %in% c("VALUES", ...)}) Now I need to know if the resulting 
>> > dataframe is empty, and to do that I tried this two codes:
>> > if(count(filteredDF) > 0)
>> > and
>> > if(length(take(filteredDF,1)) > 0)
>> > I thought that the second one, using take, shoule run faster than 
>> > count, but that didn't happen.
>> > The take operation creates one job per partition of my rdd (which 
>> > was
>> > 200)
>> > and this make it to run slower than the count.
>> > Is this the expected behaviour?
>> >
>> > Regards,
>> > Dirceu
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: Apache Arrow + Spark examples?

2016-02-24 Thread Sun, Rui
Spark has not supported Arrow yet. There is a JIRA 
https://issues.apache.org/jira/browse/SPARK-13391 requesting working on it.

From: Robert Towne [mailto:robert.to...@webtrends.com]
Sent: Wednesday, February 24, 2016 5:21 AM
To: user@spark.apache.org
Subject: Apache Arrow + Spark examples?

I have been reading some of the news this week about Apache Arrow as a new top 
level project.  It appears to be a common data layer between Spark and other 
systems (Cassandra, Drill, Impala, etc).

Has anyone seen any sample Spark code that integrates with Arrow?

Thanks,
Robert


RE: Running synchronized JRI code

2016-02-15 Thread Sun, Rui
On computation, RRDD launches one R process for each partition, so there won't 
be thread-safe issue

Could you give more details on your new environment?

-Original Message-
From: Simon Hafner [mailto:reactorm...@gmail.com] 
Sent: Monday, February 15, 2016 7:31 PM
To: Sun, Rui <rui@intel.com>
Cc: user <user@spark.apache.org>
Subject: Re: Running synchronized JRI code

2016-02-15 4:35 GMT+01:00 Sun, Rui <rui@intel.com>:
> Yes, JRI loads an R dynamic library into the executor JVM, which faces 
> thread-safe issue when there are multiple task threads within the executor.
>
> I am thinking if the demand like yours (calling R code in RDD 
> transformations) is much desired, we may consider refactoring RRDD for this 
> purpose, although it is currently intended for internal use by SparkR and not 
> a public API.
So the RRDDs don't have that thread safety issue? I'm currently creating a new 
environment for each call, but it still crashes.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Running synchronized JRI code

2016-02-14 Thread Sun, Rui
For YARN mode, you can set --executor-cores 1

-Original Message-
From: Sun, Rui [mailto:rui@intel.com] 
Sent: Monday, February 15, 2016 11:35 AM
To: Simon Hafner <reactorm...@gmail.com>; user <user@spark.apache.org>
Subject: RE: Running synchronized JRI code

Yes, JRI loads an R dynamic library into the executor JVM, which faces 
thread-safe issue when there are multiple task threads within the executor.

If you are running Spark on Standalone mode, it is possible to run multiple 
workers per node, and at the same time, limit the cores per worker to be 1. 

You could use RDD.pipe(), but you may need handle binary-text conversion as the 
input/output to/from the R process is string-based.

I am thinking if the demand like yours (calling R code in RDD transformations) 
is much desired, we may consider refactoring RRDD for this purpose, although it 
is currently intended for internal use by SparkR and not a public API. 

-Original Message-
From: Simon Hafner [mailto:reactorm...@gmail.com] 
Sent: Monday, February 15, 2016 5:09 AM
To: user <user@spark.apache.org>
Subject: Running synchronized JRI code

Hello

I'm currently running R code in an executor via JRI. Because R is 
single-threaded, any call to R needs to be wrapped in a `synchronized`. Now I 
can use a bit more than one core per executor, which is undesirable. Is there a 
way to tell spark that this specific application (or even specific UDF) needs 
multiple JVMs? Or should I switch from JRI to a pipe-based (slower) setup?

Cheers,
Simon

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Running synchronized JRI code

2016-02-14 Thread Sun, Rui
Yes, JRI loads an R dynamic library into the executor JVM, which faces 
thread-safe issue when there are multiple task threads within the executor.

If you are running Spark on Standalone mode, it is possible to run multiple 
workers per node, and at the same time, limit the cores per worker to be 1. 

You could use RDD.pipe(), but you may need handle binary-text conversion as the 
input/output to/from the R process is string-based.

I am thinking if the demand like yours (calling R code in RDD transformations) 
is much desired, we may consider refactoring RRDD for this purpose, although it 
is currently intended for internal use by SparkR and not a public API. 

-Original Message-
From: Simon Hafner [mailto:reactorm...@gmail.com] 
Sent: Monday, February 15, 2016 5:09 AM
To: user 
Subject: Running synchronized JRI code

Hello

I'm currently running R code in an executor via JRI. Because R is 
single-threaded, any call to R needs to be wrapped in a `synchronized`. Now I 
can use a bit more than one core per executor, which is undesirable. Is there a 
way to tell spark that this specific application (or even specific UDF) needs 
multiple JVMs? Or should I switch from JRI to a pipe-based (slower) setup?

Cheers,
Simon

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: different behavior while using createDataFrame and read.df in SparkR

2016-02-08 Thread Sun, Rui
I guess the problem is:


dummy.df<-withColumn(dataframe,paste0(colnames(cat.column),j),ifelse(column[[1]]==levels(as.factor(unlist(cat.column)))[j],1,0)
 )


dataframe<-dummy.df

Once dataframe is re-assigned to reference a new DataFrame in each iteration, 
the column variable has to be re-assigned to reference a column in the new 
DataFrame.

From: Devesh Raj Singh [mailto:raj.deves...@gmail.com]
Sent: Saturday, February 6, 2016 8:31 PM
To: Sun, Rui <rui@intel.com>
Cc: user@spark.apache.org
Subject: Re: different behavior while using createDataFrame and read.df in 
SparkR

Thank you ! Rui Sun for the observation! It helped.

I have a new problem arising. When I create a small function for dummy variable 
creation for categorical column

BDADummies<-function(dataframe,column){
  cat.column<-vector(mode="character",length=nrow(dataframe))
  cat.column<-collect(column)
  lev<-length(levels(as.factor(unlist(cat.column
  for (j in 1:lev){



dummy.df<-withColumn(dataframe,paste0(colnames(cat.column),j),ifelse(column[[1]]==levels(as.factor(unlist(cat.column)))[j],1,0)
 )


dataframe<-dummy.df
}
  return(dataframe)
}

and when I call the function using

newdummy.df<-BDADummies(df1,column=select(df1,df1$Species))


I get the below error

Error in withColumn(dataframe, paste0(colnames(cat.column), j), 
ifelse(column[[1]] ==  :
  error in evaluating the argument 'col' in selecting a method for function 
'withColumn': Error in if (le > 0) paste0("[1:", paste(le), "]") else "(0)" :
  argument is not interpretable as logical


but when i use it without calling or creating a function , the statement

dummy.df<-withColumn(dataframe,paste0(colnames(cat.column),j),ifelse(column[[1]]==levels(as.factor(unlist(cat.column)))[j],1,0)
 )

gives me the new columns generating column names as desired.

Warm regards,
Devesh.

On Sat, Feb 6, 2016 at 7:09 AM, Sun, Rui 
<rui@intel.com<mailto:rui@intel.com>> wrote:
I guess this is related to https://issues.apache.org/jira/browse/SPARK-11976

When calling createDataFrame on iris, the “.” Character in column names will be 
replaced with “_”.
It seems that when you create a DataFrame from the CSV file, the “.” Character 
in column names are still there.

From: Devesh Raj Singh 
[mailto:raj.deves...@gmail.com<mailto:raj.deves...@gmail.com>]
Sent: Friday, February 5, 2016 2:44 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Cc: Sun, Rui
Subject: different behavior while using createDataFrame and read.df in SparkR


Hi,

I am using Spark 1.5.1

When I do this

df <- createDataFrame(sqlContext, iris)

#creating a new column for category "Setosa"

df$Species1<-ifelse((df)[[5]]=="setosa",1,0)

head(df)

output: new column created

  Sepal.Length Sepal.Width Petal.Length Petal.Width Species
1  5.1 3.5  1.4 0.2  setosa
2  4.9 3.0  1.4 0.2  setosa
3  4.7 3.2  1.3 0.2  setosa
4  4.6 3.1  1.5 0.2  setosa
5  5.0 3.6  1.4 0.2  setosa
6  5.4 3.9  1.7 0.4  setosa

but when I saved the iris dataset as a CSV file and try to read it and convert 
it to sparkR dataframe

df <- read.df(sqlContext,"/Users/devesh/Github/deveshgit2/bdaml/data/iris/",
  source = "com.databricks.spark.csv",header = "true",inferSchema = 
"true")

now when I try to create new column

df$Species1<-ifelse((df)[[5]]=="setosa",1,0)
I get the below error:

16/02/05 12:11:01 ERROR RBackendHandler: col on 922 failed
Error in select(x, x$"*", alias(col, colName)) :
  error in evaluating the argument 'col' in selecting a method for function 
'select': Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"Sepal.Length" among (Sepal.Length, Sepal.Width, Petal.Length, Petal.Width, 
Species);
at org.apache.spark.s
--
Warm regards,
Devesh.



--
Warm regards,
Devesh.


RE: different behavior while using createDataFrame and read.df in SparkR

2016-02-05 Thread Sun, Rui
I guess this is related to https://issues.apache.org/jira/browse/SPARK-11976

When calling createDataFrame on iris, the “.” Character in column names will be 
replaced with “_”.
It seems that when you create a DataFrame from the CSV file, the “.” Character 
in column names are still there.

From: Devesh Raj Singh [mailto:raj.deves...@gmail.com]
Sent: Friday, February 5, 2016 2:44 PM
To: user@spark.apache.org
Cc: Sun, Rui
Subject: different behavior while using createDataFrame and read.df in SparkR


Hi,

I am using Spark 1.5.1

When I do this

df <- createDataFrame(sqlContext, iris)

#creating a new column for category "Setosa"

df$Species1<-ifelse((df)[[5]]=="setosa",1,0)

head(df)

output: new column created

  Sepal.Length Sepal.Width Petal.Length Petal.Width Species
1  5.1 3.5  1.4 0.2  setosa
2  4.9 3.0  1.4 0.2  setosa
3  4.7 3.2  1.3 0.2  setosa
4  4.6 3.1  1.5 0.2  setosa
5  5.0 3.6  1.4 0.2  setosa
6  5.4 3.9  1.7 0.4  setosa

but when I saved the iris dataset as a CSV file and try to read it and convert 
it to sparkR dataframe

df <- read.df(sqlContext,"/Users/devesh/Github/deveshgit2/bdaml/data/iris/",
  source = "com.databricks.spark.csv",header = "true",inferSchema = 
"true")

now when I try to create new column

df$Species1<-ifelse((df)[[5]]=="setosa",1,0)
I get the below error:

16/02/05 12:11:01 ERROR RBackendHandler: col on 922 failed
Error in select(x, x$"*", alias(col, colName)) :
  error in evaluating the argument 'col' in selecting a method for function 
'select': Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"Sepal.Length" among (Sepal.Length, Sepal.Width, Petal.Length, Petal.Width, 
Species);
at org.apache.spark.s
--
Warm regards,
Devesh.


RE: sparkR not able to create /append new columns

2016-02-03 Thread Sun, Rui
Devesh,

Note that DataFrame is immutable. withColumn returns a new DataFrame instead of 
adding a column in-pace to the DataFrame being operated.

So, you can modify the for loop like:

for (j in 1:lev)

{

   dummy.df.new<-withColumn(df,
   paste0(colnames(cat.column),j),
   ifelse(df$Species==levels(as.factor(unlist(cat.column)))[j],1,0) )

   df<-dummy.df.new
}

As you can see, withColumn supports adding only one column, it may be more 
convenient if withColumn supports adding multiple columns at once. There is a 
JIRA requesting such feature 
(https://issues.apache.org/jira/browse/SPARK-12225) which is still under 
discussion. If you desire this feature, you could comment on it.

From: Franc Carter [mailto:franc.car...@gmail.com]
Sent: Wednesday, February 3, 2016 7:40 PM
To: Devesh Raj Singh
Cc: user@spark.apache.org
Subject: Re: sparkR not able to create /append new columns


Yes, I didn't work out how to solve that - sorry


On 3 February 2016 at 22:37, Devesh Raj Singh 
> wrote:
Hi,

but "withColumn" will only add once, if i want to add columns to the same 
dataframe in a loop it will keep overwriting the added column and in the end 
the last added column( in the loop) will be the added column. like in my code 
above.

On Wed, Feb 3, 2016 at 5:05 PM, Franc Carter 
> wrote:

I had problems doing this as well - I ended up using 'withColumn', it's not 
particularly graceful but it worked (1.5.2 on AWS EMR)

cheerd

On 3 February 2016 at 22:06, Devesh Raj Singh 
> wrote:
Hi,

i am trying to create dummy variables in sparkR by creating new columns for 
categorical variables. But it is not appending the columns


df <- createDataFrame(sqlContext, iris)
class(dtypes(df))

cat.column<-vector(mode="character",length=nrow(df))
cat.column<-collect(select(df,df$Species))
lev<-length(levels(as.factor(unlist(cat.column
varb.names<-vector(mode="character",length=lev)
for (i in 1:lev){

  varb.names[i]<-paste0(colnames(cat.column),i)

}

for (j in 1:lev)

{

   dummy.df.new<-withColumn(df,paste0(colnames
   (cat.column),j),if else(df$Species==levels(as.factor(un list(cat.column))
   [j],1,0) )

}

I am getting the below output for

head(dummy.df.new)

output:

  Sepal_Length Sepal_Width Petal_Length Petal_Width Species Species1
1  5.1 3.5  1.4 0.2  setosa1
2  4.9 3.0  1.4 0.2  setosa1
3  4.7 3.2  1.3 0.2  setosa1
4  4.6 3.1  1.5 0.2  setosa1
5  5.0 3.6  1.4 0.2  setosa1
6  5.4 3.9  1.7 0.4  setosa1

Problem: Species2 and Species3 column are not getting added to the dataframe

--
Warm regards,
Devesh.



--
Franc



--
Warm regards,
Devesh.



--
Franc


RE: can we do column bind of 2 dataframes in spark R? similar to cbind in R?

2016-02-02 Thread Sun, Rui
Devesh,

The cbind-like operation is not supported by Scala DataFrame API, so it is also 
not supported in SparkR.

You may try to workaround this by trying the approach in 
http://stackoverflow.com/questions/32882529/how-to-zip-twoor-more-dataframe-in-spark

You could also submit a JIRA requesting such feature in Spark community.

From: Devesh Raj Singh [mailto:raj.deves...@gmail.com]
Sent: Tuesday, February 2, 2016 2:08 PM
To: user@spark.apache.org
Subject: can we do column bind of 2 dataframes in spark R? similar to cbind in 
R?

Hi,

I want to merge 2 dataframes in sparkR columnwise similar to cbind in R. We 
have "unionAll" for r bind but could not find anything for cbind in sparkR

--
Warm regards,
Devesh.


RE: building spark 1.6 throws error Rscript: command not found

2016-01-19 Thread Sun, Rui
Hi, Mich,
Building Spark with SparkR profile enabled requires installation of R on your 
building machine.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Tuesday, January 19, 2016 5:27 AM
To: Mich Talebzadeh
Cc: user @spark
Subject: Re: building spark 1.6 throws error Rscript: command not found

Please see:
http://www.jason-french.com/blog/2013/03/11/installing-r-in-linux/

On Mon, Jan 18, 2016 at 1:22 PM, Mich Talebzadeh 
> wrote:
./make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.6 -Phive 
-Phive-thriftserver -Pyarn


INFO] --- exec-maven-plugin:1.4.0:exec (sparkr-pkg) @ spark-core_2.10 ---
../R/install-dev.sh: line 40: Rscript: command not found
[INFO] 
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... SUCCESS [  2.921 s]
[INFO] Spark Project Test Tags  SUCCESS [  2.921 s]
[INFO] Spark Project Launcher . SUCCESS [ 17.252 s]
[INFO] Spark Project Networking ... SUCCESS [  9.237 s]
[INFO] Spark Project Shuffle Streaming Service  SUCCESS [  4.969 s]
[INFO] Spark Project Unsafe ... SUCCESS [ 13.384 s]
[INFO] Spark Project Core . FAILURE [01:34 min]


How can I resolve this by any chance?


Thanks


Dr Mich Talebzadeh

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

Sybase ASE 15 Gold Medal Award 2008
A Winning Strategy: Running the most Critical Financial Data on ASE 15
http://login.sybase.com/files/Product_Overviews/ASE-Winning-Strategy-091908.pdf
Author of the books "A Practitioner’s Guide to Upgrading to Sybase ASE 15", 
ISBN 978-0-9563693-0-7.
co-author "Sybase Transact SQL Guidelines Best Practices", ISBN 
978-0-9759693-0-4
Publications due shortly:
Complex Event Processing in Heterogeneous Environments, ISBN: 978-0-9563693-3-8
Oracle and Sybase, Concepts and Contrasts, ISBN: 978-0-9563693-1-4, volume one 
out shortly

http://talebzadehmich.wordpress.com

NOTE: The information in this email is proprietary and confidential. This 
message is for the designated recipient only, if you are not the intended 
recipient, you should destroy it immediately. Any information in this message 
shall not be understood as given or endorsed by Peridale Technology Ltd, its 
subsidiaries or their employees, unless expressly so stated. It is the 
responsibility of the recipient to ensure that this email is virus free, 
therefore neither Peridale Technology Ltd, its subsidiaries nor their employees 
accept any responsibility.




RE: 回复: how to use sparkR or spark MLlib load csv file on hdfs thencalculate covariance

2015-12-28 Thread Sun, Rui
Spark does not support computing cov matrix  now. But there is a PR for it. 
Maybe you can try it: https://issues.apache.org/jira/browse/SPARK-11057


From: zhangjp [mailto:592426...@qq.com]
Sent: Tuesday, December 29, 2015 3:21 PM
To: Felix Cheung; Andy Davidson; Yanbo Liang
Cc: user
Subject: 回复: how to use sparkR or spark MLlib load csv file on hdfs 
thencalculate covariance


Now i have huge columns about 5k -20k, so if i want to Calculate covariance 
matrix ,which is the best method or common method ?

-- 原始邮件 --
发件人: "Felix 
Cheung";>;
发送时间: 2015年12月29日(星期二) 中午12:45
收件人: "Andy 
Davidson">; 
"zhangjp"<592426...@qq.com>; "Yanbo 
Liang">;
抄送: "user">;
主题: Re: how to use sparkR or spark MLlib load csv file on hdfs thencalculate 
covariance

Make sure you add the csv spark package as this example here so that the source 
parameter in R read.df would work:


https://spark.apache.org/docs/latest/sparkr.html#from-data-sources

_
From: Andy Davidson 
>
Sent: Monday, December 28, 2015 10:24 AM
Subject: Re: how to use sparkR or spark MLlib load csv file on hdfs then 
calculate covariance
To: zhangjp <592426...@qq.com>, Yanbo Liang 
>
Cc: user >

Hi Yanbo

I use spark.csv to load my data set. I work with both Java and Python. I would 
recommend you print the first couple of rows and also print the schema to make 
sure your data is loaded as you expect. You might find the following code 
example helpful. You may need to programmatically set the schema depending on 
what you data looks like



public class LoadTidyDataFrame {

static  DataFrame fromCSV(SQLContext sqlContext, String file) {

DataFrame df = sqlContext.read()

.format("com.databricks.spark.csv")

.option("inferSchema", "true")

.option("header", "true")

.load(file);



return df;

}

}



From: Yanbo Liang < yblia...@gmail.com>
Date: Monday, December 28, 2015 at 2:30 AM
To: zhangjp < 592426...@qq.com>
Cc: "user @spark" < user@spark.apache.org>
Subject: Re: how to use sparkR or spark MLlib load csv file on hdfs then 
calculate covariance

Load csv file:
df <- read.df(sqlContext, "file-path", source = "com.databricks.spark.csv", 
header = "true")
Calculate covariance:
cov <- cov(df, "col1", "col2")

Cheers
Yanbo


2015-12-28 17:21 GMT+08:00 zhangjp <592426...@qq.com>:
hi  all,
I want  to use sparkR or spark MLlib  load csv file on hdfs then calculate  
covariance, how to do it .
thks.




RE: Do existing R packages work with SparkR data frames

2015-12-22 Thread Sun, Rui
Hi, Lan,

Generally, it is hard to use existing R packages working with R data frames to 
work with SparkR data frames transparently. Typically the algorithms have to be 
re-written to use SparkR DataFrame API.

Collect is for collecting the data from a SparkR DataFrame into a local 
data.frame. Since a SparkR DataFrame is a distributed data set, typically you 
call methods of SparkR DataFrame API to manipulate its data distributedly and 
after the result is enough to fit in the memory of local machine, you can 
collect it for local processing.

From: Duy Lan Nguyen [mailto:ndla...@gmail.com]
Sent: Wednesday, December 23, 2015 5:50 AM
To: user@spark.apache.org
Subject: Do existing R packages work with SparkR data frames

Hello,

Is it possible for existing R Machine Learning packages (which work with R data 
frames) such as bnlearn, to work with SparkR data frames? Or do I need to 
convert SparkR data frames to R data frames? Is "collect" the function to do 
the conversion, or how else to do that?

Many Thanks,
Lan


RE: SparkR read.df failed to read file from local directory

2015-12-08 Thread Sun, Rui
Hi, Boyu,

Does the local file “/home/myuser/test_data/sparkR/flights.csv” really exist?

I just tried, and had no problem creating a DataFrame from a local CSV file.

From: Boyu Zhang [mailto:boyuzhan...@gmail.com]
Sent: Wednesday, December 9, 2015 1:49 AM
To: Felix Cheung
Cc: user@spark.apache.org
Subject: Re: SparkR read.df failed to read file from local directory

Thanks for the comment Felix, I tried giving 
"/home/myuser/test_data/sparkR/flights.csv", but it tried to search the path in 
hdfs, and gave errors:

15/12/08 12:47:10 ERROR r.RBackendHandler: loadDF on 
org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: 
hdfs://hostname:8020/home/myuser/test_data/sparkR/flights.csv
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:251)
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.RDD$$

Thanks,
Boyu

On Tue, Dec 8, 2015 at 12:38 PM, Felix Cheung 
> wrote:
Have you tried

flightsDF <- read.df(sqlContext, "/home/myuser/test_data/sparkR/flights.csv", 
source = "com.databricks.spark.csv", header = "true")


_
From: Boyu Zhang >
Sent: Tuesday, December 8, 2015 8:47 AM
Subject: SparkR read.df failed to read file from local directory
To: >


Hello everyone,

I tried to run the example data--manipulation.R, and can't get it to read the 
flights.csv file that is stored in my local fs. I don't want to store big files 
in my hdfs, so reading from a local fs (lustre fs) is the desired behavior for 
me.

I tried the following:

flightsDF <- read.df(sqlContext, 
"file:///home/myuser/test_data/sparkR/flights.csv",
 source = "com.databricks.spark.csv", header = "true")

I got the message and eventually failed:

15/12/08 11:42:41 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in 
memory on 
hathi-a003.rcac.purdue.edu:33894 
(size: 14.4 KB, free: 530.2 MB)
15/12/08 11:42:41 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 
(TID 9, hathi-a003.rcac.purdue.edu): 
java.io.FileNotFoundException: File 
file:/home/myuser/test_data/sparkR/flights.csv does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:520)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:398)
at 
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:137)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:763)
at org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:106)
at 
org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:239)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Can someone please provide comments? Any tips is appreciated, thank you!

Boyu Zhang





RE: SparkR DataFrame , Out of memory exception for very small file.

2015-11-22 Thread Sun, Rui
Vipul,

Not sure if I understand your question. DataFrame is immutable. You can't 
update a DataFrame.

Could you paste some log info for the OOM error?

-Original Message-
From: vipulrai [mailto:vipulrai8...@gmail.com] 
Sent: Friday, November 20, 2015 12:11 PM
To: user@spark.apache.org
Subject: SparkR DataFrame , Out of memory exception for very small file.

Hi Users,

I have a general doubt regarding DataFrames in SparkR.

I am trying to read a file from Hive and it gets created as DataFrame.

sqlContext <- sparkRHive.init(sc)

#DF
sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true', 
 source = "com.databricks.spark.csv", inferSchema='true')

registerTempTable(sales,"Sales")

Do I need to create a new DataFrame for every update to the DataFrame like 
addition of new column or  need to update the original sales DataFrame.

sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as a")


Please help me with this , as the orignal file is only 20MB but it throws out 
of memory exception on a cluster of 4GB Master and Two workers of 4GB each.

Also, what is the logic with DataFrame do I need to register and drop tempTable 
after every update??

Thanks,
Vipul



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Connecting SparkR through Yarn

2015-11-13 Thread Sun, Rui
I guess this is not related to SparkR. It seems that Spark can’t pick 
hostname/IP address of RM.

Make sure you have correctly set YARN_CONF_DIR env var and have configured 
address of RM in yarn-site.xml.

From: Amit Behera [mailto:amit.bd...@gmail.com]
Sent: Friday, November 13, 2015 9:38 PM
To: Sun, Rui; user@spark.apache.org
Subject: Re: Connecting SparkR through Yarn

Hi Sun,
Thank you for reply.

I did the same, but now I am getting another issue.

i.e: Not able to connect to ResourceManager after submitting the job
the Error message showing something like this

Connecting to ResourceManager at /0.0.0.0:8032<http://0.0.0.0:8032>
- INFO ipc.Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8032<http://0.0.0.0/0.0.0.0:8032>. Already tried 0 time(s);
-INFO ipc.Client: Retrying connect to server: 
0.0.0.0/0.0.0.0:8032<http://0.0.0.0/0.0.0.0:8032>. Already tried 1 time(s)


I tried resolving this , but not able to do.



On Wed, Nov 11, 2015 at 12:02 PM, Sun, Rui 
<rui@intel.com<mailto:rui@intel.com>> wrote:
Amit,
You can simply set “MASTER” as “yarn-client” before calling sparkR.init().
Sys.setenv("MASTER"="yarn-client")

I assume that you have set “YARN_CONF_DIR” env variable required for running 
Spark on YARN.

If you want to set more YARN specific configurations, you can for example
Sys.setenv ("SPARKR_SUBMIT_ARGS", " --master yarn-client --num-executors 4 
sparkr-shell"
Before calling sparkR.init().

From: Amit Behera [mailto:amit.bd...@gmail.com<mailto:amit.bd...@gmail.com>]
Sent: Monday, November 9, 2015 2:36 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Connecting SparkR through Yarn

Hi All,

Spark Version = 1.5.1
Hadoop Version = 2.6.0

I set up the cluster in Amazon EC2 machines (1+5)
I am able create a SparkContext object using init method from RStudio.

But I do not know how can I create a SparkContext object in yarn mode.

I got the below link to run on yarn. but in this document its given for Spark 
version >= 0.9.0 and <= 1.2.

https://github.com/amplab-extras/SparkR-pkg/blob/master/README.md#running-on-yarn


Please help me how can I connect SparkR on Yarn.



Thanks,
Amit.



RE: Connecting SparkR through Yarn

2015-11-10 Thread Sun, Rui
Amit,
You can simply set “MASTER” as “yarn-client” before calling sparkR.init().
Sys.setenv("MASTER"="yarn-client")

I assume that you have set “YARN_CONF_DIR” env variable required for running 
Spark on YARN.

If you want to set more YARN specific configurations, you can for example
Sys.setenv ("SPARKR_SUBMIT_ARGS", " --master yarn-client --num-executors 4 
sparkr-shell"
Before calling sparkR.init().

From: Amit Behera [mailto:amit.bd...@gmail.com]
Sent: Monday, November 9, 2015 2:36 AM
To: user@spark.apache.org
Subject: Connecting SparkR through Yarn

Hi All,

Spark Version = 1.5.1
Hadoop Version = 2.6.0

I set up the cluster in Amazon EC2 machines (1+5)
I am able create a SparkContext object using init method from RStudio.

But I do not know how can I create a SparkContext object in yarn mode.

I got the below link to run on yarn. but in this document its given for Spark 
version >= 0.9.0 and <= 1.2.

https://github.com/amplab-extras/SparkR-pkg/blob/master/README.md#running-on-yarn


Please help me how can I connect SparkR on Yarn.



Thanks,
Amit.


RE: [sparkR] Any insight on java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-11-07 Thread Sun, Rui
This is probably because your config option actually do not take effect. Please 
refer to the email thread titled “How to set memory for SparkR with 
master="local[*]"”, which may answer you.

I recommend you to try to use SparkR built from the master branch, which 
contains two fixes that may help you in your use case:
https://issues.apache.org/jira/browse/SPARK-11340
https://issues.apache.org/jira/browse/SPARK-11258

BTW, it seems that there is a config conflict in your settings?
spark.driver.memory="30g",
spark.driver.extraJavaOptions="-Xms5g -Xmx5g


From: Dhaval Patel [mailto:dhaval1...@gmail.com]
Sent: Saturday, November 7, 2015 12:26 AM
To: Spark User Group
Subject: [sparkR] Any insight on java.lang.OutOfMemoryError: GC overhead limit 
exceeded

I have been struggling through this error since past 3 days and have tried all 
possible ways/suggestions people have provided on stackoverflow and here in 
this group.

I am trying to read a parquet file using sparkR and convert it into an R 
dataframe for further usage. The file size is not that big, ~4G and 250 mil 
records.

My standalone cluster has more than enough memory and processing power : 24 
core, 128 GB RAM. Here is configuration to give an idea:

Tried this on both spark 1.4.1 and 1.5.1.  I have attached both stack 
traces/logs. Parquet file has 24 partitions.

spark.default.confs=list(spark.cores.max="24",
 spark.executor.memory="50g",
 spark.driver.memory="30g",
 spark.driver.extraJavaOptions="-Xms5g -Xmx5g 
-XX:MaxPermSize=1024M")
sc <- sparkR.init(master="local[24]",sparkEnvir = spark.default.confs)
...
 reading parquet file and storing in R dataframe
med.Rdf <- collect(mednew.DF)




--- Begin Message ---
Hi, Matej,

For the convenience of SparkR users, when they start SparkR without using 
bin/sparkR, (for example, in RStudio), 
https://issues.apache.org/jira/browse/SPARK-11340 enables setting of 
“spark.driver.memory”, (also other similar options, like: 
spark.driver.extraClassPath, spark.driver.extraJavaOptions, 
spark.driver.extraLibraryPath) in the sparkEnvir parameter for sparkR.init() to 
take effect.

Would you like to give it a try? Note the change is on the master branch, you 
have to build Spark from source before using it.


From: Sun, Rui [mailto:rui@intel.com]
Sent: Monday, October 26, 2015 10:24 AM
To: Dirceu Semighini Filho
Cc: user
Subject: RE: How to set memory for SparkR with master="local[*]"

As documented in 
http://spark.apache.org/docs/latest/configuration.html#available-properties,
Note for “spark.driver.memory”:
Note: In client mode, this config must not be set through the SparkConf 
directly in your application, because the driver JVM has already started at 
that point. Instead, please set this through the --driver-memory command line 
option or in your default properties file.

If you are to start a SparkR shell using bin/sparkR, then you can use 
bin/sparkR –driver-memory. You have no chance to set the driver memory size 
after the R shell has been launched via bin/sparkR.

Buf if you are to start a SparkR shell manually without using bin/sparkR (for 
example, in Rstudio), you can:
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS" = "--conf spark.driver.memory=2g sparkr-shell")
sc <- sparkR.init()

From: Dirceu Semighini Filho [mailto:dirceu.semigh...@gmail.com]
Sent: Friday, October 23, 2015 7:53 PM
Cc: user
Subject: Re: How to set memory for SparkR with master="local[*]"

Hi Matej,
I'm also using this and I'm having the same behavior here, my driver has only 
530mb which is the default value.

Maybe this is a bug.

2015-10-23 9:43 GMT-02:00 Matej Holec 
<hol...@gmail.com<mailto:hol...@gmail.com>>:
Hello!

How to adjust the memory settings properly for SparkR with master="local[*]"
in R?


*When running from  R -- SparkR doesn't accept memory settings :(*

I use the following commands:

R>  library(SparkR)
R>  sc <- sparkR.init(master = "local[*]", sparkEnvir =
list(spark.driver.memory = "5g"))

Despite the variable spark.driver.memory is correctly set (checked in
http://node:4040/environment/), the driver has only the default amount of
memory allocated (Storage Memory 530.3 MB).

*But when running from  spark-1.5.1-bin-hadoop2.6/bin/sparkR -- OK*

The following command:

]$ spark-1.5.1-bin-hadoop2.6/bin/sparkR --driver-memory 5g

creates SparkR session with properly adjustest driver memory (Storage Memory
2.6 GB).


Any suggestion?

Thanks
Matej



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-memory-for-SparkR-with-master-local-tp25178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-

RE: [Spark R]could not allocate memory (2048 Mb) in C function 'R_AllocStringBuffer'

2015-11-06 Thread Sun, Rui
Hi,Todd,

"--driver-memory" options specifies the maximum heap memory size of the JVM 
backend for SparkR. The error you faced is memory allocation error of your R 
process. They are different.
I guess that 2G memory bound for a string is limitation of the R interpreter?
That's the reason why we use SparkR for big data processing. Maybe you can try 
copy your CSV to HDFS and have it partitioned across a cluster. But I am not 
sure if Spark can support distributed CSV file or not.

From: Todd [mailto:bit1...@163.com]
Sent: Friday, November 6, 2015 5:00 PM
To: user@spark.apache.org
Subject: [Spark R]could not allocate memory (2048 Mb) in C function 
'R_AllocStringBuffer'

I am launching spark R with following script:

./sparkR --driver-memory 12G

and I try to load a local 3G csv file with following code,

> a=read.transactions("/home/admin/datamining/data.csv",sep="\t",format="single",cols=c(1,2))

but I encounter an error:  could not allocate memory (2048 Mb) in C function 
'R_AllocStringBuffer'

I have allocated 12G memory,not sure why it complains could not allocate 2G 
memory.

Could someone help me? Thanks!


RE: sparkR 1.5.1 batch yarn-client mode failing on daemon.R not found

2015-11-01 Thread Sun, Rui
Tom,

Have you set the “MASTER” evn variable on your machine? What is the value if 
set?

From: Tom Stewart [mailto:stewartthom...@yahoo.com.INVALID]
Sent: Friday, October 30, 2015 10:11 PM
To: user@spark.apache.org
Subject: sparkR 1.5.1 batch yarn-client mode failing on daemon.R not found

I have the following script in a file named test.R:

library(SparkR)
sc <- sparkR.init(master="yarn-client")
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, faithful)
showDF(df)
sparkR.stop()
q(save="no")

If I submit this with "sparkR test.R" or "R  CMD BATCH test.R" or "Rscript 
test.R" it fails with this error:
15/10/29 08:08:49 INFO r.BufferedStreamThread: Fatal error: cannot open file 
'/mnt/hdfs9/yarn/nm-local-dir/usercache/hadoop/appcache/application_1446058618330_0171/container_e805_1446058618330_0171_01_05/sparkr/SparkR/worker/daemon.R':
 No such file or directory
15/10/29 08:08:59 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 
(TID 1)
java.net.SocketTimeoutException: Accept timed out


However, if I launch just an interactive sparkR shell and cut/paste those 
commands - it runs fine.
It also runs fine on the same Hadoop cluster with Spark 1.4.1.
And, it runs fine from batch mode if I just use sparkR.init() and not 
sparkR.init(master="yarn-client")


RE: SparkR job with >200 tasks hangs when calling from web server

2015-11-01 Thread Sun, Rui
I guess that this is not related to SparkR, but something wrong in the Spark 
Core.

Could you try your application logic within spark-shell (you have to use Scala 
DataFrame API) instead of SparkR shell and to see if this issue still happens?

-Original Message-
From: rporcio [mailto:rpor...@gmail.com] 
Sent: Friday, October 30, 2015 11:09 PM
To: user@spark.apache.org
Subject: SparkR job with >200 tasks hangs when calling from web server

Hi,

I have a web server which can execute R codes using SparkR.
The R session is created with the Rscript init.R command where the /init.R/ 
file contains a sparkR initialization section:

/library(SparkR, lib.loc = paste("/opt/Spark/spark-1.5.1-bin-hadoop2.6",
"R", "lib", sep = "/"))
sc <<- sparkR.init(master = "local[4]", appName = "TestR", sparkHome = 
"/opt/Spark/spark-1.5.1-bin-hadoop2.6", sparkPackages =
"com.databricks:spark-csv_2.10:1.2.0")
sqlContext <<- sparkRSQL.init(sc)/

I have the below example R code that I want to execute (flights.csv comes from 
SparkR examples):

/df <- read.df(sqlContext, "/opt/Spark/flights.csv", source = 
"com.databricks.spark.csv", header="true") registerTempTable(df, "flights") 
depDF <- sql(sqlContext, "SELECT dep FROM flights") deps <- collect(depDF)/

If I run this code, it is successfully executed . When I check the Spark UI, I 
see that the belonging job has 2 tasks only.

But if I change the first row to
/df <- repartition(read.df(sqlContext, "/opt/Spark/flights.csv", source = 
"com.databricks.spark.csv", header="true"), 200)/ and execute the R code again, 
the belonging job has 202 tasks from which it sucessfully finishes some (like 
132/202) but then it hangs forever.

If I check the /stderr/ of the executor I can see that the executor can't 
communicate with the driver:

/15/10/30 15:34:24 WARN AkkaRpcEndpointRef: Error sending message [message = 
Heartbeat(0,[Lscala.Tuple2;@36834e15,BlockManagerId(0, 192.168.178.198, 7092))] 
in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [30 seconds]. 
This timeout is controlled by spark.rpc.askTimeout/

I tried to change memory (e.g. 4g to driver), akka and timeout settings but 
with no luck.

Executing the same code (with the repartition part) from R, it successfully 
finishes, so I assume the problem is related somehow to the webserver, but I 
can't figure it out.

I'm using Centos.

Can someone give me some advice what should I try?

Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-job-with-200-tasks-hangs-when-calling-from-web-server-tp25237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How to set memory for SparkR with master="local[*]"

2015-11-01 Thread Sun, Rui
Hi, Matej,

For the convenience of SparkR users, when they start SparkR without using 
bin/sparkR, (for example, in RStudio), 
https://issues.apache.org/jira/browse/SPARK-11340 enables setting of 
“spark.driver.memory”, (also other similar options, like: 
spark.driver.extraClassPath, spark.driver.extraJavaOptions, 
spark.driver.extraLibraryPath) in the sparkEnvir parameter for sparkR.init() to 
take effect.

Would you like to give it a try? Note the change is on the master branch, you 
have to build Spark from source before using it.


From: Sun, Rui [mailto:rui@intel.com]
Sent: Monday, October 26, 2015 10:24 AM
To: Dirceu Semighini Filho
Cc: user
Subject: RE: How to set memory for SparkR with master="local[*]"

As documented in 
http://spark.apache.org/docs/latest/configuration.html#available-properties,
Note for “spark.driver.memory”:
Note: In client mode, this config must not be set through the SparkConf 
directly in your application, because the driver JVM has already started at 
that point. Instead, please set this through the --driver-memory command line 
option or in your default properties file.

If you are to start a SparkR shell using bin/sparkR, then you can use 
bin/sparkR –driver-memory. You have no chance to set the driver memory size 
after the R shell has been launched via bin/sparkR.

Buf if you are to start a SparkR shell manually without using bin/sparkR (for 
example, in Rstudio), you can:
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS" = "--conf spark.driver.memory=2g sparkr-shell")
sc <- sparkR.init()

From: Dirceu Semighini Filho [mailto:dirceu.semigh...@gmail.com]
Sent: Friday, October 23, 2015 7:53 PM
Cc: user
Subject: Re: How to set memory for SparkR with master="local[*]"

Hi Matej,
I'm also using this and I'm having the same behavior here, my driver has only 
530mb which is the default value.

Maybe this is a bug.

2015-10-23 9:43 GMT-02:00 Matej Holec 
<hol...@gmail.com<mailto:hol...@gmail.com>>:
Hello!

How to adjust the memory settings properly for SparkR with master="local[*]"
in R?


*When running from  R -- SparkR doesn't accept memory settings :(*

I use the following commands:

R>  library(SparkR)
R>  sc <- sparkR.init(master = "local[*]", sparkEnvir =
list(spark.driver.memory = "5g"))

Despite the variable spark.driver.memory is correctly set (checked in
http://node:4040/environment/), the driver has only the default amount of
memory allocated (Storage Memory 530.3 MB).

*But when running from  spark-1.5.1-bin-hadoop2.6/bin/sparkR -- OK*

The following command:

]$ spark-1.5.1-bin-hadoop2.6/bin/sparkR --driver-memory 5g

creates SparkR session with properly adjustest driver memory (Storage Memory
2.6 GB).


Any suggestion?

Thanks
Matej



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-memory-for-SparkR-with-master-local-tp25178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>



RE: How to set memory for SparkR with master="local[*]"

2015-10-25 Thread Sun, Rui
As documented in 
http://spark.apache.org/docs/latest/configuration.html#available-properties,
Note for “spark.driver.memory”:
Note: In client mode, this config must not be set through the SparkConf 
directly in your application, because the driver JVM has already started at 
that point. Instead, please set this through the --driver-memory command line 
option or in your default properties file.

If you are to start a SparkR shell using bin/sparkR, then you can use 
bin/sparkR –driver-memory. You have no chance to set the driver memory size 
after the R shell has been launched via bin/sparkR.

Buf if you are to start a SparkR shell manually without using bin/sparkR (for 
example, in Rstudio), you can:
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS" = "--conf spark.driver.memory=2g sparkr-shell")
sc <- sparkR.init()

From: Dirceu Semighini Filho [mailto:dirceu.semigh...@gmail.com]
Sent: Friday, October 23, 2015 7:53 PM
Cc: user
Subject: Re: How to set memory for SparkR with master="local[*]"

Hi Matej,
I'm also using this and I'm having the same behavior here, my driver has only 
530mb which is the default value.

Maybe this is a bug.

2015-10-23 9:43 GMT-02:00 Matej Holec 
>:
Hello!

How to adjust the memory settings properly for SparkR with master="local[*]"
in R?


*When running from  R -- SparkR doesn't accept memory settings :(*

I use the following commands:

R>  library(SparkR)
R>  sc <- sparkR.init(master = "local[*]", sparkEnvir =
list(spark.driver.memory = "5g"))

Despite the variable spark.driver.memory is correctly set (checked in
http://node:4040/environment/), the driver has only the default amount of
memory allocated (Storage Memory 530.3 MB).

*But when running from  spark-1.5.1-bin-hadoop2.6/bin/sparkR -- OK*

The following command:

]$ spark-1.5.1-bin-hadoop2.6/bin/sparkR --driver-memory 5g

creates SparkR session with properly adjustest driver memory (Storage Memory
2.6 GB).


Any suggestion?

Thanks
Matej



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-memory-for-SparkR-with-master-local-tp25178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org



RE: Spark_1.5.1_on_HortonWorks

2015-10-22 Thread Sun, Rui
Frans,

SparkR runs with R 3.1+. If possible, latest verison of R is recommended.

From: Saisai Shao [mailto:sai.sai.s...@gmail.com]
Sent: Thursday, October 22, 2015 11:17 AM
To: Frans Thamura
Cc: Ajay Chander; Doug Balog; user spark mailing list
Subject: Re: Spark_1.5.1_on_HortonWorks

SparkR is shipped with Hortonworks version of Spark 1.4.1, there's no 
difference compared to community version, you could refer to the docs of Apache 
Spark. It would be better to ask HDP related questions in ( 
http://hortonworks.com/community/forums/forum/spark/ ). Sorry for not so 
familiar with SparkR related things.

Thanks
Saisai

On Thu, Oct 22, 2015 at 11:02 AM, Frans Thamura 
> wrote:
talking about spark in hdp

Is there reference about Spark-R, and what version should we install in R?
--
Frans Thamura (曽志胜)
Java Champion
Shadow Master and Lead Investor
Meruvian.
Integrated Hypermedia Java Solution Provider.

Mobile: +628557888699
Blog: http://blogs.mervpolis.com/roller/flatburger (id)

FB: http://www.facebook.com/meruvian
TW: http://www.twitter.com/meruvian / @meruvian
Website: http://www.meruvian.org

"We grow because we share the same belief."

On Thu, Oct 22, 2015 at 8:56 AM, Saisai Shao 
> wrote:
> How you start history server, do you still use the history server of 1.3.1,
> or you started the history server in 1.5.1?
>
> The Spark tarball you used is the community version, so Application
> TimelineServer based history provider is not supported, you could comment
> this configuration "spark.history.provider", so it will use default
> FsHistoryProvider, or you could configure "spark.history.provider" to
> "org.apache.spark.deploy.history.FsHistoryProvider".
>
> If you still want to use this ATS based history server, you have to wait for
> the technical preview release of Hortonworks.
>
> Thanks
> Saisai
>
>
> On Thu, Oct 22, 2015 at 9:47 AM, Ajay Chander 
> > wrote:
>>
>> Hi Sasai,
>>
>> Thanks for your time. I have followed your inputs and downloaded
>> "spark-1.5.1-bin-hadoop2.6" on one of the node say node1. And when I did a
>> pie test everything seems to be working fine, except that the spark-history
>> -server running on this node1 has gone down. It was complaining about
>> missing class:
>>
>> 15/10/21 16:41:28 INFO HistoryServer: Registered signal handlers for
>> [TERM, HUP, INT]
>> 15/10/21 16:41:28 WARN SparkConf: The configuration key
>> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3
>> and and may be removed in the future. Please use the new key
>> 'spark.yarn.am.waitTime' instead.
>> 15/10/21 16:41:29 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/21 16:41:29 INFO SecurityManager: Changing view acls to: root
>> 15/10/21 16:41:29 INFO SecurityManager: Changing modify acls to: root
>> 15/10/21 16:41:29 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(root); users
>> with modify permissions: Set(root)
>> Exception in thread "main" java.lang.ClassNotFoundException:
>> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
>> at
>> org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:231)
>> at
>> org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)
>>
>>
>> I went to the lib folder and noticed that
>> "spark-assembly-1.5.1-hadoop2.6.0.jar" is missing that class. I was able to
>> get the spark history server started with 1.3.1 but not 1.5.1. Any inputs on
>> this?
>>
>> Really appreciate your help. Thanks
>>
>> Regards,
>> Ajay
>>
>>
>>
>> On Wednesday, October 21, 2015, Saisai Shao 
>> >
>> wrote:
>>>
>>> Hi Ajay,
>>>
>>> You don't need to copy tarball to all the nodes, only one node you want
>>> to run spark application is enough (mostly the master node), Yarn will help
>>> to distribute the Spark dependencies. The link I mentioned before is the one
>>> you could follow, please read my previous mail.
>>>
>>> Thanks
>>> Saisai
>>>
>>>
>>>
>>> On Thu, Oct 22, 2015 at 1:56 AM, Ajay Chander 
>>> >
>>> wrote:

 Thanks for your kind inputs. Right now I am running spark-1.3.1 on
 YARN(4 node cluster) on a HortonWorks distribution. 

RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-08 Thread Sun, Rui
Can you extract the spark-submit command from the console output, and run it on 
the Shell, and see if there is any error message?

From: Khandeshi, Ami [mailto:ami.khande...@fmr.com]
Sent: Wednesday, October 7, 2015 9:57 PM
To: Sun, Rui; Hossein
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Tried, multiple permutation of setting home… Still same issue
> Sys.setenv(SPARK_HOME="c:\\DevTools\\spark-1.5.1")
> .libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))
> library(SparkR)

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

filter, na.omit

The following objects are masked from ‘package:base’:

intersect, rbind, sample, subset, summary, table, transform

> sc<-sparkR.init(master = "local")
Launching java with spark-submit command 
c:\DevTools\spark-1.5.1/bin/spark-submit.cmd   sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\RtmpkXZVBa\backend_port45ac487f2fbd
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds


From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, October 07, 2015 2:35 AM
To: Hossein; Khandeshi, Ami
Cc: akhandeshi; user@spark.apache.org<mailto:user@spark.apache.org>
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Not sure "/C/DevTools/spark-1.5.1/bin/spark-submit.cmd" is a valid?

From: Hossein [mailto:fal...@gmail.com]
Sent: Wednesday, October 7, 2015 12:46 AM
To: Khandeshi, Ami
Cc: Sun, Rui; akhandeshi; user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

Have you built the Spark jars? Can you run the Spark Scala shell?

--Hossein

On Tuesday, October 6, 2015, Khandeshi, Ami 
<ami.khande...@fmr.com.invalid<mailto:ami.khande...@fmr.com.invalid>> wrote:
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
>
> sc <- sparkR.init(master="local")
Launching java with spark-submit command 
/C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds
In addition: Warning message:
running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"   --verbose 
sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had 
status 127

-Original Message-
From: Sun, Rui [mailto:rui@intel.com<javascript:;>]
Sent: Tuesday, October 06, 2015 9:39 AM
To: akhandeshi; user@spark.apache.org<javascript:;>
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()?

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com<javascript:;>]
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org<javascript:;>
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<javascript:;> For 
additional commands, e-mail: user-h...@spark.apache.org<javascript:;>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<javascript:;>
For additional commands, e-mail: user-h...@spark.apache.org<javascript:;>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<javascript:;>
For additional commands, e-mail: user-h...@spark.apache.org<javascript:;>


--
--Hossein


RE: How can I read file from HDFS i sparkR from RStudio

2015-10-08 Thread Sun, Rui
Amit,

sqlContext <- sparkRSQL.init(sc)

peopleDF <- read.df(sqlContext, "hdfs://master:9000/sears/example.csv")

have you restarted the R session in RStudio between the two lines?

From: Amit Behera [mailto:amit.bd...@gmail.com]
Sent: Thursday, October 8, 2015 5:59 PM
To: user@spark.apache.org
Subject: How can I read file from HDFS i sparkR from RStudio

Hi All,
I am very new to SparkR.
I am able to run a sample code from example given in the link : 
http://www.r-bloggers.com/installing-and-starting-sparkr-locally-on-windows-os-and-rstudio/
Then I am trying to read a file from HDFS in RStudio, but unable to read.
Below is my code.

Sys.setenv(SPARK_HOME="/home/affine/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))

library(SparkR)

library(rJava)

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" 
"com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell"')

sc <- sparkR.init(master = "spark://master:7077",sparkPackages = 
"com.databricks:spark-csv_2.1:1.2.0")

sqlContext <- sparkRSQL.init(sc)

peopleDF <- read.df(sqlContext, "hdfs://master:9000/sears/example.csv")
Error:


Error in callJMethod(sqlContext, "getConf", "spark.sql.sources.default",  :

  Invalid jobj 1. If SparkR was restarted, Spark operations need to be 
re-executed.
Please tell me where I am going wrong.
Thanks,
Amit.


RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-07 Thread Sun, Rui
Not sure "/C/DevTools/spark-1.5.1/bin/spark-submit.cmd" is a valid?

From: Hossein [mailto:fal...@gmail.com]
Sent: Wednesday, October 7, 2015 12:46 AM
To: Khandeshi, Ami
Cc: Sun, Rui; akhandeshi; user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

Have you built the Spark jars? Can you run the Spark Scala shell?

--Hossein

On Tuesday, October 6, 2015, Khandeshi, Ami 
<ami.khande...@fmr.com.invalid<mailto:ami.khande...@fmr.com.invalid>> wrote:
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
>
> sc <- sparkR.init(master="local")
Launching java with spark-submit command 
/C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds
In addition: Warning message:
running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"   --verbose 
sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had 
status 127

-Original Message-
From: Sun, Rui [mailto:rui@intel.com<javascript:;>]
Sent: Tuesday, October 06, 2015 9:39 AM
To: akhandeshi; user@spark.apache.org<javascript:;>
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()?

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com<javascript:;>]
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org<javascript:;>
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<javascript:;> For 
additional commands, e-mail: user-h...@spark.apache.org<javascript:;>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<javascript:;>
For additional commands, e-mail: user-h...@spark.apache.org<javascript:;>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org<javascript:;>
For additional commands, e-mail: user-h...@spark.apache.org<javascript:;>


--
--Hossein


RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-06 Thread Sun, Rui
What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()? 

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com] 
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") : 
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: textFile() and includePackage() not found

2015-09-27 Thread Sun, Rui
Eugene,

SparkR RDD API is private for now 
(https://issues.apache.org/jira/browse/SPARK-7230)

You can use SparkR::: prefix to access those private functions.

-Original Message-
From: Eugene Cao [mailto:eugene...@163.com] 
Sent: Monday, September 28, 2015 8:02 AM
To: user@spark.apache.org
Subject: textFile() and includePackage() not found

Error: no methods for 'textFile'
when I run the following 2nd command after SparkR initialized

sc <- sparkR.init(appName = "RwordCount") lines <- textFile(sc, args[[1]])

But the following command works:
lines2 <- SparkR:::textFile(sc, "C:\\SelfStudy\\SPARK\\sentences2.txt") 

In addition, it says in official web "The includePackage command can be used to 
indicate packages...", but includePackage(sc, Matrix) with error: not find?

Thanks a lot in advance!

Eugene Cao
Xi'an Jiaotong University




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/textFile-and-includePackage-not-found-tp24834.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkR for accumulo

2015-09-23 Thread Sun, Rui
No.

It is possible you create a helper function which can creat accumulo data RDDs 
in Scala or Java (maybe put such code in a JAR, add using --jar   on 
the command line to start SparkR to use it ?) and in SparkR you can use the 
private functions like callJMethod to call it and the created RDD objects can 
be referenced on R side.

However, there is a critical step missing in SparkR now, which is the support 
of conversion from a source RDD (other than text file RDD) to RRDD. If you 
can't convert a source RDD from JVM to RRDD, you can't further use SparkR RDD 
API to apply transformations on it.

-Original Message-
From: madhvi.gupta [mailto:madhvi.gu...@orkash.com] 
Sent: Wednesday, September 23, 2015 11:42 AM
To: Sun, Rui; user
Subject: Re: SparkR for accumulo

Hi Rui,

Cant we use the accumulo data RDD created from JAVA in spark, in sparkR?

Thanks and Regards
Madhvi Gupta

On Tuesday 22 September 2015 04:42 PM, Sun, Rui wrote:
> I am afraid that there is no support for accumulo in SparkR now, because:
>
> 1. It seems that there is no data source support for accumulo, so we 
> can't create SparkR dataframe on accumulo 2. It is possible to create RDD 
> from accumulo via AccumuloInputFormat in Scala. But unfortunately, SparkR 
> does not support creating RDD from Hadoop files other than text file.
>
> -Original Message-
> From: madhvi.gupta [mailto:madhvi.gu...@orkash.com]
> Sent: Tuesday, September 22, 2015 6:25 PM
> To: user
> Subject: SparkR for accumulo
>
> Hi,
>
> I want to process accumulo data in R through sparkR.Can anyone help me and 
> let me know how to get accumulo data in spark to be used in R?
>
> --
> Thanks and Regards
> Madhvi Gupta
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: Support of other languages?

2015-09-22 Thread Sun, Rui
Although the data is RDD[Array[Byte]] where content is not meaningful to Spark 
Core, it has to be on heap, as Spark Core manipulates RDD transformations on 
heap.

SPARK-10399 is irrelevant. it aims to manipulate off-heap data using C++library 
via JNI. This is done in-process.

From: Rahul Palamuttam [mailto:rahulpala...@gmail.com]
Sent: Thursday, September 17, 2015 3:09 PM
To: Sun, Rui
Cc: user@spark.apache.org
Subject: Re: Support of other languages?

Hi,

Thank you for both responses.
Sun you pointed out the exact issue I was referring to, which is 
copying,serializing, deserializing, the byte-array between the JVM heap and the 
worker memory.
It also doesn't make sense why the byte-array should be kept on-heap, since the 
data of the parent partition is just a byte array that only makes sense to a 
python environment.
Shouldn't we be writing the byte-array off-heap and provide supporting 
interfaces for outside processes to read and interact with the data?
I'm probably oversimplifying what is really required to do this.

There is a recent JIRA which I thought was interesting with respect to our 
discussion.
https://issues.apache.org/jira/browse/SPARK-10399t JIRA

There's also a suggestion, at the bottom of the JIRA, that considers exposing 
on-heap memory which is pretty interesting.

- Rahul Palamuttam


On Wed, Sep 9, 2015 at 4:52 AM, Sun, Rui 
<rui@intel.com<mailto:rui@intel.com>> wrote:
Hi, Rahul,

To support a new language other than Java/Scala in spark, it is different 
between RDD API and DataFrame API.

For RDD API:

RDD is a distributed collection of the language-specific data types whose 
representation is unknown to JVM. Also transformation functions for RDD are 
written in the language which can't be executed on JVM. That's why worker 
processes of the language runtime are needed in such case. Generally, to 
support RDD API in the language, a subclass of the Scala RDD is needed on JVM 
side (for example, PythonRDD for python, RRDD for R) where compute() is 
overridden to send the serialized parent partition data (yes, what you mean 
data copy happens here) and the serialized transformation function via socket 
to the worker process. The worker process deserializes the partition data and 
the transformation function, then applies the function to the data. The result 
is sent back to JVM via socket after serialization as byte array. From JVM's 
viewpoint, the resulting RDD is a collection of byte arrays.

Performance is a concern in such case, as there are overheads, like launching 
of worker processes, serialization/deserialization of partition data, 
bi-directional communication cost of the data.
Besides, as the JVM can't know the real representation of data in the RDD, it 
is difficult and complex to support shuffle and aggregation operations. The 
Spark Core's built-in aggregator and shuffle can't be utilized directly. There 
should be language specific implementation to support these operations, which 
cause additional overheads.

Additional memory occupation by the worker processes is also a concern.

For DataFrame API:

Things are much simpler than RDD API. For DataFrame, data is read from Data 
Source API and is represented as native objects within the JVM and there is no 
language-specific transformation functions. Basically, DataFrame API in the 
language are just method wrappers to the corresponding ones in Scala DataFrame 
API.

Performance is not a concern. The computation is done on native objects in JVM, 
virtually no performance lost.

The only exception is UDF in DataFrame. The UDF() has to rely on language 
worker processes, similar to RDD API.

-Original Message-
From: Rahul Palamuttam 
[mailto:rahulpala...@gmail.com<mailto:rahulpala...@gmail.com>]
Sent: Tuesday, September 8, 2015 10:54 AM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Support of other languages?

Hi,
I wanted to know more about how Spark supports R and Python, with respect to 
what gets copied into the language environments.

To clarify :

I know that PySpark utilizes py4j sockets to pass pickled python functions 
between the JVM and the python daemons. However, I wanted to know how it passes 
the data from the JVM into the daemon environment. I assume it has to copy the 
data over into the new environment, since python can't exactly operate in JVM 
heap space, (or can it?).

I had the same question with respect to SparkR, though I'm not completely 
familiar with how they pass around native R code through the worker JVM's.

The primary question I wanted to ask is does Spark make a second copy of data, 
so language-specific daemons can operate on the data? What are some of the 
other limitations encountered when we try to offer multi-language support, 
whether it's in performance or in general software architecture.
With python in particular the collect operation must be first written to disk 
and then read back from the python driver proces

RE: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-17 Thread Sun, Rui
The existing algorithms operating on R data.frame can't simply operate on 
SparkR DataFrame. They have to be re-implemented to be based on SparkR 
DataFrame API.

-Original Message-
From: ekraffmiller [mailto:ellen.kraffmil...@gmail.com] 
Sent: Thursday, September 17, 2015 3:30 AM
To: user@spark.apache.org
Subject: SparkR - calling as.vector() with rdd dataframe causes error

Hi,
I have a library of clustering algorithms that I'm trying to run in the SparkR 
interactive shell. (I am working on a proof of concept for a document 
classification tool.) Each algorithm takes a term document matrix in the form 
of a dataframe.  When I pass the method a local dataframe, the clustering 
algorithm works correctly, but when I pass it a spark rdd, it gives an error 
trying to coerce the data into a vector.  Here is the code, that I'm calling 
within SparkR:

# get matrix from a file
file <-
"/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"

#read it into variable
 raw_data <- read.csv(file,sep=',',header=FALSE)

#convert to a local dataframe
localDF = data.frame(raw_data)

# create the rdd
rdd  <- createDataFrame(sqlContext,localDF)

#call the algorithm with the localDF - this works result <- galileo(localDF, 
model='hclust',dist='euclidean',link='ward',K=5)

#call with the rdd - this produces error result <- galileo(rdd, 
model='hclust',dist='euclidean',link='ward',K=5)

Error in as.vector(data) : 
  no method for coercing this S4 class to a vector


I get the same error if I try to directly call as.vector(rdd) as well.

Is there a reason why this works for localDF and not rdd?  Should I be doing 
something else to coerce the object into a vector?

Thanks,
Ellen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: reading files on HDFS /s3 in sparkR -failing

2015-09-10 Thread Sun, Rui
Hi, Roni,

For parquetFile(), it is just a warning, you can get the DataFrame 
successfully, right? It is a bug has been fixed in the latest repo: 
https://issues.apache.org/jira/browse/SPARK-8952

For S3, it is not related to SparkR. I guess it is related to 
http://stackoverflow.com/questions/28029134/how-can-i-access-s3-s3n-from-a-local-hadoop-2-6-installation
 , https://issues.apache.org/jira/browse/SPARK-7442


From: roni [mailto:roni.epi...@gmail.com]
Sent: Friday, September 11, 2015 3:05 AM
To: user@spark.apache.org
Subject: reading files on HDFS /s3 in sparkR -failing


I am trying this -
 ddf <- parquetFile(sqlContext,  
"hdfs://ec2-52-26-180-130.us-west-2.compute.amazonaws.com:9000/IPF_14_1.parquet")
and I get 
path[1]="hdfs://ec2-52-26-180-130.us-west-2.compute.amazonaws.com:9000/IPF_14_1.parquet":
 No such file or directory

when I read file on s3 , I get -  java.io.IOException: No FileSystem for 
scheme: s3

Thanks in advance.
-Roni






RE: Support of other languages?

2015-09-09 Thread Sun, Rui
Hi, Rahul,

To support a new language other than Java/Scala in spark, it is different 
between RDD API and DataFrame API.

For RDD API:

RDD is a distributed collection of the language-specific data types whose 
representation is unknown to JVM. Also transformation functions for RDD are 
written in the language which can't be executed on JVM. That's why worker 
processes of the language runtime are needed in such case. Generally, to 
support RDD API in the language, a subclass of the Scala RDD is needed on JVM 
side (for example, PythonRDD for python, RRDD for R) where compute() is 
overridden to send the serialized parent partition data (yes, what you mean 
data copy happens here) and the serialized transformation function via socket 
to the worker process. The worker process deserializes the partition data and 
the transformation function, then applies the function to the data. The result 
is sent back to JVM via socket after serialization as byte array. From JVM's 
viewpoint, the resulting RDD is a collection of byte arrays.

Performance is a concern in such case, as there are overheads, like launching 
of worker processes, serialization/deserialization of partition data, 
bi-directional communication cost of the data.
Besides, as the JVM can't know the real representation of data in the RDD, it 
is difficult and complex to support shuffle and aggregation operations. The 
Spark Core's built-in aggregator and shuffle can't be utilized directly. There 
should be language specific implementation to support these operations, which 
cause additional overheads.

Additional memory occupation by the worker processes is also a concern.

For DataFrame API:

Things are much simpler than RDD API. For DataFrame, data is read from Data 
Source API and is represented as native objects within the JVM and there is no 
language-specific transformation functions. Basically, DataFrame API in the 
language are just method wrappers to the corresponding ones in Scala DataFrame 
API.

Performance is not a concern. The computation is done on native objects in JVM, 
virtually no performance lost.

The only exception is UDF in DataFrame. The UDF() has to rely on language 
worker processes, similar to RDD API.

-Original Message-
From: Rahul Palamuttam [mailto:rahulpala...@gmail.com] 
Sent: Tuesday, September 8, 2015 10:54 AM
To: user@spark.apache.org
Subject: Support of other languages?

Hi,
I wanted to know more about how Spark supports R and Python, with respect to 
what gets copied into the language environments.

To clarify :

I know that PySpark utilizes py4j sockets to pass pickled python functions 
between the JVM and the python daemons. However, I wanted to know how it passes 
the data from the JVM into the daemon environment. I assume it has to copy the 
data over into the new environment, since python can't exactly operate in JVM 
heap space, (or can it?).  

I had the same question with respect to SparkR, though I'm not completely 
familiar with how they pass around native R code through the worker JVM's. 

The primary question I wanted to ask is does Spark make a second copy of data, 
so language-specific daemons can operate on the data? What are some of the 
other limitations encountered when we try to offer multi-language support, 
whether it's in performance or in general software architecture.
With python in particular the collect operation must be first written to disk 
and then read back from the python driver process.

Would appreciate any insight on this, and if there is any work happening in 
this area.

Thank you,

Rahul Palamuttam  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Support-of-other-languages-tp24599.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkR csv without headers

2015-08-20 Thread Sun, Rui
Hi,

You can create a DataFrame using load.df() with a specified schema.

Something like:
schema - structType(structField(“a”, “string”), structField(“b”, integer), …)
read.df ( …, schema = schema)

From: Franc Carter [mailto:franc.car...@rozettatech.com]
Sent: Wednesday, August 19, 2015 1:48 PM
To: user@spark.apache.org
Subject: SparkR csv without headers


Hi,

Does anyone have an example of how to create a DataFrame in SparkR  which 
specifies the column names - the csv files I have do not have column names in 
the first row. I can get read a csv nicely with 
com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2, C3 
etc


thanks

--

Franc Carter I  Systems ArchitectI RoZetta Technology



[Description: Description: Description: cid:image003.jpg@01D02903.9B540580]



L4. 55 Harrington Street,  THE ROCKS,  NSW, 2000

PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA

T  +61 2 8355 2515tel:%2B61%202%208355%202515 I
www.rozettatechnology.comhttp://www.rozettatechnology.com/

[cid:image002.jpg@01D02903.0B41B280]

DISCLAIMER: The contents of this email, inclusive of attachments, may be legally

privileged and confidential. Any unauthorised use of the contents is expressly 
prohibited.




RE: SparkR

2015-07-27 Thread Sun, Rui
Simply no. Currently SparkR is the R API of Spark DataFrame, no existing 
algorithms can benefit from it unless they are re-written to be based on the 
API.

There is on-going development on supporting MLlib and ML Pipelines in SparkR: 
https://issues.apache.org/jira/browse/SPARK-6805

From: Mohit Anchlia [mailto:mohitanch...@gmail.com]
Sent: Tuesday, July 28, 2015 1:08 AM
To: user@spark.apache.org
Subject: SparkR

Does SparkR support all the algorithms that R library supports?


RE: unserialize error in sparkR

2015-07-27 Thread Sun, Rui
Hi, 

Do you mean you are running the script with 
https://github.com/amplab-extras/SparkR-pkg and spark 1.2? I am afraid that 
currently there is no development effort and support on  the SparkR-pkg since 
it has been integrated into Spark since Spark 1.4.

Unfortunately, the RDD API and RDD-like API of DataFrame of SparkR is not 
exposed in Spark 1.4 for some considerations. Although not exposed, some 
RDD-like API of DataFrame are actually implemented which you can find in the 
SparkR source code, including 
lapply/lapplyPartition/flatMap/foreach/foreachPartition. Though not 
recommended, but if you really want to use them, you can use SparkR::: to 
access them as a temporary workaround.

There is on-going investigation and discussion on whether to expose a subset of 
RDD API or not, you can refer to 
https://issues.apache.org/jira/browse/SPARK-7264 if you are interested.

-Original Message-
From: Jennifer15 [mailto:bsabe...@purdue.edu] 
Sent: Monday, July 27, 2015 1:47 PM
To: user@spark.apache.org
Subject: unserialize error in sparkR

Hi,
I have a newbie question; I get the following error by increasing the number of 
samples in my sample script  samplescript.R 
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24002/samplescript.R
, which is written in Spark1.2 (no error for small sample of error):

Error in unserialize(obj) : 
ReadItem: unknown type 0, perhaps written by later version of R
Calls: assetForecast ... convertJListToRList - lapply - lapply - FUN   -
unserialize
Execution halted

I tried using Spark1.4 though I could not find lapply or any similar functions 
for dataframes.
I am not sure if this error is because of using spark1.2 though if it is, what 
is the equivalent of lapply/map to work on dataframes?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/unserialize-error-in-sparkR-tp24002.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkR Supported Types - Please add bigint

2015-07-24 Thread Sun, Rui
printSchema calls StructField. buildFormattedString() to output schema 
information. buildFormattedString() use DataType.typeName as string 
representation of  the data type.

LongType. typeName = long
LongType.simpleString = bigint

I am not sure about the difference of these two type name representations.

-Original Message-
From: Exie [mailto:tfind...@prodevelop.com.au] 
Sent: Friday, July 24, 2015 1:35 PM
To: user@spark.apache.org
Subject: Re: SparkR Supported Types - Please add bigint

Interestingly, after more digging, df.printSchema() in raw spark shows the 
columns as a long, not a bigint.

root
 |-- localEventDtTm: timestamp (nullable = true)
 |-- asset: string (nullable = true)
 |-- assetCategory: string (nullable = true)
 |-- assetType: string (nullable = true)
 |-- event: string (nullable = true)
 |-- extras: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- name: string (nullable = true)
 |||-- value: string (nullable = true)
 |-- ipAddress: string (nullable = true)
 |-- memberId: string (nullable = true)
 |-- system: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- trackingId: string (nullable = true)
 |-- version: long (nullable = true)

I'm going to have to keep digging I guess. :(




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Supported-Types-Please-add-bigint-tp23975p23978.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: SparkR Supported Types - Please add bigint

2015-07-23 Thread Sun, Rui
Exie,

Reported your issue: https://issues.apache.org/jira/browse/SPARK-9302

SparkR has support for long(bigint) type in serde. This issue is related to 
support complex Scala types in serde.

-Original Message-
From: Exie [mailto:tfind...@prodevelop.com.au] 
Sent: Friday, July 24, 2015 10:26 AM
To: user@spark.apache.org
Subject: SparkR Supported Types - Please add bigint

Hi Folks,

Using Spark to read in JSON files and detect the schema, it gives me a 
dataframe with a bigint filed. R then fails to import the dataframe as it 
cant convert the type.

 head(mydf)
Error in as.data.frame.default(x[[i]], optional = TRUE) : 
  cannot coerce class jobj to a data.frame

 show(mydf)
DataFrame[localEventDtTm:timestamp, asset:string, assetCategory:string, 
assetType:string, event:string, 
extras:arraystructlt;name:string,value:string, ipAddress:string, 
memberId:string, system:string, timestamp:bigint, title:string, 
trackingId:string, version:bigint]


I believe this is related to:
https://issues.apache.org/jira/browse/SPARK-8840

A sample record in raw JSON looks like this:
{version: 1,event: view,timestamp: 1427846422377,system:
DCDS,asset: 6404476,assetType: myType,assetCategory:
myCategory,extras: [{name: videoSource,value: mySource},{name:
playerType,value: Article},{name: duration,value:
202088}],trackingId: 155629a0-d802-11e4-13ee-6884e43d6000,ipAddress:
165.69.2.4,title: myTitle}

Can someone turn this into a feature request or something for 1.5.0 ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Supported-Types-Please-add-bigint-tp23975.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [SparkR] creating dataframe from json file

2015-07-15 Thread Sun, Rui
You can try selectExpr() of DataFrame. for example,
  y-selectExpr(df, concat(hashtags.text[0],hashtags.text[1])) # [] 
operator is used to extract an item from an array

or

sql(hiveContext, select concat(hashtags.text[0],hashtags.text[1]) from table)

Yeah, the documentation of SparkR is not so complete. You may use scala 
documentation as reference, and try if some method is supported in SparkR.

From: jianshu Weng [jian...@gmail.com]
Sent: Wednesday, July 15, 2015 9:37 PM
To: Sun, Rui
Cc: user@spark.apache.org
Subject: Re: [SparkR] creating dataframe from json file

Thanks.

t - getField(df$hashtags, text) does return a Column. But when I tried to 
call t - getField(df$hashtags, text), it would give an error:

Error: All select() inputs must resolve to integer column positions.
The following do not:
*  getField(df$hashtags, text)

In fact, the text field in df is now return as something like 
List(hashtag1, hashtag2). Want to flat the list out and make the field a 
string like hashtag1, hashtag2.

You mentioned in the email that then you can perform operations on the 
column.. Bear with me if you feel the question is too naive, am still new to 
SparkR. But what operations are allowed on the column, in the SparkR 
documentation, I didnt find any specific function for column operation 
(https://spark.apache.org/docs/latest/api/R/index.html). I didnt even fine 
getField function in the documentation as well.

Thanks,

-JS

On Wed, Jul 15, 2015 at 8:42 PM, Sun, Rui 
rui@intel.commailto:rui@intel.com wrote:
suppose df - jsonFile(sqlContext, json file)

You can extract hashtags.text as a Column object using the following command:
t - getField(df$hashtags, text)
and then you can perform operations on the column.

You can extract hashtags.text as a DataFrame using the following command:
   t - select(df, getField(df$hashtags, text))
   showDF(t)

Or you can use SQL query to extract the field:
  hiveContext - sparkRHive.init()
  df -jsonFile(hiveContext,json file)
  registerTempTable(df, table)
  t - sql(hiveContext, select hashtags.text from table)
  showDF(t)

From: jianshu [jian...@gmail.commailto:jian...@gmail.com]
Sent: Wednesday, July 15, 2015 4:42 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: [SparkR] creating dataframe from json file

hi all,

Not sure whether this the right venue to ask. If not, please point me to the
right group, if there is any.

I'm trying to create a Spark DataFrame from JSON file using jsonFile(). The
call was successful, and I can see the DataFrame created. The JSON file I
have contains a number of tweets obtained from Twitter API. Am particularly
interested in pulling the hashtags contains in the tweets. If I use
printSchema(), the schema is something like:

root
 |-- id_str: string (nullable = true)
 |-- hashtags: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- indices: array (nullable = true)
 ||||-- element: long (containsNull = true)
 |||-- text: string (nullable = true)

showDF() would show something like this :

++
|hashtags|
++
|  List()|
|List([List(125, 1...|
|  List()|
|List([List(0, 3),...|
|List([List(76, 86...|
|  List()|
|List([List(74, 84...|
|  List()|
|  List()|
|  List()|
|List([List(85, 96...|
|List([List(125, 1...|
|  List()|
|  List()|
|  List()|
|  List()|
|List([List(14, 17...|
|  List()|
|  List()|
|List([List(14, 17...|
++

The question is now how to extract the text of the hashtags for each tweet?
Still new to SparkR. Am thinking maybe I need to loop through the dataframe
to extract for each tweet. But it seems that lapply does not really apply on
Spark DataFrame as more. Any though on how to extract the text, as it will
be inside a JSON array.


Thanks,


-JS




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-creating-dataframe-from-json-file-tp23849.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [SparkR] creating dataframe from json file

2015-07-15 Thread Sun, Rui
suppose df - jsonFile(sqlContext, json file)

You can extract hashtags.text as a Column object using the following command:
t - getField(df$hashtags, text)
and then you can perform operations on the column.

You can extract hashtags.text as a DataFrame using the following command:
   t - select(df, getField(df$hashtags, text))
   showDF(t)

Or you can use SQL query to extract the field:
  hiveContext - sparkRHive.init()
  df -jsonFile(hiveContext,json file)
  registerTempTable(df, table)
  t - sql(hiveContext, select hashtags.text from table)
  showDF(t)

From: jianshu [jian...@gmail.com]
Sent: Wednesday, July 15, 2015 4:42 PM
To: user@spark.apache.org
Subject: [SparkR] creating dataframe from json file

hi all,

Not sure whether this the right venue to ask. If not, please point me to the
right group, if there is any.

I'm trying to create a Spark DataFrame from JSON file using jsonFile(). The
call was successful, and I can see the DataFrame created. The JSON file I
have contains a number of tweets obtained from Twitter API. Am particularly
interested in pulling the hashtags contains in the tweets. If I use
printSchema(), the schema is something like:

root
 |-- id_str: string (nullable = true)
 |-- hashtags: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- indices: array (nullable = true)
 ||||-- element: long (containsNull = true)
 |||-- text: string (nullable = true)

showDF() would show something like this :

++
|hashtags|
++
|  List()|
|List([List(125, 1...|
|  List()|
|List([List(0, 3),...|
|List([List(76, 86...|
|  List()|
|List([List(74, 84...|
|  List()|
|  List()|
|  List()|
|List([List(85, 96...|
|List([List(125, 1...|
|  List()|
|  List()|
|  List()|
|  List()|
|List([List(14, 17...|
|  List()|
|  List()|
|List([List(14, 17...|
++

The question is now how to extract the text of the hashtags for each tweet?
Still new to SparkR. Am thinking maybe I need to loop through the dataframe
to extract for each tweet. But it seems that lapply does not really apply on
Spark DataFrame as more. Any though on how to extract the text, as it will
be inside a JSON array.


Thanks,


-JS




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-creating-dataframe-from-json-file-tp23849.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  1   2   >