Updation of a graph based on changed input

2015-09-23 Thread aparasur
Hi,
I am fairly new to Spark GraphX. I am using graphx to create a graph derived
from data size in the range of 500GB. The inputs used to create this large
graph comes from a set of files with predefined space separated constructs.
I also understand that each time, the graph will be constructed and kept in
memory until the program terminates. The intention here is to create the
large graph, then save only the required sub graphs on disk. 
My question is when the input set of files change, is it possible to update
only those affected parts of the graph or does Spark have to recompute the
entire graph in order to update the subgraphs?
Thanks,
Aarthi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Updation-of-a-graph-based-on-changed-input-tp24783.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Can DataFrames with different schema be joined efficiently

2015-09-23 Thread MrJew
Hello,
I'm using spark streaming to handle quite big data flow.

I'm solving a problem where we are inferring the type from the data ( we
need more specific data types than what JSON provides ). And quite often
there is a small difference between the schemas that we get.

Saving to parquet files and reading from them to do the merge from the
parquet implementation is ridiculously slow especially when the data grows.
SQL joins can do the trick but they aren't much faster either especially
when there are 20 rdds waiting to be joined. Is there a more efficient way
to achieve that?

Regards,
G



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-DataFrames-with-different-schema-be-joined-efficiently-tp24784.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Cosine LSH Join

2015-09-23 Thread Nick Pentreath
Looks interesting - I've been trying out a few of the ANN / LSH packages on
spark-packages.org and elsewhere

e.g. http://spark-packages.org/package/tdebatty/spark-knn-graphs and
https://github.com/marufaytekin/lsh-spark

How does this compare? Perhaps you could put it up on spark-packages to get
visibility?




On Wed, Sep 23, 2015 at 3:02 PM, Demir  wrote:

> We've just open sourced a LSH implementation on Spark. We're using this
> internally in order to find topK neighbors after a matrix factorization.
>
> We hope that this might be of use for others:
>
> https://github.com/soundcloud/cosine-lsh-join-spark
>
> For those wondering: lsh is a technique to quickly find most similar
> neighbors in a high dimensional space. This is a problem faced whenever
> objects are represented as vectors in a high dimensional space e.g. words,
> items, users...
>
> cheers
>
> özgür demir
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cosine-LSH-Join-tp24785.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


unsubscribe

2015-09-23 Thread Ntale Lukama



Re: JdbcRDD Constructor

2015-09-23 Thread Rishitesh Mishra
Which version of Spark you are using ??  I can get correct results using
JdbcRDD. Infact there is a test suite precisely for this (JdbcRDDSuite) .
I changed according to your input and got correct results from this test
suite.

On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j  wrote:

> HI All,
>
> JdbcRDD constructor has following parameters,
>
> *JdbcRDD
> *
> (SparkContext
> 
>  sc,
> scala.Function0 getConnection, String sql, *long 
> lowerBound,
> long upperBound, int numPartitions*, scala.Function1 >
>  mapRow,
> scala.reflect.ClassTag 
> > evidence$1)
>
> where the below parameters *lowerBound* refers to Lower boundary of
> entire data, *upperBound *refers to Upper boundary of entire data and 
> *numPartitions
> *refer to Number of partitions
>
> Source table to which JbdcRDD is fetching data from Oracle DB has more
> than 500 records but its confusing when I tried several executions by
> changing "numPartitions" parameter
>
> LowerBound,UpperBound,numPartitions: Output Count
>
> 0 ,100  ,1   : 100
>
> 0 ,100  ,2   : 151
>
> 0 ,100  ,3   : 201
>
>
> Please help me in understanding the why Output count is 151 if
> numPartitions is 2 and Output count is 201 if numPartitions is 3
>
> Regards,
>
> Satish Chandra
>


RE: Yarn Shutting Down Spark Processing

2015-09-23 Thread Bryan
Tathagata,

Simple batch jobs do work. The cluster has a good set of resources and a 
limited input volume on the given Kafka topic.

The job works on the small 3-node standalone-configured cluster I have setup 
for test.

Regards,

Bryan Jeffrey

-Original Message-
From: "Tathagata Das" 
Sent: ‎9/‎23/‎2015 2:46 AM
To: "Bryan Jeffrey" 
Cc: "user" 
Subject: Re: Yarn Shutting Down Spark Processing

Does your simple Spark batch jobs work in the same YARN setup? May be YARN is 
not able to provide resources that you are asking for. 


On Tue, Sep 22, 2015 at 5:49 PM, Bryan Jeffrey  wrote:

Hello.


I have a Spark streaming job running on a cluster managed by Yarn.  The spark 
streaming job starts and receives data from Kafka.  It is processing well and 
then after several seconds I see the following error:


15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not initialize 
after waiting for 10 ms. Please check earlier log output for errors. 
Failing the application.
15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED, 
exitCode: 13, (reason: Timed out waiting for SparkContext.)


The spark process is then (obviously) shut down by Yarn. 


What do I need to change to allow Yarn to initialize Spark streaming (vs. 
batch) jobs?

Thank you,


Bryan Jeffrey

Re: JdbcRDD Constructor

2015-09-23 Thread Rishitesh Mishra
I am using Spark 1.5. I always get count = 100, irrespective of num
partitions.

On Wed, Sep 23, 2015 at 5:00 PM, satish chandra j 
wrote:

> HI,
> Currently using Spark 1.2.2, could you please let me know correct results
> output count which you got it by using JdbcRDDSuite
>
> Regards,
> Satish Chandra
>
> On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra <
> rishi80.mis...@gmail.com> wrote:
>
>> Which version of Spark you are using ??  I can get correct results using
>> JdbcRDD. Infact there is a test suite precisely for this (JdbcRDDSuite)
>> .
>> I changed according to your input and got correct results from this test
>> suite.
>>
>> On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI All,
>>>
>>> JdbcRDD constructor has following parameters,
>>>
>>> *JdbcRDD
>>> *
>>> (SparkContext
>>> 
>>>  sc,
>>> scala.Function0 getConnection, String sql, *long 
>>> lowerBound,
>>> long upperBound, int numPartitions*, scala.Function1>> T
>>> >
>>>  mapRow,
>>> scala.reflect.ClassTag>> 
>>> > evidence$1)
>>>
>>> where the below parameters *lowerBound* refers to Lower boundary of
>>> entire data, *upperBound *refers to Upper boundary of entire data and 
>>> *numPartitions
>>> *refer to Number of partitions
>>>
>>> Source table to which JbdcRDD is fetching data from Oracle DB has more
>>> than 500 records but its confusing when I tried several executions by
>>> changing "numPartitions" parameter
>>>
>>> LowerBound,UpperBound,numPartitions: Output Count
>>>
>>> 0 ,100  ,1   : 100
>>>
>>> 0 ,100  ,2   : 151
>>>
>>> 0 ,100  ,3   : 201
>>>
>>>
>>> Please help me in understanding the why Output count is 151 if
>>> numPartitions is 2 and Output count is 201 if numPartitions is 3
>>>
>>> Regards,
>>>
>>> Satish Chandra
>>>
>>
>>
>


Re: spark on mesos gets killed by cgroups for too much memory

2015-09-23 Thread Dick Davies
I haven't seen that much memory overhead, I think my default is 512Mb
(just a small test stack)
on spark 1.4.x and i can run simple monte carlo simulations without
the 'spike' of RAM usage
when they deploy.

I'd assume something you're using is grabbing a lot of VM up front -
one option you might want
to try is to cap RSS but not virtual memory. On mesos 0.22.x last time
I tested it, that'll allow tasks
to spill into swap if they hit a memory cap - you can do that with the
slave CLI if you want to try it.

On 23 September 2015 at 12:45, Gary Ogden  wrote:
> But the thing I don't get is why is it trying to take all 3GB at startup?
> That seems excessive. So if I want to run a job that only needs 512MB, I
> need to have 3GB free at all times? Doesn't make sense.
>
> We are using sparks native mesos support. On spark submit we use: --mesos
> mesos://zk://prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181/mesos
>
> And we followed the instructions here:
> https://spark.apache.org/docs/1.2.0/running-on-mesos.html
>
> On 23 September 2015 at 08:22, Dick Davies  wrote:
>>
>> I've had this working and never needed to mess with cgconfig.conf, in
>> my experience mesos takes
>> care of that for you.
>>
>> The memory requirement you set during marathon submit is what mesos
>> will cap the task to.
>>
>> semi-unrelated question: why are you not using sparks native mesos support
>> ?
>>
>> On 22 September 2015 at 15:19, oggie  wrote:
>> > I'm using spark 1.2.2 on mesos 0.21
>> >
>> > I have a java job that is submitted to mesos from marathon.
>> >
>> > I also have cgroups configured for mesos on each node. Even though the
>> > job,
>> > when running, uses 512MB, it tries to take over 3GB at startup and is
>> > killed
>> > by cgroups.
>> >
>> > When I start mesos-slave, It's started like this (we use supervisord):
>> > command=/usr/sbin/mesos-slave --disk_watch_interval=10secs
>> > --gc_delay=480mins --isolation=cgroups/cpu,cgroups/mem
>> > --cgroups_hierarchy=/cgroup --resources="mem(*
>> > ):3000;cpus(*):2;ports(*):[25000-3];disk(*):5000"
>> > --cgroups_root=mesos
>> >
>> > --master=zk://prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181/me
>> > sos --work_dir=/tmp/mesos --log_dir=/var/log/mesos
>> >
>> > In cgconfig.conf:
>> > memory.limit_in_bytes="3221225472";
>> >
>> > spark-submit from marathon:
>> > bin/spark-submit --executor-memory 128m --master
>> >
>> > mesos://zk://prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181/mesos
>> > --class com.company.alert.AlertConsumer AlertConsumer.jar --zk
>> > prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181
>> > --mesos
>> >
>> > mesos://zk://prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181/mesos
>> > --spark_executor_uri
>> > http://prodmesosfileserver01/spark-dist/1.2.2/spark-dist-1.2.2.tgz
>> >
>> > We increased the cgroup limit to 6GB and the memory resources from 3000
>> > to
>> > 6000 for the startup of mesos and now cgroups doesn't kill the job
>> > anymore.
>> >
>> > But the question is, how do I limit the start of the job so it isn't
>> > trying
>> > to take 3GB, even if when running it's only using 512MB?
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/spark-on-mesos-gets-killed-by-cgroups-for-too-much-memory-tp24769.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Py4j issue with Python Kafka Module

2015-09-23 Thread ayan guha
Thanks guys.

On Wed, Sep 23, 2015 at 3:54 PM, Tathagata Das  wrote:

> SPARK_CLASSPATH is I believe deprecated right now. So I am not surprised
> that there is some difference in the code paths.
>
> On Tue, Sep 22, 2015 at 9:45 PM, Saisai Shao 
> wrote:
>
>> I think it is something related to class loader, the behavior is
>> different for classpath and --jars. If you want to know the details I think
>> you'd better dig out some source code.
>>
>> Thanks
>> Jerry
>>
>> On Tue, Sep 22, 2015 at 9:10 PM, ayan guha  wrote:
>>
>>> I must have been gone mad :) Thanks for pointing it out. I downloaded
>>> 1.5.0 assembly jar and added it in SPARK_CLASSPATH.
>>>
>>> However, I am getting a new error now
>>>
>>> >>> kvs =
>>> KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'l
>>> ocalhost:9092'})
>>>
>>>
>>> 
>>> 
>>>
>>>   Spark Streaming's Kafka libraries not found in class path. Try one of
>>> the foll
>>> owing.
>>>
>>>   1. Include the Kafka library and its dependencies with in the
>>>  spark-submit command as
>>>
>>>  $ bin/spark-submit --packages
>>> org.apache.spark:spark-streaming-kafka:1.5.0
>>> ...
>>>
>>>   2. Download the JAR of the artifact from Maven Central
>>> http://search.maven.org
>>> /,
>>>  Group Id = org.apache.spark, Artifact Id =
>>> spark-streaming-kafka-assembly,
>>> Version = 1.5.0.
>>>  Then, include the jar in the spark-submit command as
>>>
>>>  $ bin/spark-submit --jars  ...
>>>
>>>
>>> 
>>> 
>>>
>>>
>>>
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File
>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
>>> \streaming\kafka.py", line 130, in createDirectStream
>>> raise e
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o30.loadClass.
>>> : java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka.KafkaUtilsP
>>> ythonHelper
>>> at java.net.URLClassLoader.findClass(Unknown Source)
>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
>>> Source)
>>> at java.lang.reflect.Method.invoke(Unknown Source)
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>> at
>>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>>> at py4j.Gateway.invoke(Gateway.java:259)
>>> at
>>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>> at java.lang.Thread.run(Unknown Source)
>>>
>>> >>> os.environ['SPARK_CLASSPATH']
>>> 'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
>>> >>>
>>>
>>>
>>> So I launched pyspark with --jars with the assembly jar. Now it is
>>> working.
>>>
>>> THANK YOU for help.
>>>
>>> Curiosity:  Why adding it to SPARK CLASSPATH did not work?
>>>
>>> Best
>>> Ayan
>>>
>>> On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao 
>>> wrote:
>>>
 I think you're using the wrong version of kafka assembly jar, I think
 Python API from direct Kafka stream is not supported for Spark 1.3.0, you'd
 better change to version 1.5.0, looks like you're using Spark 1.5.0, why
 you choose Kafka assembly 1.3.0?


 D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar



 On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:

> Hi
>
> I have added spark assembly jar to SPARK CLASSPATH
>
> >>> print os.environ['SPARK_CLASSPATH']
> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>
>
> Now  I am facing below issue with a test topic
>
> >>> ssc = StreamingContext(sc, 2)
> >>> kvs =
> KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'l
> ocalhost:9092'})
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
> \streaming\kafka.py", line 126, in createDirectStream
> jstream = helper.createDirectStream(ssc._jssc, kafkaParams,
> set(topics), jfr
> omOffsets)
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
> j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
> 

Re: JdbcRDD Constructor

2015-09-23 Thread satish chandra j
HI,
Could anybody provide inputs if they have came across similar issue

@Rishitesh
Could you provide if any sample code to use JdbcRDDSuite


Regards,
Satish Chandra

On Wed, Sep 23, 2015 at 5:14 PM, Rishitesh Mishra 
wrote:

> I am using Spark 1.5. I always get count = 100, irrespective of num
> partitions.
>
> On Wed, Sep 23, 2015 at 5:00 PM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI,
>> Currently using Spark 1.2.2, could you please let me know correct results
>> output count which you got it by using JdbcRDDSuite
>>
>> Regards,
>> Satish Chandra
>>
>> On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra <
>> rishi80.mis...@gmail.com> wrote:
>>
>>> Which version of Spark you are using ??  I can get correct results using
>>> JdbcRDD. Infact there is a test suite precisely for this (JdbcRDDSuite)
>>> .
>>> I changed according to your input and got correct results from this test
>>> suite.
>>>
>>> On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
 HI All,

 JdbcRDD constructor has following parameters,

 *JdbcRDD
 *
 (SparkContext
 
  sc,
 scala.Function0 getConnection, String sql, *long 
 lowerBound,
 long upperBound, int numPartitions*,
 scala.Function1>
  mapRow,
 scala.reflect.ClassTag>>> 
 > evidence$1)

 where the below parameters *lowerBound* refers to Lower boundary of
 entire data, *upperBound *refers to Upper boundary of entire data and 
 *numPartitions
 *refer to Number of partitions

 Source table to which JbdcRDD is fetching data from Oracle DB has more
 than 500 records but its confusing when I tried several executions by
 changing "numPartitions" parameter

 LowerBound,UpperBound,numPartitions: Output Count

 0 ,100  ,1   : 100

 0 ,100  ,2   : 151

 0 ,100  ,3   : 201


 Please help me in understanding the why Output count is 151 if
 numPartitions is 2 and Output count is 201 if numPartitions is 3

 Regards,

 Satish Chandra

>>>
>>>
>>
>


Re: JdbcRDD Constructor

2015-09-23 Thread satish chandra j
HI,
Currently using Spark 1.2.2, could you please let me know correct results
output count which you got it by using JdbcRDDSuite

Regards,
Satish Chandra

On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra 
wrote:

> Which version of Spark you are using ??  I can get correct results using
> JdbcRDD. Infact there is a test suite precisely for this (JdbcRDDSuite) .
> I changed according to your input and got correct results from this test
> suite.
>
> On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI All,
>>
>> JdbcRDD constructor has following parameters,
>>
>> *JdbcRDD
>> *
>> (SparkContext
>> 
>>  sc,
>> scala.Function0 getConnection, String sql, *long 
>> lowerBound,
>> long upperBound, int numPartitions*, scala.Function1> >
>>  mapRow,
>> scala.reflect.ClassTag> 
>> > evidence$1)
>>
>> where the below parameters *lowerBound* refers to Lower boundary of
>> entire data, *upperBound *refers to Upper boundary of entire data and 
>> *numPartitions
>> *refer to Number of partitions
>>
>> Source table to which JbdcRDD is fetching data from Oracle DB has more
>> than 500 records but its confusing when I tried several executions by
>> changing "numPartitions" parameter
>>
>> LowerBound,UpperBound,numPartitions: Output Count
>>
>> 0 ,100  ,1   : 100
>>
>> 0 ,100  ,2   : 151
>>
>> 0 ,100  ,3   : 201
>>
>>
>> Please help me in understanding the why Output count is 151 if
>> numPartitions is 2 and Output count is 201 if numPartitions is 3
>>
>> Regards,
>>
>> Satish Chandra
>>
>
>


RE: Why is 1 executor overworked and other sit idle?

2015-09-23 Thread Richard Eggert
Reading from Cassandra and mapping to CSV are likely getting divided among
executors,  but I think reading from Cassandra is relatively cheap,  and
mapping to CSV is trivial,  but coalescing to a single partition is fairly
expensive and funnels the processing to a single executor, and writing out
to disk is expensive as well,  so all your expensive operations are the
ones that involve a single partition.
On Sep 22, 2015 11:45 PM, "Chirag Dewan"  wrote:

> Thanks Ted and Rich.
>
>
>
> So if I repartition my RDD programmatically and call coalesce on the RDD
> to 1 partition would that generate 1 output file?
>
>
>
> Ahh.. Is my coalesce operation causing 1 partition, hence 1 output file
> and 1 executor working on all the data?
>
>
>
> To summarize this is what I do :-
>
>
>
> 1)  Create a Cassandra RDD
>
> 2)  Cache this RDD
>
> 3)  Map it to CSV
>
> 4)  Coalesce(because I need a single output file)
>
> 5)  Write to file on local file system
>
>
>
> This makes sense.
>
>
>
> Thanks,
>
>
>
> Chirag
>
>
>
>
>
> *From:* Richard Eggert [mailto:richard.egg...@gmail.com]
> *Sent:* Wednesday, September 23, 2015 5:39 AM
> *To:* Ted Yu
> *Cc:* User; Chirag Dewan
> *Subject:* Re: Why is 1 executor overworked and other sit idle?
>
>
>
> If there's only one partition, by definition it will only be handled by
> one executor. Repartition to divide the work up. Note that this will also
> result in multiple output files,  however. If you absolutely need them to
> be combined into a single file,  I suggest using the Unix/Linux 'cat'
> command to concatenate the files afterwards.
>
> Rich
>
> On Sep 22, 2015 9:20 AM, "Ted Yu"  wrote:
>
> Have you tried using repartition to spread the load ?
>
>
>
> Cheers
>
>
> On Sep 22, 2015, at 4:22 AM, Chirag Dewan 
> wrote:
>
> Hi,
>
>
>
> I am using Spark to access around 300m rows in Cassandra.
>
>
>
> My job is pretty simple as I am just mapping my row into a CSV format and
> saving it as a text file.
>
>
>
>
>
> public String call(CassandraRow row)
>
>
> throws Exception {
>
>
> StringBuilder sb = new StringBuilder();
>
>
> sb.append(row.getString(10));
>
>
> sb.append(",");
>
>
> sb.append(row.getString(11));
>
>
> sb.append(",");
>
>
> sb.append(row.getString(8));
>
>
> sb.append(",");
>
>
> sb.append(row.getString(7));
>
>
>
> return
> sb.toString();
>
> }
>
>
>
> My map methods looks like this.
>
>
>
> I am having a 3 node cluster. I observe that driver starts on Node A. And
> executors are spawned on all 3 nodes. But the executor of Node B or C are
> doing all the tasks. It starts a saveasTextFile job with 1 output partition
> and stores the RDDs in memory and also commits the file on local file
> system.
>
>
>
> This executor is using a lot of system memory and CPU while others are
> sitting idle.
>
>
>
> Am I doing something wrong? Is my RDD correctly partitioned?
>
>
>
> Thanks in advance.
>
>
>
>
>
> Chirag
>
>


Cosine LSH Join

2015-09-23 Thread Demir
We've just open sourced a LSH implementation on Spark. We're using this
internally in order to find topK neighbors after a matrix factorization.

We hope that this might be of use for others:

https://github.com/soundcloud/cosine-lsh-join-spark

For those wondering: lsh is a technique to quickly find most similar
neighbors in a high dimensional space. This is a problem faced whenever
objects are represented as vectors in a high dimensional space e.g. words,
items, users...

cheers

özgür demir



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cosine-LSH-Join-tp24785.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Dose spark auto invoke StreamingContext.stop while receive kill signal?

2015-09-23 Thread Bin Wang
I'd like the spark application to be stopped gracefully while received kill
signal, so I add these code:

sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  println("Application stopped")
}

But the application is not stopped gracefully:

15/09/23 17:44:38 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
...
15/09/23 17:44:38 INFO streaming.StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook

Dose spark auto invoke StreamingContext.stop for me?


How to turn off Jetty Http stack errors on Spark web

2015-09-23 Thread Rafal Grzymkowski
Hi,

Is it possible to disable Jetty stack trace with errors on Spark master:8080 ?
When I trigger Http server error 500 than anyone can read details.
I tried options available in log4j.properties but it doesn't help.
Any hint?

Thank you for answer
MyCo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: unsubscribe

2015-09-23 Thread Akhil Das
To unsubscribe, you need to send an email to
user-unsubscr...@spark.apache.org as described here
http://spark.apache.org/community.html

Thanks
Best Regards

On Wed, Sep 23, 2015 at 1:23 AM, Stuart Layton 
wrote:

>
>
> --
> Stuart Layton
>


Re: Why RDDs are being dropped by Executors?

2015-09-23 Thread Uthayan Suthakar
Thank you tathagata for your response. It make sense to use the
MEMORY_AND_DISK.
But sometime when I start the job it does not cache everyting at the start.
It only caches 90%. The LRU scheme will only take affect after a while when
the data is not in use but why it failing to cache the data at the beging
of the job? This issue only occurs occasinally. For your information the
Spark version that I'm using is 1.3.0, is there a known issue with this
version?



On 23 September 2015 at 07:13, Tathagata Das  wrote:

> If the RDD is not constantly in use, then the LRU scheme in each executor
> can kick out some of the partitions from memory.
> If you want to avoid recomputing in such cases, you could persist with
> StorageLevel.MEMORY_AND_DISK, where the partitions will dropped to disk
> when kicked from memory. That will avoid recomputing, as the partitions
> will be read from disk -- better than recomputing in most cases.
>
> On Tue, Sep 22, 2015 at 4:20 AM, Uthayan Suthakar <
> uthayan.sutha...@gmail.com> wrote:
>
>>
>> Hello All,
>>
>> We have a Spark Streaming job that reads data from DB (three tables) and
>> cache them into memory ONLY at the start then it will happily carry out the
>> incremental calculation with the new data. What we've noticed occasionally
>> is that one of the RDDs caches only 90% of the data. Therefore, at each
>> execution time the remaining 10% had to be recomputed. This operation is
>> very expensive as it will need to establish a connection to DB and
>> transform the data.We are allocating more than enough memories for each
>> executor ( 4 Executors and each has 2GB memory). Have you come across this
>> issue? Do you know what may cause this issue?
>>
>> Another observation:
>> I started the job yesterday it cached 100% of the RDD but looking at it
>> today it show 90%. what happened to that 10% of data?
>>
>
>


Re: Has anyone used the Twitter API for location filtering?

2015-09-23 Thread Akhil Das
I just tried it and very few tweets has the .getPlace and .getGeoLocation
data available in it.

[image: Inline image 1]

I guess this is more of an issue with the twitter api.



Thanks
Best Regards

On Tue, Sep 22, 2015 at 11:35 PM, Jo Sunad  wrote:

> Thanks Akhil, but I can't seem to get any tweets that include location
> data. For example, when I do stream.filter(status =>
> status.getPlace().getName) and run the code for 20 minutes I only get null
> values.It seems like Twitter might purposely be removing the Place for free
> users?
>
>
>
> On Tue, Sep 22, 2015 at 2:20 AM, Akhil Das 
> wrote:
>
>> ​That's because sometime getPlace returns null and calling getLang over
>> null throws up either null pointer exception or noSuchMethodError. You need
>> to filter out those statuses which doesn't include location data.​
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Sep 18, 2015 at 12:46 AM, Jo Sunad  wrote:
>>
>>> I've been trying to filter for GeoLocation, Place or even Time Zone and
>>> I keep getting null values. I think I got one Place in 20 minutes of the
>>> app running (without any filters on tweets).
>>>
>>> Is this normal? Do I have to try querying rather than filtering?
>>>
>>> my code is following TD's example...
>>>
>>> val stream = TwitterUtils
>>>
>>> val hashtags = stream.map (status => status.getPlace().getName(),
>>> status.getText())
>>>
>>> getText, getFollowers, etc all work fine, I just don't get anything
>>> location based (and getLang() for some reason throws a noMethodError).
>>>
>>> Thanks for the help!
>>>
>>
>>
>


Re: K Means Explanation

2015-09-23 Thread Sabarish Sasidharan
You can't obtain that from the model. But you can always ask the model to
predict the cluster center for your vectors by calling predict().

Regards
Sab

On Wed, Sep 23, 2015 at 7:24 PM, Tapan Sharma 
wrote:

> Hi All,
>
> In the KMeans example provided under mllib, it traverse the outcome of
> KMeansModel to know the cluster centers like this:
>
> KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs,
> KMeans.K_MEANS_PARALLEL());
>
> System.out.println("Cluster centers:");
> for (Vector center : model.clusterCenters()) {
>   System.out.println(" " + center);
> }
> https://spark.apache.org/docs/1.3.0/mllib-clustering.html#k-means
> 
> *How can I know the points contained in the particular cluster?*
>
> Regards
> Tapan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/K-Means-Explanation-tp24787.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: How to subtract two RDDs with same size

2015-09-23 Thread Sujit Pal
Hi Zhiliang,

How about doing something like this?

val rdd3 = rdd1.zip(rdd2).map(p =>
p._1.zip(p._2).map(z => z._1 - z._2))

The first zip will join the two RDDs and produce an RDD of (Array[Float],
Array[Float]) pairs. On each pair, we zip the two Array[Float] components
together to form an Array[(Float, Float)] and then we subtract the first
element from the second in the inner map (the inner map is a Scala map not
a Spark one).

I tried this out on a notebook:

val rdd1 = sc.parallelize(List(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0),
Array(7.0, 8.0, 9.0)))
val rdd2 = sc.parallelize(List(Array(1.0, 4.0, 3.0), Array(4.0, 10.0, 6.0),
Array(7.0, 16.0, 9.0)))
val rdd3 = rdd1.zip(rdd2).map(p => p._1.zip(p._2).map(z => z._1 - z._2))
rdd3.collect()

gives me:
res0: Array[Array[Double]] = Array(Array(0.0, -2.0, 0.0), Array(0.0, -5.0,
0.0), Array(0.0, -8.0, 0.0))

-sujit


On Wed, Sep 23, 2015 at 12:23 AM, Zhiliang Zhu  wrote:

> there is matrix add API, might map rdd2 each row element to be negative ,
> then make rdd1 and rdd2 and call add ?
>
> Or some more ways ...
>
>
>
> On Wednesday, September 23, 2015 3:11 PM, Zhiliang Zhu <
> zchl.j...@yahoo.com> wrote:
>
>
> Hi All,
>
> There are two RDDs :  RDD rdd1, and RDD rdd2,
> that is to say, rdd1 and rdd2 are similar with DataFrame, or Matrix with
> same row number and column number.
>
> I would like to get RDD rdd3,  each element in rdd3 is the
> subtract between rdd1 and rdd2 of the
> same position, which is similar Matrix subtract:
> rdd3 = rdd1 - rdd2 ...
>
> It seemed very difficult to operate this kinds of matrix  arithmetic, even
> is about add, subtract, multiple , diff etc...
>
> I shall  appreciate your help very much~~
> Zhiliang
>
>
>
>
>


Re: Calling a method parallel

2015-09-23 Thread Sujit Pal
Hi Tapan,

Perhaps this may work? It takes a range of 0..100 and creates an RDD out of
them, then calls X(i) on each. The X(i) should be executed on the workers
in parallel.

Scala:
val results = sc.parallelize(0 until 100).map(idx => X(idx))

Python:
results = sc.parallelize(range(100)).map(lambda idx: X(idx))

-sujit


On Wed, Sep 23, 2015 at 6:46 AM, Tapan Sharma 
wrote:

> Hi All,
>
> I want to call a method X(int i) from my Spark program for different values
> of i.
> This means.
> X(1), X(2).. X(n)..
> Each time it returns the one object.
> Currently I am doing this sequentially.
>
> Is there any way to run these in parallel and I get back the list of
> objects?
> Sorry for this basic question.
>
> Regards
> Tapan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Calling-a-method-parallel-tp24786.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Checkpoint files are saved before stream is saved to file (rdd.toDF().write ...)?

2015-09-23 Thread Cody Koeninger
TD can correct me on this, but I believe checkpointing is done after a set
of jobs is submitted, not after they are completed.  If you fail while
processing the jobs, starting over from that checkpoint should put you in
the correct state.

In any case, are you actually observing a loss of messages when killing /
restarting a job?

On Wed, Sep 23, 2015 at 3:49 AM, Petr Novak  wrote:

> Hi,
> I have 2 streams and checkpointing with code based on documentation. One
> stream is transforming data from Kafka and saves them to Parquet file. The
> other stream uses the same stream and does updateStateByKey to compute some
> aggregations. There is no gracefulShutdown.
>
> Both use about this code to save files:
>
> stream.foreachRDD { (rdd, time) =>
>   ...
>   rdd.toDF().write.save(...use time for the directory name...)
> }
>
> It is not idempotent at the moment but let's put this aside for now.
>
> The strange thing is that when I Ctrl+C the job I can see checkpoint file
> with timestamp for the last batch but there are no stream files/directories
> for this timestamp or only one of streams have data saved with time aligned
> with the last checkpoint file. I would expect that checkpoint file is
> created after both streams successfully finishes its saves and that it is
> created at the end of the batch. Otherwise I don't know for what
> checkpointing is good for except maybe cutting lineage. Is file saving
> asynchronous and Spark checkpointing does not care about it?
>
> I actually need to checkpoint both streams atomically at the end of the
> batch. It seems to me that Spark checkpoiting facility is quite unusable in
> practice except for some simple scenarios and everybody has to actually
> roll its own.
>
> Am I wrong? How can I use Spark checkpoiting to checkpoint both streams
> after they successfully save its results to files. It is actually the
> reason while I think that micro-batch streaming is nice because it has
> clearly defined synchronization barrier. But it doesn't seems like
> checkpointing takes an advantage of it.
>
> I can't ensure atomicity when saving more files for more streams and it
> would require some further cleanup code on job restart. But at least I
> would like to have a quarantee where existence of checkpoint file signals
> that batch with that timestamp finished successfully with all its RDD
> actions.
>
> Or it is expected to behave like this and I have something wrong with my
> code?
>
> Many thanks for any insights,
> Petr
>
>


Re: How to control spark.sql.shuffle.partitions per query

2015-09-23 Thread Ted Yu
Please take a look at the following for example:
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Search for spark.sql.shuffle.partitions and SQLConf.SHUFFLE_PARTITIONS.key

FYI

On Wed, Sep 23, 2015 at 12:42 AM, tridib  wrote:

> I am having GC issue with default value of spark.sql.shuffle.partitions
> (200). When I increase it to 2000, shuffle join works fine.
>
> I want to use different values for spark.sql.shuffle.partitions depending
> on
> data volume, for different queries which are fired from sane SparkSql
> context.
>
> Thanks
> Tridib
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-spark-sql-shuffle-partitions-per-query-tp24781.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Calling a method parallel

2015-09-23 Thread Tapan Sharma
Hi All,

I want to call a method X(int i) from my Spark program for different values
of i.
This means.
X(1), X(2).. X(n)..
Each time it returns the one object.
Currently I am doing this sequentially.

Is there any way to run these in parallel and I get back the list of
objects?
Sorry for this basic question.

Regards
Tapan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Calling-a-method-parallel-tp24786.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



K Means Explanation

2015-09-23 Thread Tapan Sharma
Hi All,

In the KMeans example provided under mllib, it traverse the outcome of
KMeansModel to know the cluster centers like this:

KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs,
KMeans.K_MEANS_PARALLEL());

System.out.println("Cluster centers:");
for (Vector center : model.clusterCenters()) {
  System.out.println(" " + center);
}
https://spark.apache.org/docs/1.3.0/mllib-clustering.html#k-means
  
*How can I know the points contained in the particular cluster?*

Regards
Tapan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/K-Means-Explanation-tp24787.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to turn off Jetty Http stack errors on Spark web

2015-09-23 Thread Ted Yu
Have you read this ?
http://stackoverflow.com/questions/2246074/how-do-i-hide-stack-traces-in-the-browser-using-jetty

On Wed, Sep 23, 2015 at 6:56 AM, Rafal Grzymkowski  wrote:

> Hi,
>
> Is it possible to disable Jetty stack trace with errors on Spark
> master:8080 ?
> When I trigger Http server error 500 than anyone can read details.
> I tried options available in log4j.properties but it doesn't help.
> Any hint?
>
> Thank you for answer
> MyCo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to get RDD from PairRDD<key,value> in Java

2015-09-23 Thread Ankur Srivastava
PairRdd.values is what you need.

Ankur

On Tue, Sep 22, 2015, 11:25 PM Zhang, Jingyu 
wrote:

> Hi All,
>
> I want to extract the "value" RDD from PairRDD in Java
>
> Please let me know how can  I get it easily.
>
> Thanks
>
> Jingyu
>
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.


Re: SparkContext declared as object variable

2015-09-23 Thread Akhil Das
Yes of course it works.

[image: Inline image 1]

Thanks
Best Regards

On Tue, Sep 22, 2015 at 4:53 PM, Priya Ch 
wrote:

> Parallelzing some collection (array of strings). Infact in our product we
> are reading data from kafka using KafkaUtils.createStream and applying some
> transformations.
>
> Is creating sparContext at object level instead of creating in main
> doesn't work 
>
> On Tue, Sep 22, 2015 at 2:59 PM, Akhil Das 
> wrote:
>
>> Its a "value" not a variable, and what are you parallelizing here?
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Sep 18, 2015 at 11:21 PM, Priya Ch 
>> wrote:
>>
>>> Hello All,
>>>
>>>   Instead of declaring sparkContext in main, declared as object variable
>>> as -
>>>
>>>  object sparkDemo
>>> {
>>>
>>>  val conf = new SparkConf
>>>  val sc = new SparkContext(conf)
>>>
>>>   def main(args:Array[String])
>>>   {
>>>
>>> val baseRdd = sc.parallelize()
>>>.
>>>.
>>>.
>>>   }
>>>
>>> }
>>>
>>> But this piece of code is giving :
>>> java.io.IOException: org.apache.spark.SparkException: Failed to get
>>> broadcast_5_piece0 of broadcast_5
>>> at
>>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.spark.SparkException: Failed to get
>>> broadcast_5_piece0 of broadcast_5
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at org.apache.spark.broadcast.TorrentBroadcast.org
>>> 
>>> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
>>> at
>>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
>>>
>>> Why should't we declare sc as object variable ???
>>>
>>> Regards,
>>> Padma Ch
>>>
>>
>>
>


Re: unsubscribe

2015-09-23 Thread Richard Hillegas

Hi Ntale,

To unsubscribe from the user list, please send a message to
user-unsubscr...@spark.apache.org as described here:
http://spark.apache.org/community.html#mailing-lists.

Thanks,
-Rick

Ntale Lukama  wrote on 09/23/2015 04:34:48 AM:

> From: Ntale Lukama 
> To: user 
> Date: 09/23/2015 04:35 AM
> Subject: unsubscribe

Re: WAL on S3

2015-09-23 Thread Steve Loughran

On 23 Sep 2015, at 14:56, Michal Čizmazia 
> wrote:

To get around the fact that flush does not work in S3, my custom WAL 
implementation stores a separate S3 object per each WriteAheadLog.write call.

Do you see any gotchas with this approach?



nothing obvious.

the blob is PUT in the close() call; once that operation has completed then its 
in S3. For any attempt to open that file to read will immediately succeed, now 
even in US-east if you set the right endpoint:

https://forums.aws.amazon.com/ann.jspa?annID=3112
http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#Regions

If you can avoid listing operations or overwrites, you avoid the fun there.

You do have to bear in mind that the duration of stream.close() is now O(bytes) 
and may fail -a lot of code assumes it is instant and always works...


Re: Calling a method parallel

2015-09-23 Thread Robineast
The following should give you what you need:

val results = sc.makeRDD(1 to n).map(X(_)).collect

This should return the results as an array. 

_
Robin East
Spark GraphX in Action - Michael Malak and Robin East
Manning Publications
http://manning.com/books/spark-graphx-in-action



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Calling-a-method-parallel-tp24786p24790.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to turn off Jetty Http stack errors on Spark web

2015-09-23 Thread Rafal Grzymkowski
Yes, I've seen it, but there are no files web.xml and error.jsp in binary 
installation of Spark.
To apply this solution I should probably take Spark sources than create missing 
files and than recompile Spark. Right?
I am looking for a solution to turn off error details without recompilation.
/MyCo

Re: KafkaProducer using Cassandra as source

2015-09-23 Thread kali.tumm...@gmail.com
Guys sorry I figured it out.

val
x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")

Full Code:-

package com.examples

/**
 * Created by kalit_000 on 22/09/2015.
 */

import kafka.producer.KeyedMessage
import kafka.producer.Producer
import kafka.producer.ProducerConfig
import java.util.Properties
import _root_.kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkProducerDBCassandra {

  case class TestTable (TRADE_ID:String,TRADE_PRICE: String)

  def main(args: Array[String]): Unit =
  {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)

val conf = new
SparkConf().setMaster("local[2]").setAppName("testkali2").set("spark.cassandra.connection.host",
"127.0.0.1")
val sc=new SparkContext("local","test",conf)
//val ssc= new StreamingContext(sc,Seconds(2))

print("Test kali Spark Cassandra")

val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)

val p=cc.sql("select * from people.person")

p.collect().foreach(println)

val props:Properties = new Properties()
props.put("metadata.broker.list", "localhost:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")

val config= new ProducerConfig(props)
val producer= new Producer[String,String](config)

val
x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")

   producer.send(new KeyedMessage[String, String]("trade", x))

//p.collect().foreach(print)

//ssc.start()

//ssc.awaitTermination()

  }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaProducer-using-Cassandra-as-source-tp24774p24788.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



create table in hive from spark-sql

2015-09-23 Thread Mohit Singh
Probably a noob question.
But I am trying to create a hive table using spark-sql.
Here is what I am trying to do:

hc = HiveContext(sc)

hdf = hc.parquetFile(output_path)

data_types = hdf.dtypes

schema = "(" + " ,".join(map(lambda x: x[0] + " " + x[1], data_types)) +")"

hc.sql(" CREATE TABLE IF NOT EXISTS example.foo " + schema)

There is already a database called "example" in hive.
But I see an error:

 An error occurred while calling o35.sql.

: org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution
Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask.
MetaException(message:java.lang.IllegalArgumentException:
java.net.URISyntaxException: Relative path in absolute URI:
hdfs://some_path/foo)

Also, I was wondering on how to use saveAsTable(..) construct
hdf.saveAsTable(tablename) tries to store into default db? How do I specify
the database name (example in this case) while trying to store this table?
Thanks
-- 
Mohit

"When you want success as badly as you want the air, then you will get it.
There is no other secret of success."
-Socrates


Re: How to turn on basic authentication for the Spark Web

2015-09-23 Thread Deenar Toraskar
Rafal
Check this out https://spark.apache.org/docs/latest/security.html

Regards
Deenar

On 23 September 2015 at 19:13, Rafal Grzymkowski  wrote:

> Hi,
>
> I want to enable basic Http authentication for the spark web UI (without
> recompilation need for Spark).
> I see there is 'spark.ui.filters' option but don't know how to use it.
> I found possibility to use kerberos param but it's not an option for me.
> What should I set there to use secret token based auth or user/passwd?
>
> Any hints?
>
> /MyCo
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Cosine LSH Join

2015-09-23 Thread Nick Pentreath
Not sure of performance but DIMSUM only handles "column similarity" and scales 
to maybe 100k columns.




Item-item similarity e.g. in MF models often requires millions of items (would 
be millions of columns in DIMSUM). So one needs an LSH type approach or brute 
force via Cartesian product (but can be blocked for efficiency see 
MatrixFactorizationModel  "recommendProductsForUser" method for example).



—
Sent from Mailbox

On Wed, Sep 23, 2015 at 8:35 PM, Charlie Hack 
wrote:

> This is great! Pretty sure I have a use for it involving entity resolution of 
> text records. 
> ​
> ​How does this compare to the DIMSUM similarity join implementation in MLlib 
> performance wise, out of curiosity?
> ​
> ​Thanks,
> ​
> ​Charlie 
> On Wednesday, Sep 23, 2015 at 09:25, Nick Pentreath 
> , wrote:
> Looks interesting - I've been trying out a few of the ANN / LSH packages on 
> spark-packages.org and elsewhere
> e.g. http://spark-packages.org/package/tdebatty/spark-knn-graphs and 
> https://github.com/marufaytekin/lsh-spark
> How does this compare? Perhaps you could put it up on spark-packages to get 
> visibility?
> On Wed, Sep 23, 2015 at 3:02 PM, Demir  wrote:
> We've just open sourced a LSH implementation on Spark. We're using this
> internally in order to find topK neighbors after a matrix factorization.
> We hope that this might be of use for others:
> https://github.com/soundcloud/cosine-lsh-join-spark
> For those wondering: lsh is a technique to quickly find most similar
> neighbors in a high dimensional space. This is a problem faced whenever
> objects are represented as vectors in a high dimensional space e.g. words,
> items, users...
> cheers
> özgür demir
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Cosine-LSH-Join-tp24785.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

Re: JdbcRDD Constructor

2015-09-23 Thread Deenar Toraskar
Satish

Can you post the SQL query you are using?

The SQL query must have 2 placeholders and both of them should be an
inclusive range (<= and >=)..

e.g. select title, author from books where ? <= id and id <= ?

Are you doing this?

Deenar

On 23 September 2015 at 20:18, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Satish
>
> Can you post the SQL query you are using?
>
> The SQL query must have 2 placeholders and both of them should be an
> inclusive range (<= and >=)..
>
> e.g. select title, author from books where ? <= id and id <= ?
>
> Are you doing this?
>
> Deenar
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 23 September 2015 at 13:47, satish chandra j 
> wrote:
>
>> HI,
>> Could anybody provide inputs if they have came across similar issue
>>
>> @Rishitesh
>> Could you provide if any sample code to use JdbcRDDSuite
>>
>>
>> Regards,
>> Satish Chandra
>>
>> On Wed, Sep 23, 2015 at 5:14 PM, Rishitesh Mishra <
>> rishi80.mis...@gmail.com> wrote:
>>
>>> I am using Spark 1.5. I always get count = 100, irrespective of num
>>> partitions.
>>>
>>> On Wed, Sep 23, 2015 at 5:00 PM, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
 HI,
 Currently using Spark 1.2.2, could you please let me know correct
 results output count which you got it by using JdbcRDDSuite

 Regards,
 Satish Chandra

 On Wed, Sep 23, 2015 at 4:02 PM, Rishitesh Mishra <
 rishi80.mis...@gmail.com> wrote:

> Which version of Spark you are using ??  I can get correct results
> using JdbcRDD. Infact there is a test suite precisely for this (
> JdbcRDDSuite) .
> I changed according to your input and got correct results from this
> test suite.
>
> On Wed, Sep 23, 2015 at 11:00 AM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI All,
>>
>> JdbcRDD constructor has following parameters,
>>
>> *JdbcRDD
>> *
>> (SparkContext
>> 
>>  sc,
>> scala.Function0 getConnection, String sql, *long 
>> lowerBound,
>> long upperBound, int numPartitions*,
>> scala.Function1> >
>>  mapRow,
>> scala.reflect.ClassTag> 
>> > evidence$1)
>>
>> where the below parameters *lowerBound* refers to Lower boundary of
>> entire data, *upperBound *refers to Upper boundary of entire data
>> and *numPartitions *refer to Number of partitions
>>
>> Source table to which JbdcRDD is fetching data from Oracle DB has
>> more than 500 records but its confusing when I tried several executions 
>> by
>> changing "numPartitions" parameter
>>
>> LowerBound,UpperBound,numPartitions: Output Count
>>
>> 0 ,100  ,1   : 100
>>
>> 0 ,100  ,2   : 151
>>
>> 0 ,100  ,3   : 201
>>
>>
>> Please help me in understanding the why Output count is 151 if
>> numPartitions is 2 and Output count is 201 if numPartitions is 3
>>
>> Regards,
>>
>> Satish Chandra
>>
>
>

>>>
>>
>


Re: WAL on S3

2015-09-23 Thread Michal Čizmazia
Thanks Steve!

FYI: S3 now supports GET-after-PUT consistency for new objects in all
regions, including US Standard



https://aws.amazon.com/about-aws/whats-new/2015/08/amazon-s3-introduces-new-usability-enhancements/





On 23 September 2015 at 13:12, Steve Loughran 
wrote:

>
> On 23 Sep 2015, at 14:56, Michal Čizmazia  wrote:
>
> To get around the fact that flush does not work in S3, my custom WAL
> implementation stores a separate S3 object per each WriteAheadLog.write
> call.
>
> Do you see any gotchas with this approach?
>
>
>
> nothing obvious.
>
> the blob is PUT in the close() call; once that operation has completed
> then its in S3. For any attempt to open that file to read will immediately
> succeed, now even in US-east if you set the right endpoint:
>
> https://forums.aws.amazon.com/ann.jspa?annID=3112
> http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#Regions
>
> If you can avoid listing operations or overwrites, you avoid the fun there.
>
> You do have to bear in mind that the duration of stream.close() is now
> O(bytes) and may fail -a lot of code assumes it is instant and always
> works...
>


Re: Cosine LSH Join

2015-09-23 Thread Charlie Hack
This is great! Pretty sure I have a use for it involving entity resolution of 
text records. 





​

​How does this compare to the DIMSUM similarity join implementation in MLlib 
performance wise, out of curiosity?

​

​Thanks,

​

​Charlie 














On Wednesday, Sep 23, 2015 at 09:25, Nick Pentreath , 
wrote:

Looks interesting - I've been trying out a few of the ANN / LSH packages on 
spark-packages.org and elsewhere


e.g. http://spark-packages.org/package/tdebatty/spark-knn-graphs and 
https://github.com/marufaytekin/lsh-spark





How does this compare? Perhaps you could put it up on spark-packages to get 
visibility?














On Wed, Sep 23, 2015 at 3:02 PM, Demir  wrote:
We've just open sourced a LSH implementation on Spark. We're using this

internally in order to find topK neighbors after a matrix factorization.


We hope that this might be of use for others:

https://github.com/soundcloud/cosine-lsh-join-spark


For those wondering: lsh is a technique to quickly find most similar

neighbors in a high dimensional space. This is a problem faced whenever

objects are represented as vectors in a high dimensional space e.g. words,

items, users...


cheers


özgür demir




--

View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cosine-LSH-Join-tp24785.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.


-

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org

Provide sampling ratio while loading json in spark version > 1.4.0

2015-09-23 Thread Udit Mehta
Hi,

In earlier versions of spark(< 1.4.0), we were able to specify the sampling
ratio while using *sqlContext.JsonFile* or *sqlContext.JsonRDD* so that we
dont inspect each and every element while inferring the schema.
I see that the use of these methods is deprecated in the newer spark
version and the suggested way is to use *read().json()* to load a json file
and return a dataframe. Is there a way to specify the sampling ratio using
these methods? Or am I doing something incorrect?

Thanks,
Udit


How to turn on basic authentication for the Spark Web

2015-09-23 Thread Rafal Grzymkowski
Hi,

I want to enable basic Http authentication for the spark web UI (without 
recompilation need for Spark).
I see there is 'spark.ui.filters' option but don't know how to use it.
I found possibility to use kerberos param but it's not an option for me.
What should I set there to use secret token based auth or user/passwd?

Any hints?

/MyCo


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dose spark auto invoke StreamingContext.stop while receive kill signal?

2015-09-23 Thread Tathagata Das
YEs, since 1.4.0, it shuts down streamingContext without gracefully from
shutdown hook.
You can make it shutdown gracefully in that hook by setting the SparkConf
"spark.streaming.stopGracefullyOnShutdown" to "true"

Note to self, document this in the programming guide.

On Wed, Sep 23, 2015 at 3:33 AM, Bin Wang  wrote:

> I'd like the spark application to be stopped gracefully while received
> kill signal, so I add these code:
>
> sys.ShutdownHookThread {
>   println("Gracefully stopping Spark Streaming Application")
>   ssc.stop(stopSparkContext = true, stopGracefully = true)
>   println("Application stopped")
> }
>
> But the application is not stopped gracefully:
>
> 15/09/23 17:44:38 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
> ...
> 15/09/23 17:44:38 INFO streaming.StreamingContext: Invoking
> stop(stopGracefully=false) from shutdown hook
>
> Dose spark auto invoke StreamingContext.stop for me?
>


Re: Why RDDs are being dropped by Executors?

2015-09-23 Thread Tathagata Das
There could multiple reasons for caching till 90% -
1. not enough aggregate space in cluster - increase cluster memory
2. ata is skewed among executor so one executor is try to cache too much
while others are idle - Repartition the data using RDD.repartition to force
even distribution.

The Storage tab in UI should give you more idea between 1 and 2.

TD



On Wed, Sep 23, 2015 at 6:36 AM, Uthayan Suthakar <
uthayan.sutha...@gmail.com> wrote:

> Thank you tathagata for your response. It make sense to use the 
> MEMORY_AND_DISK.
> But sometime when I start the job it does not cache everyting at the start.
> It only caches 90%. The LRU scheme will only take affect after a while when
> the data is not in use but why it failing to cache the data at the beging
> of the job? This issue only occurs occasinally. For your information the
> Spark version that I'm using is 1.3.0, is there a known issue with this
> version?
>
>
>
> On 23 September 2015 at 07:13, Tathagata Das  wrote:
>
>> If the RDD is not constantly in use, then the LRU scheme in each executor
>> can kick out some of the partitions from memory.
>> If you want to avoid recomputing in such cases, you could persist with
>> StorageLevel.MEMORY_AND_DISK, where the partitions will dropped to disk
>> when kicked from memory. That will avoid recomputing, as the partitions
>> will be read from disk -- better than recomputing in most cases.
>>
>> On Tue, Sep 22, 2015 at 4:20 AM, Uthayan Suthakar <
>> uthayan.sutha...@gmail.com> wrote:
>>
>>>
>>> Hello All,
>>>
>>> We have a Spark Streaming job that reads data from DB (three tables) and
>>> cache them into memory ONLY at the start then it will happily carry out the
>>> incremental calculation with the new data. What we've noticed occasionally
>>> is that one of the RDDs caches only 90% of the data. Therefore, at each
>>> execution time the remaining 10% had to be recomputed. This operation is
>>> very expensive as it will need to establish a connection to DB and
>>> transform the data.We are allocating more than enough memories for each
>>> executor ( 4 Executors and each has 2GB memory). Have you come across this
>>> issue? Do you know what may cause this issue?
>>>
>>> Another observation:
>>> I started the job yesterday it cached 100% of the RDD but looking at it
>>> today it show 90%. what happened to that 10% of data?
>>>
>>
>>
>


Re: How to turn on basic authentication for the Spark Web

2015-09-23 Thread Rafal Grzymkowski
I know this Spark Security page, but the information there is not sufficient.
Anyone make it works? Those basic servlets for ui.filters

Re: How to turn on basic authentication for the Spark Web

2015-09-23 Thread Deenar Toraskar
Check this out
http://lambda.fortytools.com/post/26977061125/servlet-filter-for-http-basic-auth
or https://gist.github.com/neolitec/8953607 for examples of filters
implementing basic authentication. Implement one of these and set them in
the spark.ui.filters property.

Deenar

On 23 September 2015 at 20:44, Rafal Grzymkowski  wrote:

> I know this Spark Security page, but the information there is not
> sufficient.
> Anyone make it works? Those basic servlets for ui.filters


Re: Why RDDs are being dropped by Executors?

2015-09-23 Thread Tathagata Das
If the RDD is not constantly in use, then the LRU scheme in each executor
can kick out some of the partitions from memory.
If you want to avoid recomputing in such cases, you could persist with
StorageLevel.MEMORY_AND_DISK, where the partitions will dropped to disk
when kicked from memory. That will avoid recomputing, as the partitions
will be read from disk -- better than recomputing in most cases.

On Tue, Sep 22, 2015 at 4:20 AM, Uthayan Suthakar <
uthayan.sutha...@gmail.com> wrote:

>
> Hello All,
>
> We have a Spark Streaming job that reads data from DB (three tables) and
> cache them into memory ONLY at the start then it will happily carry out the
> incremental calculation with the new data. What we've noticed occasionally
> is that one of the RDDs caches only 90% of the data. Therefore, at each
> execution time the remaining 10% had to be recomputed. This operation is
> very expensive as it will need to establish a connection to DB and
> transform the data.We are allocating more than enough memories for each
> executor ( 4 Executors and each has 2GB memory). Have you come across this
> issue? Do you know what may cause this issue?
>
> Another observation:
> I started the job yesterday it cached 100% of the RDD but looking at it
> today it show 90%. what happened to that 10% of data?
>


Re: How to get RDD from PairRDD<key,value> in Java

2015-09-23 Thread Andy Huang
use .values() which will return an RDD of just values

On Wed, Sep 23, 2015 at 4:24 PM, Zhang, Jingyu 
wrote:

> Hi All,
>
> I want to extract the "value" RDD from PairRDD in Java
>
> Please let me know how can  I get it easily.
>
> Thanks
>
> Jingyu
>
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.




-- 
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979


Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-23 Thread Petr Novak
You can implement your own case class supporting more then 22 fields. It is
something like:

class MyRecord(val val1: String, val val2: String, ... more then 22,
in this case f.e. 26)
  extends Product with Serializable {

  def canEqual(that: Any): Boolean = that.isInstanceOf[MyRecord]

  def productArity: Int = 26 // example value, it is amount of arguments

  def productElement(n: Int): Serializable = n match {
case  1 => val1
case  2 => val2
//... cases up to 26
  }
}

You can google it for more details.

Petr


Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-23 Thread Petr Novak
If you need to understand what is the magic Product then google up
Algebraic Data Types and learn it together with what is Sum type. One
option is http://www.stephanboyer.com/post/18/algebraic-data-types

Enjoy,
Petr

On Wed, Sep 23, 2015 at 9:07 AM, Petr Novak  wrote:

> I'm unsure if it completely equivalent to a case class and if it has some
> limitations compared to case class or if it needs some more methods
> implemented.
>
> Petr
>
> On Wed, Sep 23, 2015 at 9:04 AM, Petr Novak  wrote:
>
>> You can implement your own case class supporting more then 22 fields. It
>> is something like:
>>
>> class MyRecord(val val1: String, val val2: String, ... more then 22, in this 
>> case f.e. 26)
>>   extends Product with Serializable {
>>
>>   def canEqual(that: Any): Boolean = that.isInstanceOf[MyRecord]
>>
>>   def productArity: Int = 26 // example value, it is amount of arguments
>>
>>   def productElement(n: Int): Serializable = n match {
>> case  1 => val1
>> case  2 => val2
>> //... cases up to 26
>>   }
>> }
>>
>> You can google it for more details.
>>
>> Petr
>>
>
>


Is it possible to merged delayed batches in streaming?

2015-09-23 Thread Bin Wang
I'm using Spark Streaming and there maybe some delays between batches. I'd
like to know is it possible to merge delayed batches into one batch to do
processing?

For example, the interval is set to 5 min but the first batch uses 1 hour,
so there are many batches delayed. In the end of processing for each batch,
I'll save the data into database. So if all the delayed batches are merged
into a big one, it will save many resources. I'd like to know if it is
possible. Thanks.


How to subtract two RDDs with same size

2015-09-23 Thread Zhiliang Zhu
Hi All,
There are two RDDs :  RDD rdd1, and RDD rdd2,that 
is to say, rdd1 and rdd2 are similar with DataFrame, or Matrix with same row 
number and column number.
I would like to get RDD rdd3,  each element in rdd3 is the 
subtract between rdd1 and rdd2 of thesame position, which is similar Matrix 
subtract:rdd3 = rdd1 - rdd2 ...
It seemed very difficult to operate this kinds of matrix  arithmetic, even is 
about add, subtract, multiple , diff etc...
I shall  appreciate your help very much~~Zhiliang



Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-09-23 Thread tridib
Setting spark.sql.shuffle.partitions = 2000 solved my issue. I am able to
join 2 1 billion rows tables in 3 minutes.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750p24782.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: WAL on S3

2015-09-23 Thread Tathagata Das
Responses inline.


On Tue, Sep 22, 2015 at 8:35 PM, Michal Čizmazia  wrote:

> Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?
>
> Yes. Because checkpoints are single files by itself, and does not require
flush semantics to work. So S3 is fine.



> Trying to answer this question, I looked into
> Checkpoint.getCheckpointFiles [1]. It is doing findFirstIn which would
> probably be calling the S3 LIST operation. S3 LIST is prone to eventual
> consistency [2]. What would happen when getCheckpointFiles retrieves an
> incomplete list of files to Checkpoint.read [1]?
>
> There is a non-zero chance of that happening. But in that case it will
just use an older checkpoint file to recover the DAG of DStreams. That just
means that it will recover at an earlier point in time, and undergo more
computation.



> The pluggable WAL interface allows me to work around the eventual
> consistency of S3 by storing an index of filenames in DynamoDB. However it
> seems that something similar is required for checkpoints as well.
>

How are you getting around the fact that flush does not work in S3? So
until the current WAL file is closed, the file is not readable, even if you
know the index.
This should not need for checkpoint because of the reason I mentioned above
in this mail.


>
> I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is
> there something I can borrow from DirectKafkaInputDStream? After a DStream
> computes an RDD, is there a way for the DStream to tell when processing of
> that RDD has been finished and only after that delete the SQS messages.
>

You could borrow from Direct Kafka! For that to work, you should be able to
do the following.
1. Each message in SQS should have a unique identifier, using which you can
specify ranges of messages to read.

2.  You should be able to query from SQS the identifier of the latest
message, so that you can decide the range to read -- last read message to
latest message

3. There must be a way to find out identifier of the Nth record from the
current record. This is necessary for rate limiting -- if you want to read
at most 1000 message in each interval, and you have read till ID X, then
you should be able to find out (without reading the data), the ID of (X +
1000)th record. This is possible in Kafka, as offsets are continuous, but
not possible in Kinesis as sequence numbers are not continuous numbers.

I am not sure SQS satisfies these properties. If it satisfies 1 and 2, but
not 3, then you can consider looking at the Kinesis Receiver in Spark 1.5,
which still uses receivers, but keeps track of Kinesis sequence numbers in
the metadata WAL.


>
> I was also considering Amazon EFS, but it is only available in a single
> region for a preview. EBS could be an option, but it cannot be used across
> multiple Availability Zones.
>
> [1]:
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
> [2]:
> http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
>
>
>
> On 22 September 2015 at 21:09, Tathagata Das  wrote:
>
>> You can keep the checkpoints in the Hadoop-compatible file system and the
>> WAL somewhere else using your custom WAL implementation. Yes, cleaning up
>> the stuff gets complicated as it is not as easy as deleting off the
>> checkpoint directory - you will have to clean up checkpoint directory as
>> well as the whatever other storage that your custom WAL uses. However, if I
>> remember correctly, the WAL information is used only when the Dstreams are
>> recovered correctly from checkpoints.
>>
>> Note that, there are further details here that require deeper
>> understanding. There are actually two uses of WALs in the system -
>>
>> 1. Data WAL for received data  - This is what is usually referred to as
>> the WAL everywhere. Each receiver writes to a different WAL. This deals
>> with bulk data.
>> 2. Metadata WAL - This is used by the driver to save metadata information
>> like  block to data WAL segment mapping, etc. I usually skip mentioning
>> this. This WAL is automatically used when data WAL is enabled. And this
>> deals with small data.
>>
>> If you have to get around S3's limitations, you will have to plugin both
>> WALs (see this
>> 
>> for SparkConfs, but not that we havent made these confs public). While the
>> system supports plugging them in, we havent made this information public
>> yet because of such complexities in working with it.  And we have invested
>> time in making common sources like Kafka not require WALs (e.g. Direct
>> Kafka  approach). In future, we do hope to have a better solution for
>> general receivers + WALs + S3 (personally, I really wish S3's semantics
>> improve and fixes this issue).
>>
>> Another alternative direction may be Amazon EFS. Since it 

How to get RDD from PairRDD<key,value> in Java

2015-09-23 Thread Zhang, Jingyu
Hi All,

I want to extract the "value" RDD from PairRDD in Java

Please let me know how can  I get it easily.

Thanks

Jingyu

-- 
This message and its attachments may contain legally privileged or 
confidential information. It is intended solely for the named addressee. If 
you are not the addressee indicated in this message or responsible for 
delivery of the message to the addressee, you may not copy or deliver this 
message or its attachments to anyone. Rather, you should permanently delete 
this message and its attachments and kindly notify the sender by reply 
e-mail. Any content of this message and its attachments which does not 
relate to the official business of the sending company must be taken not to 
have been sent or endorsed by that company or any of its related entities. 
No warranty is made that the e-mail or attachments are free from computer 
virus or other defect.


Re: Spark as standalone or with Hadoop stack.

2015-09-23 Thread Sean Owen
Might be better for another list, but, I suspect it's more than HBase
is simply much more integrated with YARN, and because it's run with
other services that are as well.

On Wed, Sep 23, 2015 at 12:02 AM, Jacek Laskowski  wrote:
> That sentence caught my attention. Could you explain the reasons for
> not running HBase on Mesos, i.e. what makes Mesos inappropriate for
> HBase?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Yarn Shutting Down Spark Processing

2015-09-23 Thread Tathagata Das
Does your simple Spark batch jobs work in the same YARN setup? May be YARN
is not able to provide resources that you are asking for.

On Tue, Sep 22, 2015 at 5:49 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I have a Spark streaming job running on a cluster managed by Yarn.  The
> spark streaming job starts and receives data from Kafka.  It is processing
> well and then after several seconds I see the following error:
>
> 15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not
> initialize after waiting for 10 ms. Please check earlier log output for
> errors. Failing the application.
> 15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 13, (reason: Timed out waiting for SparkContext.)
>
> The spark process is then (obviously) shut down by Yarn.
>
> What do I need to change to allow Yarn to initialize Spark streaming (vs.
> batch) jobs?
>
> Thank you,
>
> Bryan Jeffrey
>


Topic Modelling- LDA

2015-09-23 Thread Subshiri S
Hi,
I am experimenting with Spark LDA.
How do I create Topic Model for Prediction in Spark ?
How do I evaluate the topics modelled in Spark ?

Could you point some examples.

Regards,
Subshiri


Re: Topic Modelling- LDA

2015-09-23 Thread Sameer Farooqui
Hi Subshri,

You may find these 2 blog posts useful:

https://databricks.com/blog/2015/03/25/topic-modeling-with-lda-mllib-meets-graphx.html

https://databricks.com/blog/2015/09/22/large-scale-topic-modeling-improvements-to-lda-on-spark.html

On Tue, Sep 22, 2015 at 11:54 PM, Subshiri S  wrote:

> Hi,
> I am experimenting with Spark LDA.
> How do I create Topic Model for Prediction in Spark ?
> How do I evaluate the topics modelled in Spark ?
>
> Could you point some examples.
>
> Regards,
> Subshiri
>
>


Re: How to subtract two RDDs with same size

2015-09-23 Thread Zhiliang Zhu
there is matrix add API, might map rdd2 each row element to be negative , then 
make rdd1 and rdd2 and call add ? 

Or some more ways ...  


 On Wednesday, September 23, 2015 3:11 PM, Zhiliang Zhu 
 wrote:
   

 Hi All,
There are two RDDs :  RDD rdd1, and RDD rdd2,that 
is to say, rdd1 and rdd2 are similar with DataFrame, or Matrix with same row 
number and column number.
I would like to get RDD rdd3,  each element in rdd3 is the 
subtract between rdd1 and rdd2 of thesame position, which is similar Matrix 
subtract:rdd3 = rdd1 - rdd2 ...
It seemed very difficult to operate this kinds of matrix  arithmetic, even is 
about add, subtract, multiple , diff etc...
I shall  appreciate your help very much~~Zhiliang



  

Re: Streaming Receiver Imbalance Problem

2015-09-23 Thread SLiZn Liu
The imbalance was caused by the stuck partition, after 10s of hours the
receiving rate went down. But the second ERR log I mentioned in the first
mail now occur at most of tasks(I did’t count, but keep flushing my
terminal) and jeopardize the job, as every batch takes 2 min(2-15 seconds
before) execution time and delays as much as 2+ hours.

The daunting ERROR log:

5/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414899 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414896 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414898 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414893 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414900 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414895 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414897 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414894 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
[Stage 4:> (0 + 3)
/ 3][Stage 29809:> (0 +
176) / 180]15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update
with state FINISHED for TID 1414899 because its task set is gone (this
is likely the result of receiving duplicate task finished status
updates)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414896 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414898 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414895 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414893 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414900 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414897 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state
FINISHED for TID 1414894 because its task set is gone (this is likely
the result of receiving duplicate task finished status updates)

the source code where the errors were thrown: statusUpdate(tid: Long,
state: TaskState, serializedData: ByteBuffer)
,
any suggestions that where should I dig in?

BR,
Todd Leo

On Wed, Sep 23, 2015 at 1:53 PM Tathagata Das t...@databricks.com
 wrote:

Also, you could switch to the Direct KAfka API which was first released as
> experimental in 1.3. In 1.5 we graduated it from experimental, but its
> quite usable in Spark 1.3.1
>
> TD
>
> On Tue, Sep 22, 2015 at 7:45 PM, SLiZn Liu  wrote:
>
>> Cool, we are still sticking with 1.3.1, will upgrade to 1.5 ASAP. Thanks
>> for the tips, Tathagata!
>>
>> On Wed, Sep 23, 2015 at 10:40 AM Tathagata Das 
>> wrote:
>>
>>> A lot of these imbalances were solved in spark 1.5. Could you give that
>>> a spin?
>>>
>>> https://issues.apache.org/jira/browse/SPARK-8882
>>>
>>> On Tue, Sep 22, 2015 at 12:17 AM, SLiZn Liu 
>>> wrote:
>>>
 Hi spark users,

 In our Spark Streaming app via Kafka integration on Mesos, we initialed
 3 

Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-23 Thread satish chandra j
HI Andy,
So I believe if I opt pro grammatically building the schema approach, than
it would not have have any restriction as such in "case Class not allowing
more than 22 Arguments"

As I need to define a schema of around 37 arguments

Regards,
Satish Chandra

On Wed, Sep 23, 2015 at 9:50 AM, Andy Huang 
wrote:

> Alternatively, I would suggest you looking at programmatically building
> the schema
>
> refer to
> http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
>
> Cheers
> Andy
>
> On Wed, Sep 23, 2015 at 2:07 PM, Ted Yu  wrote:
>
>> Can you switch to 2.11 ?
>>
>> The following has been fixed in 2.11:
>> https://issues.scala-lang.org/browse/SI-7296
>>
>> Otherwise consider packaging related values into a case class of their
>> own.
>>
>> On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI All,
>>> Do we have any alternative solutions in Scala to avoid limitation in
>>> defining a Case Class having more than 22 arguments
>>>
>>> We are using Scala version 2.10.2, currently I need to define a case
>>> class with 37 arguments but getting an error as "*error: Implementation
>>> restriction: case classes cannot have more than 22 parameters.*"
>>>
>>> It would be a great help if any inputs on the same
>>>
>>> Regards,
>>> Satish Chandra
>>>
>>>
>>>
>>
>
>
> --
> Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
> f: 02 9376 0730| m: 0433221979
>


How to control spark.sql.shuffle.partitions per query

2015-09-23 Thread tridib
I am having GC issue with default value of spark.sql.shuffle.partitions
(200). When I increase it to 2000, shuffle join works fine.

I want to use different values for spark.sql.shuffle.partitions depending on
data volume, for different queries which are fired from sane SparkSql
context.

Thanks
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-spark-sql-shuffle-partitions-per-query-tp24781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-23 Thread Andy Huang
Yes and I would recommend it because it can  be made generic and reusable
too


On Wed, Sep 23, 2015 at 5:37 PM, satish chandra j 
wrote:

> HI Andy,
> So I believe if I opt pro grammatically building the schema approach, than
> it would not have have any restriction as such in "case Class not allowing
> more than 22 Arguments"
>
> As I need to define a schema of around 37 arguments
>
> Regards,
> Satish Chandra
>
> On Wed, Sep 23, 2015 at 9:50 AM, Andy Huang 
> wrote:
>
>> Alternatively, I would suggest you looking at programmatically building
>> the schema
>>
>> refer to
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
>>
>> Cheers
>> Andy
>>
>> On Wed, Sep 23, 2015 at 2:07 PM, Ted Yu  wrote:
>>
>>> Can you switch to 2.11 ?
>>>
>>> The following has been fixed in 2.11:
>>> https://issues.scala-lang.org/browse/SI-7296
>>>
>>> Otherwise consider packaging related values into a case class of their
>>> own.
>>>
>>> On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
 HI All,
 Do we have any alternative solutions in Scala to avoid limitation in
 defining a Case Class having more than 22 arguments

 We are using Scala version 2.10.2, currently I need to define a case
 class with 37 arguments but getting an error as "*error:
 Implementation restriction: case classes cannot have more than 22
 parameters.*"

 It would be a great help if any inputs on the same

 Regards,
 Satish Chandra



>>>
>>
>>
>> --
>> Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
>> f: 02 9376 0730| m: 0433221979
>>
>
>


-- 
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979


Checkpoint files are saved before stream is saved to file (rdd.toDF().write ...)?

2015-09-23 Thread Petr Novak
Hi,
I have 2 streams and checkpointing with code based on documentation. One
stream is transforming data from Kafka and saves them to Parquet file. The
other stream uses the same stream and does updateStateByKey to compute some
aggregations. There is no gracefulShutdown.

Both use about this code to save files:

stream.foreachRDD { (rdd, time) =>
  ...
  rdd.toDF().write.save(...use time for the directory name...)
}

It is not idempotent at the moment but let's put this aside for now.

The strange thing is that when I Ctrl+C the job I can see checkpoint file
with timestamp for the last batch but there are no stream files/directories
for this timestamp or only one of streams have data saved with time aligned
with the last checkpoint file. I would expect that checkpoint file is
created after both streams successfully finishes its saves and that it is
created at the end of the batch. Otherwise I don't know for what
checkpointing is good for except maybe cutting lineage. Is file saving
asynchronous and Spark checkpointing does not care about it?

I actually need to checkpoint both streams atomically at the end of the
batch. It seems to me that Spark checkpoiting facility is quite unusable in
practice except for some simple scenarios and everybody has to actually
roll its own.

Am I wrong? How can I use Spark checkpoiting to checkpoint both streams
after they successfully save its results to files. It is actually the
reason while I think that micro-batch streaming is nice because it has
clearly defined synchronization barrier. But it doesn't seems like
checkpointing takes an advantage of it.

I can't ensure atomicity when saving more files for more streams and it
would require some further cleanup code on job restart. But at least I
would like to have a quarantee where existence of checkpoint file signals
that batch with that timestamp finished successfully with all its RDD
actions.

Or it is expected to behave like this and I have something wrong with my
code?

Many thanks for any insights,
Petr


Re: WAL on S3

2015-09-23 Thread Steve Loughran

On 23 Sep 2015, at 07:10, Tathagata Das 
> wrote:

Responses inline.


On Tue, Sep 22, 2015 at 8:35 PM, Michal Čizmazia 
> wrote:
Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?

Yes. Because checkpoints are single files by itself, and does not require flush 
semantics to work. So S3 is fine.


Trying to answer this question, I looked into Checkpoint.getCheckpointFiles 
[1]. It is doing findFirstIn which would probably be calling the S3 LIST 
operation. S3 LIST is prone to eventual consistency [2]. What would happen when 
getCheckpointFiles retrieves an incomplete list of files to Checkpoint.read [1]?

There is a non-zero chance of that happening. But in that case it will just use 
an older checkpoint file to recover the DAG of DStreams. That just means that 
it will recover at an earlier point in time, and undergo more computation.


Yes, it's the listings that are most prone to inconsistency. US-East is also 
the worst; the other sites all guarantee create consistency. (I think US-East 
does not for some endpoints)

BTW google and microsoft's object stores do offer consistency. OpenStack's 
swift is pretty bad.


The pluggable WAL interface allows me to work around the eventual consistency 
of S3 by storing an index of filenames in DynamoDB. However it seems that 
something similar is required for checkpoints as well.


Netflix's s3mper extension to s3 claims to offer a consistent view of the s3 
filesystem, moving the directory listings into dynamo, while leaving the data 
in s3.




Re: Is it possible to merged delayed batches in streaming?

2015-09-23 Thread Tathagata Das
Its not possible. And its actually fundamentally challenging to do so in
the general case because it becomes hard to reason about the processing
semantics - especially when there are per-batch aggregations.

On Wed, Sep 23, 2015 at 12:17 AM, Bin Wang  wrote:

> I'm using Spark Streaming and there maybe some delays between batches. I'd
> like to know is it possible to merge delayed batches into one batch to do
> processing?
>
> For example, the interval is set to 5 min but the first batch uses 1 hour,
> so there are many batches delayed. In the end of processing for each batch,
> I'll save the data into database. So if all the delayed batches are merged
> into a big one, it will save many resources. I'd like to know if it is
> possible. Thanks.
>


Re: spark + parquet + schema name and metadata

2015-09-23 Thread Borisa Zivkovic
Hi,

thanks a lot for this! I will try it out to see if this works ok.

I am planning to use "stable" metadata - so those will be same across all
parquet files inside directory hierarchy...



On Tue, 22 Sep 2015 at 18:54 Cheng Lian  wrote:

> Michael reminded me that although we don't support direct manipulation
> over Parquet metadata, you can still save/query metadata to/from Parquet
> via DataFrame per-column metadata. For example:
>
> import sqlContext.implicits._
> import org.apache.spark.sql.types.MetadataBuilder
>
> val path = "file:///tmp/parquet/meta"
>
> // Saving metadata
> val meta = new MetadataBuilder().putString("appVersion", "1.0.2").build()
> sqlContext.range(10).select($"id".as("id",
> meta)).coalesce(1).write.mode("overwrite").parquet(path)
>
> // Querying metadata
> sqlContext.read.parquet(path).schema("id").metadata.getString("appVersion")
>
> The metadata is saved together with Spark SQL schema as a JSON string. For
> example, the above code generates the following Parquet metadata (inspected
> with parquet-meta):
>
> file:
> file:/private/tmp/parquet/meta/part-r-0-77cb2237-e6a8-4cb6-a452-ae205ba7b660.gz.parquet
> creator: parquet-mr version 1.6.0
> extra:   org.apache.spark.sql.parquet.row.metadata =
> {"type":"struct","fields":[{"name":"id","type":"long","nullable":true,
> *"metadata":{"appVersion":"1.0.2"}*}]}
>
>
> Cheng
>
>
> On 9/22/15 9:37 AM, Cheng Lian wrote:
>
> I see, this makes sense. We should probably add this in Spark SQL.
>
> However, there's one corner case to note about user-defined Parquet
> metadata. When committing a write job, ParquetOutputCommitter writes
> Parquet summary files (_metadata and _common_metadata), and user-defined
> key-value metadata written in all Parquet part-files get merged here. The
> problem is that, if a single key is associated with multiple values,
> Parquet doesn't know how to reconcile this situation, and simply gives up
> writing summary files. This can be particular annoying for appending. In
> general, users should avoid storing "unstable" values like timestamps as
> Parquet metadata.
>
> Cheng
>
> On 9/22/15 1:58 AM, Borisa Zivkovic wrote:
>
> thanks for answer.
>
> I need this in order to be able to track schema metadata.
>
> basically when I create parquet files from Spark I want to be able to
> "tag" them in some way (giving the schema appropriate name or attaching
> some key/values) and then it is fairly easy to get basic metadata about
> parquet files when processing and discovering those later on.
>
> On Mon, 21 Sep 2015 at 18:17 Cheng Lian  wrote:
>
>> Currently Spark SQL doesn't support customizing schema name and
>> metadata. May I know why these two matters in your use case? Some
>> Parquet data models, like parquet-avro, do support it, while some others
>> don't (e.g. parquet-hive).
>>
>> Cheng
>>
>> On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
>> > Hi,
>> >
>> > I am trying to figure out how to write parquet metadata when
>> > persisting DataFrames to parquet using Spark (1.4.1)
>> >
>> > I could not find a way to change schema name (which seems to be
>> > hardcoded to root) and also how to add data to key/value metadata in
>> > parquet footer.
>> >
>> > org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
>> >
>> > org.apache.parquet.schema.Type#getName
>> >
>> > thanks
>> >
>> >
>>
>>
>
>


Custom Hadoop InputSplit, Spark partitions, spark executors/task and Yarn containers

2015-09-23 Thread Anfernee Xu
Hi Spark experts,

I'm coming across these terminologies and having some confusions, could you
please help me understand them better?

For instance I have implemented a Hadoop InputFormat to load my external
data in Spark, in turn my custom InputFormat will create a bunch of
InputSplit's, my questions is about

# Each InputSplit will exactly map to a Spark partition, is that correct?

# If I run on Yarn, how does Spark executor/task map to Yarn container?

# because I already have a bunch of InputSplits, do I still need to specify
the number of executors to get processing parallelized?

# How does -executor-memory map to the memory requirement in Yarn's
resource request?

-- 
--Anfernee


How to obtain the key in updateStateByKey

2015-09-23 Thread swetha
Hi,

How to obtain the current key in updateStateBykey ?

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-obtain-the-key-in-updateStateByKey-tp24792.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



LogisticRegression models consumes all driver memory

2015-09-23 Thread Eugene Zhulenev
We are running Apache Spark 1.5.0 (latest code from 1.5 branch)

We are running 2-3 LogisticRegression models in parallel (we'd love to run
10-20 actually), they are not really big at all, maybe 1-2 million rows in
each model.

Cluster itself, and all executors look good. Enough free memory and no
exceptions or errors.

However I see very strange behavior inside Spark driver. Allocated heap
constantly growing. It grows up to 30 gigs in 1.5 hours and then everything
becomes super sloow.

We don't do any collect, and I really don't understand who is consuming all
this memory. Looks like it's something inside LogisticRegression itself,
however I only see treeAggregate which should not require so much memory to
run.

Any ideas?

Plus I don't see any GC pause, looks like memory is still used by someone
inside driver.

[image: Inline image 2]
[image: Inline image 1]


Re: Yarn Shutting Down Spark Processing

2015-09-23 Thread Tathagata Das
CC;ing Hari who may have a better sense of whats going on.

-- Forwarded message --
From: Bryan 
Date: Wed, Sep 23, 2015 at 3:43 AM
Subject: RE: Yarn Shutting Down Spark Processing
To: Tathagata Das 
Cc: user 


Tathagata,

Simple batch jobs do work. The cluster has a good set of resources and a
limited input volume on the given Kafka topic.

The job works on the small 3-node standalone-configured cluster I have
setup for test.

Regards,

Bryan Jeffrey
--
From: Tathagata Das 
Sent: ‎9/‎23/‎2015 2:46 AM
To: Bryan Jeffrey 
Cc: user 
Subject: Re: Yarn Shutting Down Spark Processing

Does your simple Spark batch jobs work in the same YARN setup? May be YARN
is not able to provide resources that you are asking for.

On Tue, Sep 22, 2015 at 5:49 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I have a Spark streaming job running on a cluster managed by Yarn.  The
> spark streaming job starts and receives data from Kafka.  It is processing
> well and then after several seconds I see the following error:
>
> 15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not
> initialize after waiting for 10 ms. Please check earlier log output for
> errors. Failing the application.
> 15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 13, (reason: Timed out waiting for SparkContext.)
>
> The spark process is then (obviously) shut down by Yarn.
>
> What do I need to change to allow Yarn to initialize Spark streaming (vs.
> batch) jobs?
>
> Thank you,
>
> Bryan Jeffrey
>


Re: KafkaProducer using Cassandra as source

2015-09-23 Thread Todd Nist
Hi Kali,

If you do not mind sending JSON, you could do something like this, using
json4s:


val rows = p.collect() map ( row => TestTable(row.getString(0),
row.getString(1)) )

val json = parse(write(rows))

producer.send(new KeyedMessage[String, String]("trade", writePretty(json)))

// or for each individual entry
for( row <- rows) {
  producer.send(new KeyedMessage[String, String]("trade",
writePretty(parse(write(row)
}

Just make sure you import the following:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{ read, write, writePretty }


On Wed, Sep 23, 2015 at 12:26 PM, kali.tumm...@gmail.com <
kali.tumm...@gmail.com> wrote:

> Guys sorry I figured it out.
>
> val
>
> x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")
>
> Full Code:-
>
> package com.examples
>
> /**
>  * Created by kalit_000 on 22/09/2015.
>  */
>
> import kafka.producer.KeyedMessage
> import kafka.producer.Producer
> import kafka.producer.ProducerConfig
> import java.util.Properties
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark._
> import org.apache.spark.SparkContext._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.SparkConf
> import org.apache.log4j.Logger
> import org.apache.log4j.Level
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.{Seconds,StreamingContext}
> import org.apache.spark._
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> object SparkProducerDBCassandra {
>
>   case class TestTable (TRADE_ID:String,TRADE_PRICE: String)
>
>   def main(args: Array[String]): Unit =
>   {
> Logger.getLogger("org").setLevel(Level.WARN)
> Logger.getLogger("akka").setLevel(Level.WARN)
>
> val conf = new
>
> SparkConf().setMaster("local[2]").setAppName("testkali2").set("spark.cassandra.connection.host",
> "127.0.0.1")
> val sc=new SparkContext("local","test",conf)
> //val ssc= new StreamingContext(sc,Seconds(2))
>
> print("Test kali Spark Cassandra")
>
> val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)
>
> val p=cc.sql("select * from people.person")
>
> p.collect().foreach(println)
>
> val props:Properties = new Properties()
> props.put("metadata.broker.list", "localhost:9092")
> props.put("serializer.class", "kafka.serializer.StringEncoder")
>
> val config= new ProducerConfig(props)
> val producer= new Producer[String,String](config)
>
> val
>
> x=p.collect().mkString("\n").replace("[","").replace("]","").replace(",","~")
>
>producer.send(new KeyedMessage[String, String]("trade", x))
>
> //p.collect().foreach(print)
>
> //ssc.start()
>
> //ssc.awaitTermination()
>
>   }
> }
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/KafkaProducer-using-Cassandra-as-source-tp24774p24788.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Join over many small files

2015-09-23 Thread Tracewski, Lukasz
Hi all,

I would like you to ask for an advise on how to efficiently make a join 
operation in Spark with tens of thousands of tiny files. A single file has a 
few KB and ~50 rows. In another scenario they might have 200 KB and 2000 rows.

To give you impression how they look like:

File 01
ID | VALUE
01 | 10
02 | 12
03 | 55
...

File 02
ID | VALUE
01 | 33
02 | 21
03 | 53
...

and so on... ID is unique in a file, but repeats in every file. There is also a 
Special file which has the same form:

File Special
ID | VALUE
01 | 21
02 | 23
03 | 54
...

What I would like to get is a join of File 01..1 with File Special to get a 
difference between values:

File Result 01 = File Special - File 01
ID | VALUE
01 | 21-10
02 | 23-12
03 | 54-53
...

And save result to a csv, meaning 1 new files. What's the best way of doing 
this?

My idea was the following:

1.   Read all Files with wholeTextFiles, each to a separate partition

2.   Perform map-side join with broadcast variable inside mapPartitions 
(the "Special" file will be broadcasted).

I am on Spark 1.3, but it can be upgraded if needed. Perhaps this could be done 
better in a dataframe? Then I would create one large dataframe, with additional 
"filename" key, i.e.:
File | ID | Value
01 | 01 | 10
01 | 02 | 12
01 | 03 | 55
02 | 01 | 21
02 | 02 | 23
...

What would be then a way to make an efficient query over such dataframe?

Any advice will be appreciated.

Best regards,
Lucas


=== 
Please access the attached hyperlink for an important electronic communications 
disclaimer: 
http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html 
=== 


Re: Spark as standalone or with Hadoop stack.

2015-09-23 Thread Ted Yu
HDFS on Mesos framework is still being developed.
What I said previously reflected current deployment practice.

Things may change in the future.

On Tue, Sep 22, 2015 at 4:02 PM, Jacek Laskowski  wrote:

> On Tue, Sep 22, 2015 at 10:03 PM, Ted Yu  wrote:
>
> > To my knowledge, no one runs HBase on top of Mesos.
>
> Hi,
>
> That sentence caught my attention. Could you explain the reasons for
> not running HBase on Mesos, i.e. what makes Mesos inappropriate for
> HBase?
>
> Jacek
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Yarn Shutting Down Spark Processing

2015-09-23 Thread Bryan
Also - I double checked - we're setting the master to "yarn-cluster"

-Original Message-
From: "Tathagata Das" 
Sent: ‎9/‎23/‎2015 2:38 PM
To: "Bryan" 
Cc: "user" ; "Hari Shreedharan" 

Subject: Re: Yarn Shutting Down Spark Processing

CC;ing Hari who may have a better sense of whats going on.


-- Forwarded message --
From: Bryan 
Date: Wed, Sep 23, 2015 at 3:43 AM
Subject: RE: Yarn Shutting Down Spark Processing
To: Tathagata Das 
Cc: user 



Tathagata,

Simple batch jobs do work. The cluster has a good set of resources and a 
limited input volume on the given Kafka topic.

The job works on the small 3-node standalone-configured cluster I have setup 
for test.

Regards,

Bryan Jeffrey


From: Tathagata Das
Sent: ‎9/‎23/‎2015 2:46 AM
To: Bryan Jeffrey
Cc: user
Subject: Re: Yarn Shutting Down Spark Processing


Does your simple Spark batch jobs work in the same YARN setup? May be YARN is 
not able to provide resources that you are asking for. 


On Tue, Sep 22, 2015 at 5:49 PM, Bryan Jeffrey  wrote:

Hello.


I have a Spark streaming job running on a cluster managed by Yarn.  The spark 
streaming job starts and receives data from Kafka.  It is processing well and 
then after several seconds I see the following error:


15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not initialize 
after waiting for 10 ms. Please check earlier log output for errors. 
Failing the application.
15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED, 
exitCode: 13, (reason: Timed out waiting for SparkContext.)


The spark process is then (obviously) shut down by Yarn. 


What do I need to change to allow Yarn to initialize Spark streaming (vs. 
batch) jobs?

Thank you,


Bryan Jeffrey

Re: Yarn Shutting Down Spark Processing

2015-09-23 Thread Marcelo Vanzin
Did you look at your application's logs (using the "yarn logs" command?).

That error means your application is failing to create a SparkContext.
So either you have a bug in your code, or there will be some error in
the log pointing at the actual reason for the failure.

On Tue, Sep 22, 2015 at 5:49 PM, Bryan Jeffrey  wrote:
> Hello.
>
> I have a Spark streaming job running on a cluster managed by Yarn.  The
> spark streaming job starts and receives data from Kafka.  It is processing
> well and then after several seconds I see the following error:
>
> 15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not
> initialize after waiting for 10 ms. Please check earlier log output for
> errors. Failing the application.
> 15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 13, (reason: Timed out waiting for SparkContext.)
>
> The spark process is then (obviously) shut down by Yarn.
>
> What do I need to change to allow Yarn to initialize Spark streaming (vs.
> batch) jobs?
>
> Thank you,
>
> Bryan Jeffrey



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Yarn Shutting Down Spark Processing

2015-09-23 Thread Bryan
Marcelo,

The error below is from the application logs. The spark streaming context is 
initialized and actively processing data when yarn claims that the context is 
not initialized.

There are a number of errors, but they're all associated with the ssc shutting 
down.

Regards,

Bryan Jeffrey

-Original Message-
From: "Marcelo Vanzin" 
Sent: ‎9/‎23/‎2015 5:55 PM
To: "Bryan Jeffrey" 
Cc: "user" 
Subject: Re: Yarn Shutting Down Spark Processing

Did you look at your application's logs (using the "yarn logs" command?).

That error means your application is failing to create a SparkContext.
So either you have a bug in your code, or there will be some error in
the log pointing at the actual reason for the failure.

On Tue, Sep 22, 2015 at 5:49 PM, Bryan Jeffrey  wrote:
> Hello.
>
> I have a Spark streaming job running on a cluster managed by Yarn.  The
> spark streaming job starts and receives data from Kafka.  It is processing
> well and then after several seconds I see the following error:
>
> 15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not
> initialize after waiting for 10 ms. Please check earlier log output for
> errors. Failing the application.
> 15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 13, (reason: Timed out waiting for SparkContext.)
>
> The spark process is then (obviously) shut down by Yarn.
>
> What do I need to change to allow Yarn to initialize Spark streaming (vs.
> batch) jobs?
>
> Thank you,
>
> Bryan Jeffrey



-- 
Marcelo


Re: Java Heap Space Error

2015-09-23 Thread Yusuf Can Gürkan
Yes, it’s possible. I use S3 as data source. My external tables has 
partitioned. Belowed task is 193/200. Job has 2 stages and its 193. task of 200 
in 2.stage because of sql.shuffle.partitions. 

How can i avoid this situation, this is my query:

select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname is not 
NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
 ') inputlist from landing where dt='2015-9' and userid != '' and userid is 
not null and userid is not NULL and pagetype = 'productDetail' group by userid

> On 23 Sep 2015, at 23:55, java8964  wrote:
> 
> Based on your description, you job shouldn't have any shuffle then, as you 
> just apply regex and concatenation on the column, but there is one partition 
> having 4.3M records to be read, vs less than 1M records for other partitions.
> 
> Is that possible? It depends on what is the source of your data.
> 
> If there is shuffle in your query (More than 2 stages generated by your 
> query, and this is my guess of what happening), then it simple means that one 
> partition having way more data than the rest of partitions.
> 
> Yong
> 
> From: yu...@useinsider.com
> Subject: Java Heap Space Error
> Date: Wed, 23 Sep 2015 23:07:17 +0300
> To: user@spark.apache.org
> 
> What can cause this issue in the attached picture? I’m running and sql query 
> which runs a regex on strings and concatenates them. Because of this task, my 
> job gives java heap space error.
> 
> 



Re: Join over many small files

2015-09-23 Thread ayan guha
I think this can be a good case for using sequence file format to pack many
files to few sequence files with file name as key andd content as value.
Then read it as RDD and produce tuples like you mentioned (key=fileno+id,
value=value). After that, it is a simple map operation to generate the diff
(broadcasting special file is right idea).

On Thu, Sep 24, 2015 at 7:31 AM, Tracewski, Lukasz <
lukasz.tracew...@credit-suisse.com> wrote:

> Hi all,
>
>
>
> I would like you to ask for an advise on how to efficiently make a join
> operation in Spark with tens of thousands of tiny files. A single file has
> a few KB and ~50 rows. In another scenario they might have 200 KB and 2000
> rows.
>
>
>
> To give you impression how they look like:
>
>
>
> File 01
>
> ID | VALUE
>
> 01 | 10
>
> 02 | 12
>
> 03 | 55
>
> …
>
>
>
> File 02
>
> ID | VALUE
>
> 01 | 33
>
> 02 | 21
>
> 03 | 53
>
> …
>
>
>
> and so on… ID is unique in a file, but repeats in every file. There is
> also a Special file which has the same form:
>
>
>
> File Special
>
> ID | VALUE
>
> 01 | 21
>
> 02 | 23
>
> 03 | 54
>
> …
>
>
>
> What I would like to get is a join of File 01..1 with File Special to
> get a difference between values:
>
>
>
> File Result 01 = File Special – File 01
>
> ID | VALUE
>
> 01 | 21-10
>
> 02 | 23-12
>
> 03 | 54-53
>
> …
>
>
>
> And save result to a csv, meaning 1 new files. What’s the best way of
> doing this?
>
>
>
> My idea was the following:
>
> 1.   Read all Files with wholeTextFiles, each to a separate partition
>
> 2.   Perform map-side join with broadcast variable inside
> mapPartitions (the “Special” file will be broadcasted).
>
>
>
> I am on Spark 1.3, but it can be upgraded if needed. Perhaps this could be
> done better in a dataframe? Then I would create one large dataframe, with
> additional “filename” key, i.e.:
>
> File | ID | Value
>
> 01 | 01 | 10
>
> 01 | 02 | 12
>
> 01 | 03 | 55
>
> 02 | 01 | 21
>
> 02 | 02 | 23
>
> …
>
>
>
> What would be then a way to make an efficient query over such dataframe?
>
>
>
> Any advice will be appreciated.
>
>
>
> Best regards,
>
> Lucas
>
>
>
> ==
> Please access the attached hyperlink for an important electronic
> communications disclaimer:
> http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
>
> ==
>



-- 
Best Regards,
Ayan Guha


Re: Yarn Shutting Down Spark Processing

2015-09-23 Thread Marcelo Vanzin
But that's not the complete application log. You say the streaming
context is initialized, but can you show that in the logs? There's
something happening that is causing the SparkContext to not be
registered with the YARN backend, and that's why your application is
being killed.

If you can share the complete log or the code, that would clarify things.

On Wed, Sep 23, 2015 at 3:20 PM, Bryan  wrote:
> The error below is from the application logs. The spark streaming context is
> initialized and actively processing data when yarn claims that the context
> is not initialized.


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using Spark for portfolio manager app

2015-09-23 Thread ALEX K
Thuy,

if you decide to go with Hbase for external storage consider using a
light-weight SQL layer such as Apache Phoenix, it has a spark plugin
 & JDBC driver, and
throughput is pretty good even for heavy market data feed (make sure to use
batched commits).

In our case we send Kafka streams directly into Hbase via Phoenix JDBC
upserts , and
Spark dataframes are mapped to Phoenix tables for downstream analytics.

Alternatively you can use Cassandra

for the backend, but phoenix saves  you a lot of coding, and a lot of
optimizations for joins & aggregations are already done for you (it plugs
into Hbase coprocessors).

Alex

On Tue, Sep 22, 2015 at 12:12 PM, Thúy Hằng Lê 
wrote:

> That's great answer Andrian.
> I find a lots of information here. I have direction for application now, i
> will try your suggestion :)
>
> Vào Thứ Ba, ngày 22 tháng 9 năm 2015, Adrian Tanase 
> đã viết:
>
>
>>1. reading from kafka has exactly once guarantees - we are using it
>>in production today (with the direct receiver)
>>1. ​you will probably have 2 topics, loading both into spark and
>>   joining / unioning as needed is not an issue
>>   2. tons of optimizations you can do there, assuming everything
>>   else works
>>2. ​for ad-hoc query I would say you absolutely need to look at
>>external storage
>>1. ​querying the Dstream or spark's RDD's directly should be done
>>   mostly for aggregates/metrics, not by users
>>   2. if you look at HBase or Cassandra for storage then 50k
>>   writes /sec are not a problem at all, especially combined with a smart
>>   client that does batch puts (like async hbase
>>   )
>>   3. you could also consider writing the updates to another kafka
>>   topic and have  a different component that updates the DB, if you 
>> think of
>>   other optimisations there
>>3. ​by stats I assume you mean metrics (operational or business)
>>1. ​there are multiple ways to do this, however I would not encourage
>>   you to query spark directly, especially if you need an archive/history 
>> of
>>   your datapoints
>>   2. we are using OpenTSDB (we already have a HBase cluster) +
>>   Grafana for dashboarding
>>   3. collecting the metrics is a bit hairy in a streaming app - we
>>   have experimented with both accumulators and RDDs specific for metrics 
>> -
>>   chose the RDDs that write to OpenTSDB using foreachRdd
>>
>> ​-adrian
>>
>> --
>> *From:* Thúy Hằng Lê 
>> *Sent:* Sunday, September 20, 2015 7:26 AM
>> *To:* Jörn Franke
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Using Spark for portfolio manager app
>>
>> Thanks Adrian and Jorn for the answers.
>>
>> Yes, you're right there are lot of things I need to consider if I want to
>> use Spark for my app.
>>
>> I still have few concerns/questions from your information:
>>
>> 1/ I need to combine trading stream with tick stream, I am planning to
>> use Kafka for that
>> If I am using approach #2 (Direct Approach) in this tutorial
>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
>> 
>> Spark Streaming + Kafka Integration Guide - Spark 1.4.1 ...
>> Spark Streaming + Kafka Integration Guide. Apache Kafka is
>> publish-subscribe messaging rethought as a distributed, partitioned,
>> replicated commit log service.
>> Read more...
>> 
>>
>> Will I receive exactly one semantics? Or I have to add some logic in my
>> code to archive that.
>> As your suggestion of using delta update, exactly one semantic is
>> required for this application.
>>
>> 2/ For ad-hoc query, I must output of Spark to external storage and query
>> on that right?
>> Is there any way to do ah-hoc query on Spark? my application could have
>> 50k updates per second at pick time.
>> Persistent to external storage lead to high latency in my app.
>>
>> 3/ How to get real-time statistics from Spark,
>> In  most of the Spark streaming examples, the statistics are echo to the
>> stdout.
>> However, I want to display those statics on GUI, is there any way to
>> retrieve data from Spark directly without using external Storage?
>>
>>
>> 2015-09-19 16:23 GMT+07:00 Jörn Franke :
>>
>>> If you want to be able to let your users query their portfolio then you
>>> may want to think about storing the current state of the portfolios in
>>> hbase/phoenix or alternatively a cluster of relationaldatabases can make
>>> sense. For the rest you may use Spark.
>>>
>>> Le sam. 19 sept. 2015 à 4:43, 

Re: Creating BlockMatrix with java API

2015-09-23 Thread Pulasthi Supun Wickramasinghe
Hi YiZhi,

Actually i was not able to try it out to see if it was working. I sent the
previous reply assuming that Sabarish's solution would work :). Sorry if
there was any confusion.

Best Regards,
Pulasthi

On Wed, Sep 23, 2015 at 6:47 AM, YiZhi Liu  wrote:

> Hi Pulasthi,
>
> Are you sure this worked? When I applied rdd.rdd() to the constructor
> of BlockMatrix, the complier complained
>
> [error]
> spark/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVD.java:38:
> error: incompatible types: RDD,Matrix>>
> cannot be converted to RDD,Matrix>>
> [error] BlockMatrix blockMatrix = new BlockMatrix(rdd.rdd(), 2, 2);
>
> It must caused by the type elimination from scala to java. To make it
> work, we have to define 'rdd' as JavaRDD Object>, Matrix>>
>
> As Yanbo has mentioned, I think a Java friendly constructor is still in
> demand.
>
> 2015-09-23 13:14 GMT+08:00 Pulasthi Supun Wickramasinghe
> :
> > Hi Sabarish
> >
> > Thanks, that would indeed solve my problem
> >
> > Best Regards,
> > Pulasthi
> >
> > On Wed, Sep 23, 2015 at 12:55 AM, Sabarish Sasidharan
> >  wrote:
> >>
> >> Hi Pulasthi
> >>
> >> You can always use JavaRDD.rdd() to get the scala rdd. So in your case,
> >>
> >> new BlockMatrix(rdd.rdd(), 2, 2)
> >>
> >> should work.
> >>
> >> Regards
> >> Sab
> >>
> >> On Tue, Sep 22, 2015 at 10:50 PM, Pulasthi Supun Wickramasinghe
> >>  wrote:
> >>>
> >>> Hi Yanbo,
> >>>
> >>> Thanks for the reply. I thought i might be missing something. Anyway i
> >>> moved to using scala since it is the complete API.
> >>>
> >>> Best Regards,
> >>> Pulasthi
> >>>
> >>> On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang 
> wrote:
> 
>  This is due to the distributed matrices like
>  BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do not
> provide Java
>  friendly constructors. I have file a SPARK-10757 to track this issue.
> 
>  2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe
>  :
> >
> > Hi All,
> >
> > I am new to Spark and i am trying to do some BlockMatrix operations
> > with the Mllib API's. But i can't seem to create a BlockMatrix with
> the java
> > API. I tried the following
> >
> > Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
> > List,Matrix>> list = new
> > ArrayList, Matrix>>();
> > Tuple2 intTuple = new Tuple2(0,0);
> > Tuple2,Matrix> tuple2MatrixTuple2 = new
> > Tuple2, Matrix>(intTuple,matrixa );
> > list.add(tuple2MatrixTuple2);
> > JavaRDD, Matrix>> rdd =
> > sc.parallelize(list);
> >
> > BlockMatrix blockMatrix = new BlockMatrix(rdd,2,2);
> >
> >
> > but since BlockMatrix only takes
> >
> "RDD,Matrix>>"
> > this code does not work. sc.parallelize() returns a JavaRDD so the
> two are
> > not compatible. I also couldn't find any code samples for this. Any
> help on
> > this would be highly appreciated.
> >
> > Best Regards,
> > Pulasthi
> > --
> > Pulasthi S. Wickramasinghe
> > Graduate Student  | Research Assistant
> > School of Informatics and Computing | Digital Science Center
> > Indiana University, Bloomington
> > cell: 224-386-9035
> 
> 
> >>>
> >>>
> >>>
> >>> --
> >>> Pulasthi S. Wickramasinghe
> >>> Graduate Student  | Research Assistant
> >>> School of Informatics and Computing | Digital Science Center
> >>> Indiana University, Bloomington
> >>> cell: 224-386-9035
> >>
> >>
> >>
> >>
> >> --
> >>
> >> Architect - Big Data
> >> Ph: +91 99805 99458
> >>
> >> Manthan Systems | Company of the year - Analytics (2014 Frost and
> Sullivan
> >> India ICT)
> >> +++
> >
> >
> >
> >
> > --
> > Pulasthi S. Wickramasinghe
> > Graduate Student  | Research Assistant
> > School of Informatics and Computing | Digital Science Center
> > Indiana University, Bloomington
> > cell: 224-386-9035
>
>
>
> --
> 刘忆智
> 广告质量部
>
> MV AD 聚效广告   上海 · 北京 · 广州 · 杭州
> __
> 上海市闸北区天目中路585号新梅大厦4楼  200070
> MOB:15021072706
> TEL:021-52559088
> FAX:021-52559089
> EMAIL:li...@mvad.com
> HTTP:www.mvad.com
> ---CONFIDENTIAL --
> 本邮件载有秘密信息,请您恪守保密义务,勿向第三人透露。谢谢合作。
> This email communication is confidential. Recipient(s) named above
> is(are) obligated to maintain secrecy and is(are) not permitted to
> disclose the contents of this communication to others. Thank you.
>
>


-- 
Pulasthi S. Wickramasinghe
Graduate Student  | Research Assistant
School of Informatics 

reduceByKeyAndWindow confusion

2015-09-23 Thread srungarapu vamsi
I create  a stream from kafka as belows"

val kafkaDStream  =
KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,
kafkaConf, Set(topics))

.window(Minutes(WINDOW_DURATION),Minutes(SLIDER_DURATION))

I have a map ("intToStringList") which is a Map[Int,List[String]]
using this map i am filtering the stream and finally converting it into
Map[Int,DStream[KafkaGenericEvent]]]

1.
Now on this map, for each and every value (which is a
DStream[KafkaGenericEvent])
i am applying reduceByKeyAndWindow operation.
But since we have to give window duration and slider duration even in
reduceByKeyAndWindow, does that imply that on every window of the given
DStream, reduceByKeyAndWindow can be applied with a different window
duration and slider duration ?
i.e Lets say window DStream is created with window duration-> 16 minutes,
slider duration -> 1 Minute, so  i have one RDD for every window
For reduceByKeyAndWindow, if we have window duration as as 4 minutes and
slider duration as 1 minute, then will i get 4 RDDs since the
windowDStream_batchDuration / reduceByKeyAndwindow_batchDuration is 4 ?

2.
As suggested in spark doc, i am trying to give checkpointing interval on
the kafkaDStream created in the block shown above in the following way:
kafkaDStream.checkpoint(Minutes(4))

But when i execute this, i get the error:
"WindowedDStream has been marked for checkpointing but the storage level
has not been set to enable persisting. Please use DStream.persist() to set
the storage level to use memory for better checkpointing performance"
But when i went through the implementation of checkpoint function  of
DStream.scala, i see a call to persist() function.
Then do i really have to persist function in the WindowedDStream ?
Just to give a shot i made a call to persist method on the windowedDStream
and then made a call to checkpoint(interval) . Even then i am facing the
above mentioned error.
How do i solve this ?
-- 
/Vamsi


Re: Spark 1.5.0 on YARN dynamicAllocation - Initial job has not accepted any resources

2015-09-23 Thread Jonathan Kelly
AHA! I figured it out, but it required some tedious remote debugging of the
Spark ApplicationMaster. (But now I understand the Spark codebase a little
better than before, so I guess I'm not too put out. =P)

Here's what's happening...

I am setting spark.dynamicAllocation.minExecutors=1 but am not setting
spark.dynamicAllocation.initialExecutors, so it's remaining at the default
of spark.dynamicAllocation.minExecutors. However, ExecutorAllocationManager
doesn't actually request any executors while the application is still
initializing (see comment here
),
but it still sets numExecutorsTarget to
spark.dynamicAllocation.initialExecutors (i.e., 1).

The JavaWordCount example I've been trying to run is only operating on a
very small file, so its first stage only has a single task and thus should
request a single executor once the polling loop comes along.

Then on this line
,
it returns numExecutorsTarget (1) - oldNumExecutorsTarget (still 1, even
though there aren't any executors running yet) = 0, for the number of
executors it should request. Then the app hangs forever because it never
requests any executors.

I verified this further by setting spark.dynamicAllocation.minExecutors=100
and trying to run my SparkPi example I mentioned earlier (which runs 100
tasks in its first stage because that's the number I'm passing to the
driver). Then it would hang in the same way as my JavaWordCount example. If
I run it again, passing 101 (so that it has 101 tasks), it works, and if I
pass 99, it hangs again.

So it seems that I have found a bug in that if you set
spark.dynamicAllocation.minExecutors (or, presumably,
spark.dynamicAllocation.initialExecutors), and the number of tasks in your
first stage is less than or equal to this min/init number of executors, it
won't actually request any executors and will just hang indefinitely.

I can't seem to find a JIRA for this, so shall I file one, or has anybody
else seen anything like this?

~ Jonathan

On Wed, Sep 23, 2015 at 7:08 PM, Jonathan Kelly 
wrote:

> Another update that doesn't make much sense:
>
> The SparkPi example does work on yarn-cluster mode with dynamicAllocation.
>
> That is, the following command works (as well as with yarn-client mode):
>
> spark-submit --deploy-mode cluster --class
> org.apache.spark.examples.SparkPi spark-examples.jar 100
>
> But the following one does not work (nor does it work for yarn-client
> mode):
>
> spark-submit --deploy-mode cluster --class
> org.apache.spark.examples.JavaWordCount spark-examples.jar
> /tmp/word-count-input.txt
>
> So this JavaWordCount example hangs on requesting executors, while SparkPi
> and spark-shell do work.
>
> ~ Jonathan
>
> On Wed, Sep 23, 2015 at 6:22 PM, Jonathan Kelly 
> wrote:
>
>> Thanks for the quick response!
>>
>> spark-shell is indeed using yarn-client. I forgot to mention that I also
>> have "spark.master yarn-client" in my spark-defaults.conf file too.
>>
>> The working spark-shell and my non-working example application both
>> display spark.scheduler.mode=FIFO on the Spark UI. Is that what you are
>> asking about? I haven't actually messed around with different scheduler
>> modes yet.
>>
>> One more thing I should mention is that the YARN ResourceManager tells me
>> the following on my 5-node cluster, with one node being the master and not
>> running a NodeManager:
>> Memory Used: 1.50 GB (this is the running ApplicationMaster that's
>> waiting and waiting for the executors to start up)
>> Memory Total: 45 GB (11.25 from each of the 4 slave nodes)
>> VCores Used: 1
>> VCores Total: 32
>> Active Nodes: 4
>>
>> ~ Jonathan
>>
>> On Wed, Sep 23, 2015 at 6:10 PM, Andrew Duffy 
>> wrote:
>>
>>> What pool is the spark shell being put into? (You can see this through
>>> the YARN UI under scheduler)
>>>
>>> Are you certain you're starting spark-shell up on YARN? By default it
>>> uses a local spark executor, so if it "just works" then it's because it's
>>> not using dynamic allocation.
>>>
>>>
>>> On Wed, Sep 23, 2015 at 18:04 Jonathan Kelly 
>>> wrote:
>>>
 I'm running into a problem with YARN dynamicAllocation on Spark 1.5.0
 after using it successfully on an identically configured cluster with Spark
 1.4.1.

 I'm getting the dreaded warning "YarnClusterScheduler: Initial job has
 not accepted any resources; check your cluster UI to ensure that workers
 are registered and have sufficient resources", though there's nothing else
 running on my cluster, and the nodes should have plenty of resources to run
 my application.

 Here are the applicable properties in spark-defaults.conf:
 spark.dynamicAllocation.enabled  true

Re: SparkR for accumulo

2015-09-23 Thread madhvi.gupta

Ohk.Thanks

Thanks and Regards
Madhvi Gupta

On Thursday 24 September 2015 08:12 AM, Sun, Rui wrote:

No.

It is possible you create a helper function which can creat accumulo data RDDs in 
Scala or Java (maybe put such code in a JAR, add using --jar   on the 
command line to start SparkR to use it ?) and in SparkR you can use the private 
functions like callJMethod to call it and the created RDD objects can be referenced 
on R side.

However, there is a critical step missing in SparkR now, which is the support 
of conversion from a source RDD (other than text file RDD) to RRDD. If you 
can't convert a source RDD from JVM to RRDD, you can't further use SparkR RDD 
API to apply transformations on it.

-Original Message-
From: madhvi.gupta [mailto:madhvi.gu...@orkash.com]
Sent: Wednesday, September 23, 2015 11:42 AM
To: Sun, Rui; user
Subject: Re: SparkR for accumulo

Hi Rui,

Cant we use the accumulo data RDD created from JAVA in spark, in sparkR?

Thanks and Regards
Madhvi Gupta

On Tuesday 22 September 2015 04:42 PM, Sun, Rui wrote:

I am afraid that there is no support for accumulo in SparkR now, because:

1. It seems that there is no data source support for accumulo, so we
can't create SparkR dataframe on accumulo 2. It is possible to create RDD from 
accumulo via AccumuloInputFormat in Scala. But unfortunately, SparkR does not 
support creating RDD from Hadoop files other than text file.

-Original Message-
From: madhvi.gupta [mailto:madhvi.gu...@orkash.com]
Sent: Tuesday, September 22, 2015 6:25 PM
To: user
Subject: SparkR for accumulo

Hi,

I want to process accumulo data in R through sparkR.Can anyone help me and let 
me know how to get accumulo data in spark to be used in R?

--
Thanks and Regards
Madhvi Gupta


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR for accumulo

2015-09-23 Thread madhvi.gupta

Hi,

Is there any other way to proceed with it to create RRDD from a source 
RDD other than text RDD?Or to use any other format of data stored in 
HDFS in sparkR?

Also please elaborate me the kind of step missing in sparkR fro this.

Thanks and Regards
Madhvi Gupta

On Thursday 24 September 2015 08:12 AM, Sun, Rui wrote:

No.

It is possible you create a helper function which can creat accumulo data RDDs in 
Scala or Java (maybe put such code in a JAR, add using --jar   on the 
command line to start SparkR to use it ?) and in SparkR you can use the private 
functions like callJMethod to call it and the created RDD objects can be referenced 
on R side.

However, there is a critical step missing in SparkR now, which is the support 
of conversion from a source RDD (other than text file RDD) to RRDD. If you 
can't convert a source RDD from JVM to RRDD, you can't further use SparkR RDD 
API to apply transformations on it.

-Original Message-
From: madhvi.gupta [mailto:madhvi.gu...@orkash.com]
Sent: Wednesday, September 23, 2015 11:42 AM
To: Sun, Rui; user
Subject: Re: SparkR for accumulo

Hi Rui,

Cant we use the accumulo data RDD created from JAVA in spark, in sparkR?

Thanks and Regards
Madhvi Gupta

On Tuesday 22 September 2015 04:42 PM, Sun, Rui wrote:

I am afraid that there is no support for accumulo in SparkR now, because:

1. It seems that there is no data source support for accumulo, so we
can't create SparkR dataframe on accumulo 2. It is possible to create RDD from 
accumulo via AccumuloInputFormat in Scala. But unfortunately, SparkR does not 
support creating RDD from Hadoop files other than text file.

-Original Message-
From: madhvi.gupta [mailto:madhvi.gu...@orkash.com]
Sent: Tuesday, September 22, 2015 6:25 PM
To: user
Subject: SparkR for accumulo

Hi,

I want to process accumulo data in R through sparkR.Can anyone help me and let 
me know how to get accumulo data in spark to be used in R?

--
Thanks and Regards
Madhvi Gupta


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to fix some WARN when submit job on spark 1.5 YARN

2015-09-23 Thread r7raul1...@163.com
1 WARN netlib.BLAS: Failed to load implementation from: 
com.github.fommil.netlib.NativeSystemBLAS
2 WARN netlib.BLAS: Failed to load implementation from: 
com.github.fommil.netlib.NativeRefBLAS
3 WARN  Unable to load native-hadoop library for your platform


r7raul1...@163.com


RE: SparkR for accumulo

2015-09-23 Thread Sun, Rui
No.

It is possible you create a helper function which can creat accumulo data RDDs 
in Scala or Java (maybe put such code in a JAR, add using --jar   on 
the command line to start SparkR to use it ?) and in SparkR you can use the 
private functions like callJMethod to call it and the created RDD objects can 
be referenced on R side.

However, there is a critical step missing in SparkR now, which is the support 
of conversion from a source RDD (other than text file RDD) to RRDD. If you 
can't convert a source RDD from JVM to RRDD, you can't further use SparkR RDD 
API to apply transformations on it.

-Original Message-
From: madhvi.gupta [mailto:madhvi.gu...@orkash.com] 
Sent: Wednesday, September 23, 2015 11:42 AM
To: Sun, Rui; user
Subject: Re: SparkR for accumulo

Hi Rui,

Cant we use the accumulo data RDD created from JAVA in spark, in sparkR?

Thanks and Regards
Madhvi Gupta

On Tuesday 22 September 2015 04:42 PM, Sun, Rui wrote:
> I am afraid that there is no support for accumulo in SparkR now, because:
>
> 1. It seems that there is no data source support for accumulo, so we 
> can't create SparkR dataframe on accumulo 2. It is possible to create RDD 
> from accumulo via AccumuloInputFormat in Scala. But unfortunately, SparkR 
> does not support creating RDD from Hadoop files other than text file.
>
> -Original Message-
> From: madhvi.gupta [mailto:madhvi.gu...@orkash.com]
> Sent: Tuesday, September 22, 2015 6:25 PM
> To: user
> Subject: SparkR for accumulo
>
> Hi,
>
> I want to process accumulo data in R through sparkR.Can anyone help me and 
> let me know how to get accumulo data in spark to be used in R?
>
> --
> Thanks and Regards
> Madhvi Gupta
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



Re: LogisticRegression models consumes all driver memory

2015-09-23 Thread DB Tsai
You want to reduce the # of partitions to around the # of executors *
cores. Since you have so many tasks/partitions which will give a lot of
pressure on treeReduce in LoR. Let me know if this helps.


Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Wed, Sep 23, 2015 at 5:39 PM, Eugene Zhulenev 
wrote:

> ~3000 features, pretty sparse, I think about 200-300 non zero features in
> each row. We have 100 executors x 8 cores. Number of tasks is pretty big,
> 30k-70k, can't remember exact number. Training set is a result of pretty
> big join from multiple data frames, but it's cached. However as I
> understand Spark still keeps DAG history of RDD to be able to recover it in
> case of failure of one of the nodes.
>
> I'll try tomorrow to save train set as parquet, load it back as DataFrame
> and run modeling this way.
>
> On Wed, Sep 23, 2015 at 7:56 PM, DB Tsai  wrote:
>
>> Your code looks correct for me. How many # of features do you have in
>> this training? How many tasks are running in the job?
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Blog: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>> 
>>
>> On Wed, Sep 23, 2015 at 4:38 PM, Eugene Zhulenev <
>> eugene.zhule...@gmail.com> wrote:
>>
>>> It's really simple:
>>> https://gist.github.com/ezhulenev/886517723ca4a353
>>>
>>> The same strange heap behavior we've seen even for single model, it
>>> takes ~20 gigs heap on a driver to build single model with less than 1
>>> million rows in input data frame.
>>>
>>> On Wed, Sep 23, 2015 at 6:31 PM, DB Tsai  wrote:
>>>
 Could you paste some of your code for diagnosis?


 Sincerely,

 DB Tsai
 --
 Blog: https://www.dbtsai.com
 PGP Key ID: 0xAF08DF8D
 

 On Wed, Sep 23, 2015 at 3:19 PM, Eugene Zhulenev <
 eugene.zhule...@gmail.com> wrote:

> We are running Apache Spark 1.5.0 (latest code from 1.5 branch)
>
> We are running 2-3 LogisticRegression models in parallel (we'd love to
> run 10-20 actually), they are not really big at all, maybe 1-2 million 
> rows
> in each model.
>
> Cluster itself, and all executors look good. Enough free memory and no
> exceptions or errors.
>
> However I see very strange behavior inside Spark driver. Allocated
> heap constantly growing. It grows up to 30 gigs in 1.5 hours and then
> everything becomes super sloow.
>
> We don't do any collect, and I really don't understand who is
> consuming all this memory. Looks like it's something inside
> LogisticRegression itself, however I only see treeAggregate which should
> not require so much memory to run.
>
> Any ideas?
>
> Plus I don't see any GC pause, looks like memory is still used by
> someone inside driver.
>
> [image: Inline image 2]
> [image: Inline image 1]
>


>>>
>>
>


KMeans Model fails to run

2015-09-23 Thread Soong, Eddie
Hi,

Why am I getting this error which prevents my KMeans clustering algorithm to 
work inside of Spark? I'm trying to run a sample Scala model found in 
Databricks website on my Cloudera-Spark 1-node local VM. For completeness, the 
Scala program is as follows: Thx

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("/path/to/file")
  .map(s => Vectors.dense(s.split(',').map(_.toDouble)))

// Cluster the data into three classes using KMeans
val numIterations = 20
val numClusters = 3
val kmeansModel = KMeans.train(data, numClusters, numIterations)


5/09/23 19:38:11 WARN clustering.KMeans: The input data is not directly cached, 
which may hurt performance if its parent RDDs are also uncached.
java.io.IOException: No FileSystem for scheme: c
   at 
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
   at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
   at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
   at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
   at 
org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
   at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
   at 
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
   at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
   at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
   at 
org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at 
org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:55)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
   at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
   at scala.Option.getOrElse(Option.scala:120)
   at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1517)
   at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
   at org.apache.spark.rdd.RDD.takeSample(RDD.scala:428)
   at 
org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:288)
   at 
org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:162)
   at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:139)
   at 
org.apache.spark.mllib.clustering.KMeans$.train(KMeans.scala:420)
   at 
org.apache.spark.mllib.clustering.KMeans$.train(KMeans.scala:430)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:29)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:34)
   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36)
   at $iwC$$iwC$$iwC$$iwC$$iwC.(:38)
   at $iwC$$iwC$$iwC$$iwC.(:40)
   at $iwC$$iwC$$iwC.(:42)
   at $iwC$$iwC.(:44)
   at $iwC.(:46)
   at (:48)
   at .(:52)
   at .()
   at .(:7)
   at .()
   at $print()
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 

Re: LogisticRegression models consumes all driver memory

2015-09-23 Thread DB Tsai
Could you paste some of your code for diagnosis?


Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Wed, Sep 23, 2015 at 3:19 PM, Eugene Zhulenev 
wrote:

> We are running Apache Spark 1.5.0 (latest code from 1.5 branch)
>
> We are running 2-3 LogisticRegression models in parallel (we'd love to run
> 10-20 actually), they are not really big at all, maybe 1-2 million rows in
> each model.
>
> Cluster itself, and all executors look good. Enough free memory and no
> exceptions or errors.
>
> However I see very strange behavior inside Spark driver. Allocated heap
> constantly growing. It grows up to 30 gigs in 1.5 hours and then everything
> becomes super sloow.
>
> We don't do any collect, and I really don't understand who is consuming
> all this memory. Looks like it's something inside LogisticRegression
> itself, however I only see treeAggregate which should not require so much
> memory to run.
>
> Any ideas?
>
> Plus I don't see any GC pause, looks like memory is still used by someone
> inside driver.
>
> [image: Inline image 2]
> [image: Inline image 1]
>


Re: How to obtain the key in updateStateByKey

2015-09-23 Thread Ted Yu
  def updateStateByKey[S: ClassTag](
  updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],

updateFunc is given an iterator. You can access the key with _1 on the
iterator.

On Wed, Sep 23, 2015 at 3:01 PM, swetha  wrote:

> Hi,
>
> How to obtain the current key in updateStateBykey ?
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-obtain-the-key-in-updateStateByKey-tp24792.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


[POWERED BY] Please add our organization

2015-09-23 Thread barmaley
Name: Frontline Systems Inc.
URL: www.solver.com

Description:
•   We built an interface between Microsoft Excel and Apache Spark - 
bringing
Big Data from the clusters to Excel enabling tools ranging from simple
charts and Power View dashboards to add-ins for machine learning and
predictive analytics, Monte Carlo simulation and risk analysis, and linear
and nonlinear optimization. Using Spark Core API and Spark SQL to draw
representative samples and summarize large datasets, it’s now possible to
extract “actionable small data” from a Big Data cluster, and make it usable
by business analysts who lack programming expertise.
•   See our blog post presenting the analysis of 20+ years of airline flight
data using Excel only




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/POWERED-BY-Please-add-our-organization-tp24794.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



caching DataFrames

2015-09-23 Thread Zhang, Jingyu
I have A and B DataFrames
A has columns a11,a12, a21,a22
B has columns b11,b12, b21,b22

I persistent them in cache
1. A.Cache(),
2.  B.Cache()

Then, I persistent the subset in cache later

3. DataFrame A1 (a11,a12).cache()

4. DataFrame B1 (b11,b12).cache()

5. DataFrame AB1 (a11,a12,b11,b12).cahce()

Can you please tell me what happen for caching case (3,4, and 5) after A
and B cached?
How much  more memory do I need compare with Caching 1 and 2 only?

Thanks

Jingyu

-- 
This message and its attachments may contain legally privileged or 
confidential information. It is intended solely for the named addressee. If 
you are not the addressee indicated in this message or responsible for 
delivery of the message to the addressee, you may not copy or deliver this 
message or its attachments to anyone. Rather, you should permanently delete 
this message and its attachments and kindly notify the sender by reply 
e-mail. Any content of this message and its attachments which does not 
relate to the official business of the sending company must be taken not to 
have been sent or endorsed by that company or any of its related entities. 
No warranty is made that the e-mail or attachments are free from computer 
virus or other defect.


[POWERED BY] Please add our organization

2015-09-23 Thread Oleg Shirokikh
Name: Frontline Systems Inc.
URL: www.solver.com

Description:
*  We built an interface between Microsoft Excel and Apache Spark - bringing 
Big Data from the clusters to Excel enabling tools ranging from simple charts 
and Power View dashboards to add-ins for machine learning and predictive 
analytics, Monte Carlo simulation and risk analysis, and linear and nonlinear 
optimization. Using Spark Core API and Spark SQL to draw representative samples 
and summarize large datasets, it's now possible to extract "actionable small 
data" from a Big Data cluster, and make it usable by business analysts who lack 
programming expertise.
*  See our blog 
post
 presenting the analysis of 20+ years of airline flight data using Excel only



Re: Custom Hadoop InputSplit, Spark partitions, spark executors/task and Yarn containers

2015-09-23 Thread Sandy Ryza
Hi Anfernee,

That's correct that each InputSplit will map to exactly a Spark partition.

On YARN, each Spark executor maps to a single YARN container.  Each
executor can run multiple tasks over its lifetime, both parallel and
sequentially.

If you enable dynamic allocation, after the stage including the InputSplits
gets submitted, Spark will try to request an appropriate number of
executors.

The memory in the YARN resource requests is --executor-memory + what's set
for spark.yarn.executor.memoryOverhead, which defaults to 10% of
--executor-memory.

-Sandy

On Wed, Sep 23, 2015 at 2:38 PM, Anfernee Xu  wrote:

> Hi Spark experts,
>
> I'm coming across these terminologies and having some confusions, could
> you please help me understand them better?
>
> For instance I have implemented a Hadoop InputFormat to load my external
> data in Spark, in turn my custom InputFormat will create a bunch of
> InputSplit's, my questions is about
>
> # Each InputSplit will exactly map to a Spark partition, is that correct?
>
> # If I run on Yarn, how does Spark executor/task map to Yarn container?
>
> # because I already have a bunch of InputSplits, do I still need to
> specify the number of executors to get processing parallelized?
>
> # How does -executor-memory map to the memory requirement in Yarn's
> resource request?
>
> --
> --Anfernee
>


Re: LogisticRegression models consumes all driver memory

2015-09-23 Thread DB Tsai
Your code looks correct for me. How many # of features do you have in this
training? How many tasks are running in the job?


Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Wed, Sep 23, 2015 at 4:38 PM, Eugene Zhulenev 
wrote:

> It's really simple: https://gist.github.com/ezhulenev/886517723ca4a353
>
> The same strange heap behavior we've seen even for single model, it takes
> ~20 gigs heap on a driver to build single model with less than 1 million
> rows in input data frame.
>
> On Wed, Sep 23, 2015 at 6:31 PM, DB Tsai  wrote:
>
>> Could you paste some of your code for diagnosis?
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Blog: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>> 
>>
>> On Wed, Sep 23, 2015 at 3:19 PM, Eugene Zhulenev <
>> eugene.zhule...@gmail.com> wrote:
>>
>>> We are running Apache Spark 1.5.0 (latest code from 1.5 branch)
>>>
>>> We are running 2-3 LogisticRegression models in parallel (we'd love to
>>> run 10-20 actually), they are not really big at all, maybe 1-2 million rows
>>> in each model.
>>>
>>> Cluster itself, and all executors look good. Enough free memory and no
>>> exceptions or errors.
>>>
>>> However I see very strange behavior inside Spark driver. Allocated heap
>>> constantly growing. It grows up to 30 gigs in 1.5 hours and then everything
>>> becomes super sloow.
>>>
>>> We don't do any collect, and I really don't understand who is consuming
>>> all this memory. Looks like it's something inside LogisticRegression
>>> itself, however I only see treeAggregate which should not require so much
>>> memory to run.
>>>
>>> Any ideas?
>>>
>>> Plus I don't see any GC pause, looks like memory is still used by
>>> someone inside driver.
>>>
>>> [image: Inline image 2]
>>> [image: Inline image 1]
>>>
>>
>>
>


Re: Debugging too many files open exception issue in Spark shuffle

2015-09-23 Thread DB Tsai
in  ./apps/mesos-0.22.1/sbin/mesos-daemon.sh

#!/usr/bin/env bash

prefix=/apps/mesos-0.22.1
exec_prefix=/apps/mesos-0.22.1

deploy_dir=${prefix}/etc/mesos

# Increase the default number of open file descriptors.
ulimit -n 8192


Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Wed, Sep 23, 2015 at 5:14 PM, java8964  wrote:
> That is interesting.
>
> I don't have any Mesos experience, but just want to know the reason why it
> does so.
>
> Yong
>
>> Date: Wed, 23 Sep 2015 15:53:54 -0700
>> Subject: Debugging too many files open exception issue in Spark shuffle
>> From: dbt...@dbtsai.com
>> To: user@spark.apache.org
>
>>
>> Hi,
>>
>> Recently, we ran into this notorious exception while doing large
>> shuffle in mesos at Netflix. We ensure that `ulimit -n` is a very
>> large number, but still have the issue.
>>
>> It turns out that mesos overrides the `ulimit -n` to a small number
>> causing the problem. It's very non-trivial to debug (as logging in on
>> the slave gives the right ulimit - it's only in the mesos context that
>> it gets overridden).
>>
>> Here is the code you can run in Spark shell to get the actual allowed
>> # of open files for Spark.
>>
>> import sys.process._
>> val p = 1 to 100
>> val rdd = sc.parallelize(p, 100)
>> val openFiles = rdd.map(x=> Seq("sh", "-c", "ulimit
>> -n").!!.toDouble.toLong).collect
>>
>> Hope this can help someone in the same situation.
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Blog: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Debugging too many files open exception issue in Spark shuffle

2015-09-23 Thread DB Tsai
Hi,

Recently, we ran into this notorious exception while doing large
shuffle in mesos at Netflix. We ensure that `ulimit -n` is a very
large number, but still have the issue.

It turns out that mesos overrides the `ulimit -n` to a small number
causing the problem. It's very non-trivial to debug (as logging in on
the slave gives the right ulimit - it's only in the mesos context that
it gets overridden).

Here is the code you can run in Spark shell to get the actual allowed
# of open files for Spark.

import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val openFiles = rdd.map(x=> Seq("sh", "-c", "ulimit
-n").!!.toDouble.toLong).collect

Hope this can help someone in the same situation.

Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



CrossValidator speed - for loop on each parameter map?

2015-09-23 Thread julia
I’m using CrossValidator in pyspark (spark 1.4.1).
I’ve seen in the class Estimator that all 'fit' are done sequentially.
You can check the method _fit in CrossValidator class for the current
implementation:

https://spark.apache.org/docs/1.4.1/api/python/_modules/pyspark/ml/tuning.html

In the  scala api

  
there is this comment:

   * Fits multiple models to the input data with multiple sets of
parameters.
   * The default implementation uses a for loop on each parameter map.
   * Subclasses could override this to optimize multi-model training.

Is it possible to parallelize CrossValidator on nFolds and numModels so that
is faster?
The times in comparison to R glmnet are not competitive, at least for
dataframes under 3.5 million rows…

Thanks!
Julia.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CrossValidator-speed-for-loop-on-each-parameter-map-tp24795.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  1   2   >