Re: Number of executors change during job running

2014-07-11 Thread Praveen Seluka
If I understand correctly, you cannot change the number of executors at
runtime (correct me if I am wrong) - it is defined when we start the
application and stays fixed. Do you mean the number of tasks?


On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you try setting the number of partitions explicitly in all the
 shuffle-based DStream operations? It may be that the default
 parallelism (that is, spark.default.parallelism) is not being
 respected.
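 A minimal Scala sketch of this suggestion, with a socket source standing in
 for the Kafka stream and 300 as an example partition count (the names here
 are illustrative, not taken from the original job):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._  // pair-DStream operations in Spark 1.x

 object PartitionedShuffleSketch {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf().setAppName("PartitionedShuffleSketch")
     val ssc  = new StreamingContext(conf, Seconds(60))

     // Placeholder input; in this thread it would be the Kafka DStream.
     val lines = ssc.socketTextStream("localhost", 9999)
     val pairs = lines.map(line => (line.split(",")(0), 1))

     // Pass an explicit partition count to every shuffle-based operation
     // instead of relying on spark.default.parallelism being picked up.
     val grouped = pairs.groupByKey(300)
     val counts  = pairs.reduceByKey(_ + _, 300)

     counts.print()
     ssc.start()
     ssc.awaitTermination()
   }
 }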

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web UI. It will show a breakdown of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set the default parallelism to 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. I further
 observed that most executors take less than 20 seconds, but two of them take
 much longer, such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more than 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream ...ByKey
 operations? If the number of reducers is not set, then the number used
 in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on YARN. It consumes data from
 Kafka and groups the data by a certain field. The data size is 480k lines
 per minute, and the batch size is 1 minute.

 For some batches, the program takes more than 3 minutes to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specified 300 as the partition number for groupBy. When I checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there are
 sometimes only 2 executors allocated for this stage. However, during other
 batches, the executors can number several hundred for the same stage, which
 means the number of executors for the same operations changes.

 Does anyone know how Spark allocates the number of executors for
 different stages and how to increase the efficiency of the tasks? Thanks!

 Bill







Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Akhil Das
The easiest fix would be adding the Kafka jars to the SparkContext while
creating it.
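A minimal sketch of that (the jar paths are placeholders; point them at the
actual artifacts on your machine):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Jars that are not part of the Spark assembly and therefore have to be
// shipped to the executors explicitly.
val kafkaJars = Seq(
  "/path/to/spark-streaming-kafka_2.10-1.0.0.jar",
  "/path/to/kafka_2.10-0.8.0.jar")

val conf = new SparkConf()
  .setAppName("KafkaStreamingApp")
  .setJars(kafkaJars)

val ssc = new StreamingContext(conf, Seconds(10))

The same jars can also be passed on the command line through spark-submit's
--jars option (comma-separated).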

Thanks
Best Regards


On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote:

 Hi,

 I am trying to run a program with spark streaming using Kafka on a stand
 alone system. These are my details:

 Spark 1.0.0 hadoop2
 Scala 2.10.3

 I am trying a simple program using my custom sbt project but this is the
 error I am getting:

 Exception in thread main java.lang.NoClassDefFoundError:
 kafka/serializer/StringDecoder
 at org.apache.spark.streaming.kafka.KafkaUtils$.
 createStream(KafkaUtils.scala:55)
 at org.apache.spark.streaming.kafka.KafkaUtils$.
 createStream(KafkaUtils.scala:94)
 at org.apache.spark.streaming.kafka.KafkaUtils.createStream(
 KafkaUtils.scala)
 at SimpleJavaApp.main(SimpleJavaApp.java:40)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(
 NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(
 DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 kafka.serializer.StringDecoder
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 11 more


 here is my .sbt file:

 name := "Simple Project"

 version := "1.0"

 scalaVersion := "2.10.3"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"

 libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" %
 "1.0.0"

 libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

 resolvers += "Maven Repository" at "http://central.maven.org/maven2/"


 sbt package was successful. I also tried sbt ++2.10.3 package to build
 it for my Scala version. The problem remains the same.
 Can anyone help me out here? I've been stuck on this for quite some time
 now.

 Thank You,
 Dilip



Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread Akhil Das
You can simply use the *nc* command to do this, like:

nc -p 12345

will open port 12345, and from the terminal you can provide whatever
input you require for your streaming code.
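On the Spark side, a minimal Scala sketch that just prints whatever arrives on
that socket (note the local[2] master: the receiver occupies one core, so a
bare local master would leave nothing free for processing):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketSmokeTest")
    val ssc  = new StreamingContext(conf, Seconds(5))

    // Connect to the port nc is listening on and print each batch's lines.
    val lines = ssc.socketTextStream("localhost", 12345)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}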

Thanks
Best Regards


On Fri, Jul 11, 2014 at 2:41 AM, kytay kaiyang@gmail.com wrote:

 Hi

 I am learning Spark Streaming, and am trying out the JavaNetworkCount
 example.


 #1 - This is the code I wrote
 JavaStreamingContext sctx = new JavaStreamingContext("local", "appName", new
 Duration(5000));
 JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1",
 );

 JavaDStream<String> words = lines.flatMap(
         new FlatMapFunction<String, String>() {
             @Override
             public Iterable<String> call(String arg0) throws
                     Exception {

                 System.out.println("Print text: " + arg0);
                 return Arrays.asList(arg0.split(" "));
             }
         });

 #2  - This is the socketCode I am using
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.InputStreamReader;
 import java.net.ServerSocket;
 import java.net.Socket;

 public class TestTcpServer {

 public static void main(String argv[]) throws Exception
 {
 String clientSentence;
 String capitalizedSentence;
 ServerSocket welcomeSocket = new ServerSocket();

 int i = 0;

 while(true)
 {
 Socket connectionSocket = welcomeSocket.accept();
 BufferedReader inFromClient = new BufferedReader(
 new
 InputStreamReader(connectionSocket.getInputStream())
 );
 DataOutputStream outToClient = new
 DataOutputStream(connectionSocket.getOutputStream());

 while(true)
 {
 String sendingStr = "Sending... data... "
 + i;
 outToClient.writeBytes(sendingStr);
 System.out.println(sendingStr);
 i++;
 Thread.sleep(3000);
 }
 }
 }
 }

 What I am trying to do is to get the JavaNetworkCount in #1 to start
 printing all the text I am receiving. But so far I failed to achieve that.

 I have been using  Hercules Setup
 http://www.hw-group.com/products/hercules/details_en.html   to simulate
 a TCP server, as well as the simple ServerSocket code in #2...
 But I am not seeing any text being printed on the console.

 Is public Iterable<String> call(String arg0) throws Exception being called
 every 5 secs?

 The console log is in  http://pastebin.com/THzdzGhg
 http://pastebin.com/THzdzGhg



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Praveen,

I did not change the total number of executors. I specified 300 as the
number of executors when I submitted the jobs. However, for some stages,
the number of executors is very small, leading to long calculation times
even for a small data set. That means not all executors were used for some
stages.

If I go into the details of the running time of different executors, I find
that some of them have a very low running time while a very few have a very
long running time, leading to a long overall running time. Another point I
noticed is that the number of completed tasks is usually larger than the
number of total tasks. That means the job is sometimes still running in some
stages although all the tasks have been finished. These are the two behaviors
I observed that may be related to the long running time.

Bill


On Thu, Jul 10, 2014 at 11:26 PM, Praveen Seluka psel...@qubole.com wrote:

 If I understand correctly, you cannot change the number of executors at
 runtime (correct me if I am wrong) - it is defined when we start the
 application and stays fixed. Do you mean the number of tasks?


 On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I 
 checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there
 are sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill








Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread Akhil Das
Sorry, the command is

nc -lk 12345

Thanks
Best Regards


On Fri, Jul 11, 2014 at 6:46 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You simply use the *nc* command to do this. like:

 nc -p 12345

 will open the 12345 port and from the terminal you can provide whatever
 input you require for your StreamingCode.

 Thanks
 Best Regards


 On Fri, Jul 11, 2014 at 2:41 AM, kytay kaiyang@gmail.com wrote:

 Hi

 I am learning spark streaming, and is trying out the JavaNetworkCount
 example.


 #1 - This is the code I wrote
 JavaStreamingContext sctx = new JavaStreamingContext("local", "appName", new
 Duration(5000));
 JavaReceiverInputDStream<String> lines =
 sctx.socketTextStream("127.0.0.1",
 );

 JavaDStream<String> words = lines.flatMap(
         new FlatMapFunction<String, String>() {
             @Override
             public Iterable<String> call(String arg0) throws
                     Exception {

                 System.out.println("Print text: " + arg0);
                 return Arrays.asList(arg0.split(" "));
             }
         });

 #2  - This is the socketCode I am using
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.InputStreamReader;
 import java.net.ServerSocket;
 import java.net.Socket;

 public class TestTcpServer {

 public static void main(String argv[]) throws Exception
 {
 String clientSentence;
 String capitalizedSentence;
 ServerSocket welcomeSocket = new ServerSocket();

 int i = 0;

 while(true)
 {
 Socket connectionSocket = welcomeSocket.accept();
 BufferedReader inFromClient = new BufferedReader(
 new
 InputStreamReader(connectionSocket.getInputStream())
 );
 DataOutputStream outToClient = new
 DataOutputStream(connectionSocket.getOutputStream());

 while(true)
 {
 String sendingStr = "Sending... data... "
 + i;
 outToClient.writeBytes(sendingStr);
 System.out.println(sendingStr);
 i++;
 Thread.sleep(3000);
 }
 }
 }
 }

 What I am trying to do is to get the JavaNetworkCount in #1 to start
 printing all the text I am receiving. But so far I failed to achieve that.

 I have been using  Hercules Setup
 http://www.hw-group.com/products/hercules/details_en.html   to
 simulate as
 a TCP server, as well as a simple serversocket code in #2...
 But I am not seeing any text being printed on the console.

 Is public Iterable<String> call(String arg0) throws Exception being called
 every 5 secs?

 The console log is in  http://pastebin.com/THzdzGhg
 http://pastebin.com/THzdzGhg



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Dilip

Hi Akhil,

Can you please guide me through this? Because the code I am running 
already has this in it:

[java]

SparkContext sc = new SparkContext();
sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");


Is there something I am missing?

Thanks,
Dilip

On Friday 11 July 2014 12:02 PM, Akhil Das wrote:
Easiest fix would be adding the kafka jars to the SparkContext while 
creating it.


Thanks
Best Regards


On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com 
mailto:dilip_ram...@hotmail.com wrote:


Hi,

I am trying to run a program with spark streaming using Kafka on a
stand alone system. These are my details:

Spark 1.0.0 hadoop2
Scala 2.10.3

I am trying a simple program using my custom sbt project but this
is the error I am getting:

Exception in thread main java.lang.NoClassDefFoundError:
kafka/serializer/StringDecoder
at

org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
at

org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
at
org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
at SimpleJavaApp.main(SimpleJavaApp.java:40)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
kafka.serializer.StringDecoder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 11 more


here is my .sbt file:

name := Simple Project

version := 1.0

scalaVersion := 2.10.3

libraryDependencies += org.apache.spark %% spark-core % 1.0.0

libraryDependencies += org.apache.spark %% spark-streaming %
1.0.0

libraryDependencies += org.apache.spark %% spark-sql % 1.0.0

libraryDependencies += org.apache.spark %% spark-examples %
1.0.0

libraryDependencies += org.apache.spark %
spark-streaming-kafka_2.10 % 1.0.0

libraryDependencies += org.apache.kafka %% kafka % 0.8.0

resolvers += Akka Repository at http://repo.akka.io/releases/;

resolvers += Maven Repository at http://central.maven.org/maven2/;


sbt package was successful. I also tried sbt ++2.10.3 package to
build it for my scala version. Problem remains the same.
Can anyone help me out here? Ive been stuck on this for quite some
time now.

Thank You,
Dilip






Re: Join two Spark Streaming

2014-07-11 Thread Bill Jay
Hi Tathagata,

Thanks for the solution. Actually, I will use the number of unique integers
in the batch instead of the cumulative number of unique integers.

I do have two questions about your code:

1. Why do we need uniqueValuesRDD?  Why do we need to call
uniqueValuesRDD.checkpoint()?

2. Where is distinctValues defined?

Bill


On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Do you want to continuously maintain the set of unique integers seen since
 the beginning of stream?

 var uniqueValuesRDD: RDD[Int] = ...

 dstreamOfIntegers.transform(newDataRDD => {
val newUniqueValuesRDD  = newDataRDD.union(distinctValues).distinct
uniqueValuesRDD = newUniqueValuesRDD

// periodically call uniqueValuesRDD.checkpoint()

val uniqueCount = uniqueValuesRDD.count()
newDataRDD.map(x => x / count)
 })
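 A slightly fuller sketch of the same pattern (illustrative only; here
 uniqueValuesRDD plays the role of distinctValues above, and the checkpoint
 interval is arbitrary):

 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.dstream.DStream

 def withUniqueRatio(sc: SparkContext, ints: DStream[Int]): DStream[Double] = {
   // Running set of all unique integers seen so far (updated on the driver
   // each batch, since transform's function runs on the driver).
   var uniqueValuesRDD: RDD[Int] = sc.parallelize(Seq.empty[Int])
   var batch = 0L

   ints.transform { newDataRDD =>
     uniqueValuesRDD = uniqueValuesRDD.union(newDataRDD).distinct()

     // Checkpoint periodically so the lineage does not grow without bound
     // (requires sc.setCheckpointDir to have been set).
     batch += 1
     if (batch % 10 == 0) uniqueValuesRDD.checkpoint()

     val uniqueCount = uniqueValuesRDD.count()
     newDataRDD.map(x => x.toDouble / uniqueCount)
   }
 }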





 On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am working on a pipeline that needs to join two Spark streams. The
 input is a stream of integers, and the output is the number of times each
 integer appears divided by the total number of unique integers. Suppose the
 input is:

 1
 2
 3
 1
 2
 2

 There are 3 unique integers and 1 appears twice. Therefore, the output
 for the integer 1 will be:
 1 0.67

 Since the input is from a stream, it seems we need to first join the
 appearance of the integers and the total number of unique integers and then
 do a calculation using map. I am thinking of adding a dummy key to both
 streams and use join. However, a Cartesian product matches the application
 here better. How to do this effectively? Thanks!

 Bill





Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Bill Jay
I have met similar issues. The reason is probably that spark-streaming-kafka
is not included in the Spark assembly. Currently, I am using
Maven to generate a shaded package with all the dependencies. You may try
to use sbt assembly to include the dependencies in your jar file.
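For the sbt route, a rough sketch of what that usually looks like (the plugin
coordinates and version are from memory of that era, so treat them as an
assumption; the idea is to mark the artifacts already inside the Spark
assembly as provided and bundle only the Kafka pieces):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt (sbt-assembly 0.11.x style)
import AssemblyKeys._

assemblySettings

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

Running sbt assembly then produces one jar that contains the Kafka classes;
any duplicate-file conflicts can be handled with the plugin's merge strategy
setting.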

Bill


On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote:

  Hi Akhil,

 Can you please guide me through this? Because the code I am running
 already has this in it:
 [java]

 SparkContext sc = new SparkContext();

 sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");


 Is there something I am missing?

 Thanks,
 Dilip


 On Friday 11 July 2014 12:02 PM, Akhil Das wrote:

  Easiest fix would be adding the kafka jars to the SparkContext while
 creating it.

  Thanks
 Best Regards


 On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote:

 Hi,

 I am trying to run a program with spark streaming using Kafka on a stand
 alone system. These are my details:

 Spark 1.0.0 hadoop2
 Scala 2.10.3

 I am trying a simple program using my custom sbt project but this is the
 error I am getting:

 Exception in thread main java.lang.NoClassDefFoundError:
 kafka/serializer/StringDecoder
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
 at
 org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
 at SimpleJavaApp.main(SimpleJavaApp.java:40)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 kafka.serializer.StringDecoder
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 11 more


 here is my .sbt file:

 name := Simple Project

 version := 1.0

 scalaVersion := 2.10.3

 libraryDependencies += org.apache.spark %% spark-core % 1.0.0

 libraryDependencies += org.apache.spark %% spark-streaming % 1.0.0

 libraryDependencies += org.apache.spark %% spark-sql % 1.0.0

 libraryDependencies += org.apache.spark %% spark-examples % 1.0.0

 libraryDependencies += org.apache.spark % spark-streaming-kafka_2.10
 % 1.0.0

 libraryDependencies += org.apache.kafka %% kafka % 0.8.0

 resolvers += Akka Repository at http://repo.akka.io/releases/;

 resolvers += Maven Repository at http://central.maven.org/maven2/;


 sbt package was successful. I also tried sbt ++2.10.3 package to build
 it for my scala version. Problem remains the same.
 Can anyone help me out here? Ive been stuck on this for quite some time
 now.

 Thank You,
 Dilip






Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Nick Pentreath
You may look into the new Azkaban - which while being quite heavyweight is 
actually quite pleasant to use when set up.


You can run Spark jobs (spark-submit) using Azkaban shell commands and pass
parameters between jobs. It supports dependencies, simple DAGs and scheduling
with retries.




I'm digging deeper and it may be worthwhile extending it with a Spark job 
type...




It's probably best for mixed Hadoop / Spark clusters...
—
Sent from Mailbox

On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com
wrote:

 I used both - Oozie and Luigi - but found them inflexible and still
 overcomplicated, especially in presence of Spark.
 Oozie has a fixed list of building blocks, which is pretty limiting. For
 example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are
 out of scope (of course, you can always write wrapper as Java or Shell
 action, but does it really need to be so complicated?). Another issue with
 Oozie is passing variables between actions. There's Oozie context that is
 suitable for passing key-value pairs (both strings) between actions, but
 for more complex objects (say, FileInputStream that should be closed at
 last step only) you have to do some advanced kung fu.
 Luigi, on the other hand, has its niche - complicated dataflows with many tasks
 that depend on each other. Basically, there are tasks (this is where you
 define computations) and targets (something that can exist - file on
 disk, entry in ZooKeeper, etc.). You ask Luigi to get some target, and it
 creates a plan for achieving this. Luigi is really shiny when your workflow
 fits this model, but one step away and you are in trouble. For example,
 consider simple pipeline: run MR job and output temporary data, run another
 MR job and output final data, clean temporary data. You can make target
 Clean, that depends on target MRJob2 that, in its turn, depends on MRJob1,
 right? Not so easy. How do you check that Clean task is achieved? If you
 just test whether temporary directory is empty or not, you catch both cases
 - when all tasks are done and when they are not even started yet. Luigi
 allows you to specify all 3 actions - MRJob1, MRJob2, Clean - in a single
 run() method, but ruins the entire idea.
 And of course, both of these frameworks are optimized for standard
 MapReduce jobs, which is probably not what you want on Spark mailing list
 :)
 Experience with these frameworks, however, gave me some insights about
 typical data pipelines.
 1. Pipelines are mostly linear. Oozie, Luigi and a number of other frameworks
 allow branching, but most pipelines actually consist of moving data from
 source to destination with possibly some transformations in between (I'll
 be glad if somebody shares use cases where you really need branching).
 2. Transactional logic is important. Either everything, or nothing.
 Otherwise it's really easy to get into inconsistent state.
 3. Extensibility is important. You never know what you will need in a week or
 two.
 So eventually I decided that it is much easier to create your own pipeline
 instead of trying to adapt your code to existing frameworks. My latest
 pipeline incarnation simply consists of a list of steps that are started
 sequentially. Each step is a class with at least these methods:
  * run() - launch this step
  * fail() - what to do if step fails
  * finalize() - (optional) what to do when all steps are done
 For example, if you want to add possibility to run Spark jobs, you just
 create SparkStep and configure it with required code. If you want Hive
 query - just create HiveStep and configure it with Hive connection
 settings. I use YAML file to configure steps and Context (basically,
 Map[String, Any]) to pass variables between them. I also use configurable
 Reporter available for all steps to report the progress.
 Hopefully, this will give you some insights about the best pipeline for your
 specific case.
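 To make that concrete, a minimal Scala sketch of the step abstraction
 described above (all names are made up; onComplete plays the role of the
 finalize() hook):

 object SimplePipeline {
   type Context = scala.collection.mutable.Map[String, Any]

   trait Step {
     def run(ctx: Context): Unit                      // launch this step
     def fail(ctx: Context, e: Throwable): Unit = ()  // what to do if the step fails
     def onComplete(ctx: Context): Unit = ()          // optional: after all steps are done
   }

   class Pipeline(steps: Seq[Step]) {
     def execute(): Unit = {
       val ctx: Context = scala.collection.mutable.Map.empty
       steps.foreach { step =>
         try step.run(ctx)
         catch { case e: Throwable => step.fail(ctx, e); throw e }  // all or nothing
       }
       steps.foreach(_.onComplete(ctx))
     }
   }

   // Example step: a placeholder that would submit a Spark job.
   class SparkStep(jobName: String) extends Step {
     def run(ctx: Context): Unit = println("submitting Spark job " + jobName)
   }
 }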
 On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote:

 We use Luigi for this purpose.  (Our pipelines are typically on AWS (no
 EMR) backed by S3 and using combinations of Python jobs, non-Spark
 Java/Scala, and Spark.  We run Spark jobs by connecting drivers/clients to
 the master, and those are what is invoked from Luigi.)

 —
 p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


 On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote:

 I'm just wondering what's the general recommendation for data pipeline
 automation.

 Say, I want to run Spark Job A, then B, then invoke script C, then do D,
 and
 if D fails, do E, and if Job A fails, send email F, etc...

 It looks like Oozie might be the best choice. But I'd like some
 advice/suggestions.

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

I also tried to pass the number of partitions as a parameter to functions
such as groupByKey. It seems the number of executors is around 50 instead
of 300, which is the number of executors I specified in the submission
script. Moreover, the running time of different executors is skewed. The
ideal case is that Spark distributes the data evenly across the 300 executors
so that the computation finishes efficiently. I am not sure how to
achieve this.
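One thing that may be worth trying (a sketch only; stream stands in for the
Kafka pair DStream in this job): repartition each batch before the shuffle so
the records are spread over the intended number of partitions regardless of
how many partitions the receiver produced.

import org.apache.spark.streaming.StreamingContext._  // pair-DStream ops in Spark 1.x
import org.apache.spark.streaming.dstream.DStream

def spreadAndGroup(stream: DStream[(String, String)]) = {
  // Spread each batch over 300 partitions, then shuffle with the same count.
  val spread = stream.transform(rdd => rdd.repartition(300))
  spread.groupByKey(300)
}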

Thanks!

Bill


On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there are
 sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill







Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread 明风
We used Azkaban for a short time and suffered a lot. Finally we almost rewrote
it completely. I really don't recommend it.

From:  Nick Pentreath nick.pentre...@gmail.com
Reply-To:  user@spark.apache.org
Date:  Friday, July 11, 2014, 3:18 PM
To:  user@spark.apache.org
Subject:  Re: Recommended pipeline automation tool? Oozie?

You may look into the new Azkaban - which while being quite heavyweight is
actually quite pleasant to use when set up.

You can run spark jobs (spark-submit) using azkaban shell commands and pass
paremeters between jobs. It supports dependencies, simple dags and
scheduling with retries.

I'm digging deeper and it may be worthwhile extending it with a Spark job
type...

It's probably best for mixed Hadoop / Spark clusters...
―
Sent from Mailbox https://www.dropbox.com/mailbox


On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com wrote:
 I used both - Oozie and Luigi - but found them inflexible and still
 overcomplicated, especially in presence of Spark.
 
 Oozie has a fixed list of building blocks, which is pretty limiting. For
 example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are out
 of scope (of course, you can always write wrapper as Java or Shell action, but
 does it really need to be so complicated?). Another issue with Oozie is
 passing variables between actions. There's Oozie context that is suitable for
 passing key-value pairs (both strings) between actions, but for more complex
 objects (say, FileInputStream that should be closed at last step only) you
 have to do some advanced kung fu.
 
 Luigi, on other hand, has its niche - complicated dataflows with many tasks
 that depend on each other. Basically, there are tasks (this is where you
 define computations) and targets (something that can exist - file on disk,
 entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates a
 plan for achieving this. Luigi is really shiny when your workflow fits this
 model, but one step away and you are in trouble. For example, consider simple
 pipeline: run MR job and output temporary data, run another MR job and output
 final data, clean temporary data. You can make target Clean, that depends on
 target MRJob2 that, in its turn, depends on MRJob1, right? Not so easy. How do
 you check that Clean task is achieved? If you just test whether temporary
 directory is empty or not, you catch both cases - when all tasks are done and
 when they are not even started yet. Luigi allows you to specify all 3 actions
 - MRJob1, MRJob2, Clean - in a single run() method, but ruins the entire
 idea. 
 
 And of course, both of these frameworks are optimized for standard MapReduce
 jobs, which is probably not what you want on Spark mailing list :)
 
 Experience with these frameworks, however, gave me some insights about typical
 data pipelines. 
 
 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks
 allow branching, but most pipelines actually consist of moving data from
 source to destination with possibly some transformations in between (I'll be
 glad if somebody share use cases when you really need branching).
 2. Transactional logic is important. Either everything, or nothing. Otherwise
 it's really easy to get into inconsistent state.
 3. Extensibility is important. You never know what will need in a week or two.
 
 So eventually I decided that it is much easier to create your own pipeline
 instead of trying to adopt your code to existing frameworks. My latest
 pipeline incarnation simply consists of a list of steps that are started
 sequentially. Each step is a class with at least these methods:
 
  * run() - launch this step
  * fail() - what to do if step fails
  * finalize() - (optional) what to do when all steps are done
 
 For example, if you want to add possibility to run Spark jobs, you just create
 SparkStep and configure it with required code. If you want Hive query - just
 create HiveStep and configure it with Hive connection settings. I use YAML
 file to configure steps and Context (basically, Map[String, Any]) to pass
 variables between them. I also use configurable Reporter available for all
 steps to report the progress.
 
 Hopefully, this will give you some insights about best pipeline for your
 specific case. 
 
 
 
 On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote:
 
 We use Luigi for this purpose.  (Our pipelines are typically on AWS (no EMR)
 backed by S3 and using combinations of Python jobs, non-Spark Java/Scala, and
 Spark.  We run Spark jobs by connecting drivers/clients to the master, and
 those are what is invoked from Luigi.)
 
 ―
 p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
 
 
 On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote:
 I'm just wondering what's the general recommendation for data pipeline
 automation.
 
 Say, I want to run Spark Job A, then B, then invoke script C, then do D, and
 if D fails, do E, and if Job A fails, send email F, etc...
 
 It looks like Oozie 

Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Nick Pentreath
Did you use old azkaban or azkaban 2.5? It has been completely rewritten.

Not saying it is the best but I found it way better than oozie for example.

Sent from my iPhone

 On 11 Jul 2014, at 09:24, 明风 mingf...@taobao.com wrote:
 
 We used Azkaban for a short time and suffered a lot. Finally we almost rewrote
 it completely. I really don't recommend it.

 From: Nick Pentreath nick.pentre...@gmail.com
 Reply-To: user@spark.apache.org
 Date: Friday, July 11, 2014, 3:18 PM
 To: user@spark.apache.org
 Subject: Re: Recommended pipeline automation tool? Oozie?
 
 You may look into the new Azkaban - which while being quite heavyweight is 
 actually quite pleasant to use when set up.
 
 You can run spark jobs (spark-submit) using azkaban shell commands and pass 
 paremeters between jobs. It supports dependencies, simple dags and scheduling 
 with retries. 
 
 I'm digging deeper and it may be worthwhile extending it with a Spark job 
 type...
 
 It's probably best for mixed Hadoop / Spark clusters...
 —
 Sent from Mailbox
 
 
 On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com wrote:
 I used both - Oozie and Luigi - but found them inflexible and still 
 overcomplicated, especially in presence of Spark. 
 
 Oozie has a fixed list of building blocks, which is pretty limiting. For 
 example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are out 
 of scope (of course, you can always write wrapper as Java or Shell action, 
 but does it really need to be so complicated?). Another issue with Oozie is 
 passing variables between actions. There's Oozie context that is suitable 
 for passing key-value pairs (both strings) between actions, but for more 
 complex objects (say, FileInputStream that should be closed at last step 
 only) you have to do some advanced kung fu. 
 
 Luigi, on other hand, has its niche - complicated dataflows with many tasks 
 that depend on each other. Basically, there are tasks (this is where you 
 define computations) and targets (something that can exist - file on disk, 
 entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates 
 a plan for achieving this. Luigi is really shiny when your workflow fits 
 this model, but one step away and you are in trouble. For example, consider 
 simple pipeline: run MR job and output temporary data, run another MR job 
 and output final data, clean temporary data. You can make target Clean, that 
 depends on target MRJob2 that, in its turn, depends on MRJob1, right? Not so 
 easy. How do you check that Clean task is achieved? If you just test whether 
 temporary directory is empty or not, you catch both cases - when all tasks 
 are done and when they are not even started yet. Luigi allows you to specify 
 all 3 actions - MRJob1, MRJob2, Clean - in a single run() method, but 
 ruins the entire idea. 
 
 And of course, both of these frameworks are optimized for standard MapReduce 
 jobs, which is probably not what you want on Spark mailing list :) 
 
 Experience with these frameworks, however, gave me some insights about 
 typical data pipelines. 
 
 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks 
 allow branching, but most pipelines actually consist of moving data from 
 source to destination with possibly some transformations in between (I'll be 
 glad if somebody share use cases when you really need branching). 
 2. Transactional logic is important. Either everything, or nothing. 
 Otherwise it's really easy to get into inconsistent state. 
 3. Extensibility is important. You never know what will need in a week or 
 two. 
 
 So eventually I decided that it is much easier to create your own pipeline 
 instead of trying to adopt your code to existing frameworks. My latest 
 pipeline incarnation simply consists of a list of steps that are started 
 sequentially. Each step is a class with at least these methods: 
 
  * run() - launch this step
  * fail() - what to do if step fails
  * finalize() - (optional) what to do when all steps are done
 
 For example, if you want to add possibility to run Spark jobs, you just 
 create SparkStep and configure it with required code. If you want Hive query 
 - just create HiveStep and configure it with Hive connection settings. I use 
 YAML file to configure steps and Context (basically, Map[String, Any]) to 
 pass variables between them. I also use configurable Reporter available for 
 all steps to report the progress. 
 
 Hopefully, this will give you some insights about best pipeline for your 
 specific case. 
 
 
 
 On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote:
 
 We use Luigi for this purpose.  (Our pipelines are typically on AWS (no 
 EMR) backed by S3 and using combinations of Python jobs, non-Spark 
 Java/Scala, and Spark.  We run Spark jobs by connecting drivers/clients to 
 the master, and those are what is invoked from Luigi.)
 
 —
 p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
 
 
 On Thu, Jul 10, 2014 at 10:20 AM, k.tham 

Re: KMeans code is rubbish

2014-07-11 Thread Wanda Hawk
I also took a look at 
spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
 and ran the code in a shell.

There is an issue here:
    val initMode = params.initializationMode match {
      case Random => KMeans.RANDOM
      case Parallel => KMeans.K_MEANS_PARALLEL
    }


If I use initMode=KMeans.RANDOM everything is ok.
If I use initMode=KMeans.K_MEANS_PARALLEL I get a wrong result. I do not know 
why. The example proposed is a really simple one that should not accept 
multiple solutions and always converge to the correct one.

Now what can be altered in the original SparkKMeans.scala (the seed or
something else?) to get the correct results every single time?
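For comparison, a small sketch of calling the mllib implementation directly
with an explicit initialization mode (the file parsing is illustrative,
matched to the two-column input used in this thread):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

object KMeansSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KMeansSketch"))

    // Parse "x y" lines (like 2dim2.txt above) into dense vectors.
    val data = sc.textFile(args(0))
      .map(line => Vectors.dense(line.trim.split("\\s+").map(_.toDouble)))
      .cache()

    // k = 2, 20 iterations, 1 run, explicit initialization mode.
    val model = KMeans.train(data, 2, 20, 1, KMeans.RANDOM)
    model.clusterCenters.foreach(println)

    sc.stop()
  }
}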
On Thursday, July 10, 2014 7:58 PM, Xiangrui Meng men...@gmail.com wrote:
 


SparkKMeans is a naive implementation. Please use
mllib.clustering.KMeans in practice. I created a JIRA for this:
https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui


On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:
 I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with your
 dataset as well, I got the expected answer. And I believe that even though
 initialization is done using sampling, the example actually sets the seed to
 a constant 42, so the result should always be the same no matter how many
 times you run it. So I am not really sure whats going on here.

 Can you tell us more about which version of Spark you are running? Which
 Java version?


 ==

 [tdas @ Xion spark2] cat input
 2 1
 1 2
 3 2
 2 3
 4 1
 5 1
 6 1
 4 2
 6 2
 4 3
 5 3
 6 3
 [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001
 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from
 SCDynamicStore
 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/07/10 02:45:07 WARN LoadSnappy:
 Snappy native library not loaded
 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemBLAS
 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefBLAS
 Finished iteration (delta = 3.0)
 Finished iteration (delta = 0.0)
 Final centers:
 DenseVector(5.0, 2.0)
 DenseVector(2.0, 2.0)



 On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk wanda_haw...@yahoo.com wrote:

 so this is what I am running:
 ./bin/run-example SparkKMeans
 ~/Documents/2dim2.txt 2 0.001

 And this is the input file:
 ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$
 └───#!cat ~/Documents/2dim2.txt
 2 1
 1 2
 3 2
 2 3
 4 1
 5 1
 6 1
 4 2
 6 2
 4 3
 5 3
 6 3
 

 This is the final output from spark:
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 Getting 2 non-empty blocks
 out of 2 blocks
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 Started 0 remote fetches in 0 ms
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 maxBytesInFlight: 50331648, targetRequestSize: 10066329
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 Getting 2 non-empty blocks out of 2 blocks
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 Started 0 remote fetches in 0 ms
 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is 1433
 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to driver
 14/07/10 20:05:12 INFO Executor: Finished task ID 14

 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0)
 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on
 localhost (progress: 1/2)
 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is 1433
 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to driver
 14/07/10 20:05:12 INFO Executor: Finished task ID 15
 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1)
 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on
 localhost (progress: 2/2)
 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at
 SparkKMeans.scala:75) finished in 0.008 s
 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose
 tasks
 have all completed, from pool
 14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at
 SparkKMeans.scala:75, took 0.02472681 s
 Finished iteration (delta = 0.0)
 Final centers:
 DenseVector(2.8571428571428568, 2.0)
 DenseVector(5.6005, 2.0)
 




 On Thursday, July 10, 2014 12:02 PM, Bertrand Dechoux decho...@gmail.com
 wrote:


 A picture is worth a thousand... Well, a picture with this dataset, what

 you are expecting and what you get, would help answering your initial
 question.

 Bertrand


 On Thu, Jul 10, 2014 at 10:44 AM, Wanda Hawk wanda_haw...@yahoo.com
 wrote:

 Can someone please run the standard kMeans code on this input with 2
 centers ?:
 2 1
 1 2
 3 2
 2 3
 4 1
 5 1
 6 1
 4 2
 6 2
 4 3
 5 3
 6 3

 The obvious result 

Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Dilip

A simple
sbt assembly
is not working. Is there any other way to include particular jars with the
assembly command?


Regards,
Dilip
On Friday 11 July 2014 12:45 PM, Bill Jay wrote:
I have met similar issues. The reason is probably because in Spark 
assembly, spark-streaming-kafka is not included. Currently, I am using 
Maven to generate a shaded package with all the dependencies. You may 
try to use sbt assembly to include the dependencies in your jar file.


Bill


On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com 
mailto:dilip_ram...@hotmail.com wrote:


Hi Akhil,

Can you please guide me through this? Because the code I am
running already has this in it:
[java]

SparkContext sc = new SparkContext();

sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");


Is there something I am missing?

Thanks,
Dilip


On Friday 11 July 2014 12:02 PM, Akhil Das wrote:

Easiest fix would be adding the kafka jars to the SparkContext
while creating it.

Thanks
Best Regards


On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com
mailto:dilip_ram...@hotmail.com wrote:

Hi,

I am trying to run a program with spark streaming using Kafka
on a stand alone system. These are my details:

Spark 1.0.0 hadoop2
Scala 2.10.3

I am trying a simple program using my custom sbt project but
this is the error I am getting:

Exception in thread main java.lang.NoClassDefFoundError:
kafka/serializer/StringDecoder
at

org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
at

org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
at

org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
at SimpleJavaApp.main(SimpleJavaApp.java:40)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
kafka.serializer.StringDecoder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 11 more


here is my .sbt file:

name := Simple Project

version := 1.0

scalaVersion := 2.10.3

libraryDependencies += org.apache.spark %% spark-core %
1.0.0

libraryDependencies += org.apache.spark %%
spark-streaming % 1.0.0

libraryDependencies += org.apache.spark %% spark-sql %
1.0.0

libraryDependencies += org.apache.spark %% spark-examples
% 1.0.0

libraryDependencies += org.apache.spark %
spark-streaming-kafka_2.10 % 1.0.0

libraryDependencies += org.apache.kafka %% kafka % 0.8.0

resolvers += Akka Repository at http://repo.akka.io/releases/;

resolvers += Maven Repository at
http://central.maven.org/maven2/;


sbt package was successful. I also tried sbt ++2.10.3
package to build it for my scala version. Problem remains
the same.
Can anyone help me out here? Ive been stuck on this for quite
some time now.

Thank You,
Dilip









KMeans for large training data

2014-07-11 Thread durin
Hi,

I'm trying to use org.apache.spark.mllib.clustering.KMeans to do some basic
clustering with Strings.

My code works great when I use a five-figure amount of training elements.
However, with for example 2 million elements, it gets extremely slow. A
single stage may take up to 30 minutes.

From the Web UI, I can see that it does these three things repeatedly:


All of these tasks only use one executor, and on that executor only one
core. And I can see a scheduler delay of about 25 seconds.

I tried to use broadcast variables to speed this up, but maybe I'm using it
wrong. The relevant code (where it gets slow) is this:




What could I do to use more executors, and generally speed this up? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: KMeans for large training data

2014-07-11 Thread Sean Owen
How many partitions do you use for your data? If the default is 1, you
probably need to manually ask for more partitions.

Also, I'd check that your executors aren't thrashing close to the GC
limit. This can make things start to get very slow.
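As a sketch of that suggestion (names and numbers are placeholders, not taken
from the original job): repartition and cache the training RDD before handing
it to KMeans, so each iteration runs many tasks instead of one:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def trainSpread(features: RDD[Vector], numPartitions: Int = 200) = {
  // Spread the training data over many partitions and keep it in memory;
  // each KMeans iteration then runs numPartitions tasks rather than one.
  val spread = features.repartition(numPartitions).cache()
  KMeans.train(spread, 100, 20)  // k and maxIterations are placeholders
}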

On Fri, Jul 11, 2014 at 9:53 AM, durin m...@simon-schaefer.net wrote:
 Hi,

 I'm trying to use org.apache.spark.mllib.clustering.KMeans to do some basic
 clustering with Strings.

 My code works great when I use a five-figure amount of training elements.
 However, with for example 2 million elements, it gets extremely slow. A
 single stage may take up to 30 minutes.

 From the Web UI, I can see that it does these three things repeatedly:


 All of these tasks only use one executor, and on that executor only one
 core. And I can see a scheduler delay of about 25 seconds.

 I tried to use broadcast variables to speed this up, but maybe I'm using it
 wrong. The relevant code (where it gets slow) is this:




 What could I do to use more executors, and generally speed this up?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread kytay
Hi Akhil Das

I have tried the 
nc -lk  
command too.

I was hoping that the System.out.println("Print text: " + arg0); line would be
printed when a stream is processed, i.e. when lines.flatMap(...) is called.

But from my test with nc -lk , nothing is printed on the console at
all.

==

To test whether the nc tool is working, I have also tested it against the
Hercules TCP client test tool, and it works fine.

So now the question goes back to why

JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String arg0) throws
                    Exception {

                System.out.println("Print text: " + arg0);
                return Arrays.asList(arg0.split(" "));
            }
        });

is not printing the text I am sending through nc -lk .

===

Is there any other way to test if socketTextStream(...) is working?

Regards.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9409.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread kytay
I think I should be seeing every line of text that I have typed into the nc
command.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread Akhil Das
Can you try this piece of code?

SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
Duration(1000));

JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
    args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public Iterable<String> call(String x) {
    return Lists.newArrayList(SPACE.split(x));
  }
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  }).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

wordCounts.print();
ssc.start();
ssc.awaitTermination();


Taken from
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java



Thanks
Best Regards


On Fri, Jul 11, 2014 at 9:58 AM, kytay kaiyang@gmail.com wrote:

 I think I should be seeing any line of text that I have typed in the nc
 command.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9410.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread M Singh
Hi TD:

The input file is on HDFS.

The file is approx 2.7 GB, and when the process starts there are 11 tasks
(since the HDFS block size is 256M) for processing and 2 tasks for reduce by key.
After the file has been processed, I see new stages with 2 tasks that continue
to be generated. I understand this value (2) is the default value for
spark.default.parallelism, but I don't quite understand how the value is
determined for generating tasks for reduceByKey, how it is used besides
reduceByKey, and what the optimal value for it should be.
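For what it's worth, a small sketch of the two places the value can be pinned
down (the numbers and the input path are placeholders): globally through
spark.default.parallelism, or per operation by passing an explicit partition
count to reduceByKey:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // pair-DStream ops in Spark 1.x

// Global default used by shuffles that do not specify a partition count.
val conf = new SparkConf()
  .setAppName("CounterJob")
  .set("spark.default.parallelism", "16")

val ssc   = new StreamingContext(conf, Seconds(60))
val lines = ssc.textFileStream("hdfs:///path/to/input")

// Per-operation override: this reduceByKey always uses 16 reduce tasks.
val counts = lines.map(line => (line.split(",")(0), 1L)).reduceByKey(_ + _, 16)
counts.print()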

Thanks.


On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


How are you supplying the text file? 



On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote:

Hi Folks:



I am working on an application which uses spark streaming (version 1.1.0 
snapshot on a standalone cluster) to process text file and save counters in 
cassandra based on fields in each row.  I am testing the application in two 
modes:  

   * Process each row and save the counter in cassandra.  In this scenario 
 after the text file has been consumed, there is no task/stages seen in the 
 spark UI.

   * If instead I use reduce by key before saving to cassandra, the spark 
 UI shows continuous generation of tasks/stages even after processing the file 
 has been completed. 

I believe this is because the reduce by key requires merging of data from 
different partitions.  But I was wondering if anyone has any insights/pointers 
for understanding this difference in behavior and how to avoid generating 
tasks/stages when there is no data (new file) available.


Thanks

Mans

RE: All of the tasks have been completed but the Stage is still shown as Active?

2014-07-11 Thread Haopu Wang
I saw some exceptions like this in the driver log. Can you shed some light? Is it
related to the behaviour?

 

14/07/11 20:40:09 ERROR LiveListenerBus: Listener JobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 64019
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at org.apache.spark.ui.jobs.JobProgressListener.onStageCompleted(JobProgressListener.scala:78)
 at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)
 at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)
 at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
 at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
 at org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:48)
 at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
 at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
 at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
 at scala.Option.foreach(Option.scala:236)
 at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
 at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
 at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
 at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)
 



From: Haopu Wang 
Sent: Thursday, July 10, 2014 7:38 PM
To: user@spark.apache.org
Subject: RE: All of the tasks have been completed but the Stage is still shown 
as Active?

 

I didn't keep the driver's log. It's a lesson.

I will try to run it again to see if it happens again.

 



From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: July 10, 2014 17:29
To: user@spark.apache.org
Subject: Re: All of the tasks have been completed but the Stage is still shown 
as Active?

 

Do you see any errors in the logs of the driver?

 

On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang hw...@qilinsoft.com wrote:

I'm running an App for hours in a standalone cluster. From the data
injector and Streaming tab of web ui, it's running well.

However, I see quite a lot of Active stages in web ui even some of them
have all of their tasks completed.

I attach a screenshot for your reference.

Do you ever see this kind of behavior?

 



Iteration question

2014-07-11 Thread Nathan Kronenfeld
Hi, folks.

We're having a problem with iteration that I don't understand.

We have the following test code:

org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.WARN)
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN)

def test (caching: Boolean, points: Int, iterations: Int) {
  var coords = sc.parallelize(Array.fill(points)(0.0, 0.0).zipWithIndex.map(_.swap))
  if (caching) coords.cache
  coords.count

  var iteration = 0
  val times = new Array[Double](iterations)

  do {
    val start = System.currentTimeMillis
    val thisIteration = iteration
    val increments = sc.parallelize(for (i <- 1 to points) yield (math.random, math.random))
    val newcoords = coords.zip(increments).map(p => {
      if (0 == p._1._1) println("Processing iteration " + thisIteration)
      (p._1._1,
       (p._1._2._1 + p._2._1,
        p._1._2._2 + p._2._2))
    })
    if (caching) newcoords.cache
    newcoords.count
    if (caching) coords.unpersist(false)
    coords = newcoords
    val end = System.currentTimeMillis

    times(iteration) = (end - start) / 1000.0
    println("Done iteration " + iteration + " in " + times(iteration) + " seconds")
    iteration = iteration + 1
  } while (iteration < iterations)

  for (i <- 0 until iterations) {
    println("Iteration " + i + ": " + times(i))
  }
}

If you run this on a local server with caching on and off, it appears that
the caching does what it is supposed to do - only the latest iteration is
processed each time through the loop.

However, despite this, the time for each iteration still gets slower and
slower.
For example, calling test(true, 5000, 100), I get the following times
(weeding out a few for brevity):
Iteration 0: 0.084
Iteration 10: 0.381
Iteration 20: 0.674
Iteration 30: 0.975
Iteration 40: 1.254
Iteration 50: 1.544
Iteration 60: 1.802
Iteration 70: 2.147
Iteration 80: 2.469
Iteration 90: 2.715
Iteration 99: 2.962

That's a 35x increase between the first and last iteration, when it should
be doing the same thing each time!

Without caching, the numbers are
Iteration 0: 0.642
Iteration 10: 0.516
Iteration 20: 0.823
Iteration 30: 1.17
Iteration 40: 1.514
Iteration 50: 1.655
Iteration 60: 1.992
Iteration 70: 2.177
Iteration 80: 2.472
Iteration 90: 2.814
Iteration 99: 3.018

slightly slower - but not significantly.

Does anyone know, if the caching is working, why is iteration 100 slower
than iteration 1?  And why is caching making so little difference?


Thanks,
-Nathan Kronenfeld

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Categorical Features for K-Means Clustering

2014-07-11 Thread Wen Phan
Hi Folks,

Does any one have experience or recommendations on incorporating categorical 
features (attributes) into k-means clustering in Spark?  In other words, I want 
to cluster on a set of attributes that include categorical variables.

I know I could probably implement some custom code to parse and calculate my 
own similarity function, but I wanted to reach out before I did so.  I’d also 
prefer to take advantage of the k-means\parallel initialization feature of the 
model in MLlib, so an MLlib-based implementation would be preferred.

Thanks in advance.

Best,

-Wen




Re: Categorical Features for K-Means Clustering

2014-07-11 Thread Sean Owen
Since you can't define your own distance function, you will need to
convert these to numeric dimensions. 1-of-n encoding can work OK,
depending on your use case. So a dimension that takes on 3 categorical
values, becomes 3 dimensions, of which all are 0 except one that has
value 1.
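
For instance, a minimal sketch of 1-of-n encoding feeding into MLlib's KMeans (shell-style, with sc in scope; the "age,color" layout, the category list and k are made-up placeholders, not anything from the original post):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Hypothetical rows of "age,color", where color takes one of three categorical values.
val categories = Array("red", "green", "blue")
val vectors = sc.textFile("data.csv").map { line =>
  val Array(age, color) = line.split(",")
  val oneHot = categories.map(c => if (c == color.trim) 1.0 else 0.0)  // 1-of-n encoding
  Vectors.dense(age.toDouble +: oneHot)
}.cache()

val model = KMeans.train(vectors, 4, 20)  // k = 4 clusters, 20 iterations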

On Fri, Jul 11, 2014 at 3:07 PM, Wen Phan wen.p...@mac.com wrote:
 Hi Folks,

 Does any one have experience or recommendations on incorporating categorical 
 features (attributes) into k-means clustering in Spark?  In other words, I 
 want to cluster on a set of attributes that include categorical variables.

 I know I could probably implement some custom code to parse and calculate my 
 own similarity function, but I wanted to reach out before I did so.  I’d also 
 prefer to take advantage of the k-means\parallel initialization feature of 
 the model in MLlib, so an MLlib-based implementation would be preferred.

 Thanks in advance.

 Best,

 -Wen


Re: Categorical Features for K-Means Clustering

2014-07-11 Thread Wen Phan
I see.  So, basically, kind of like dummy variables in regressions.
Thanks, Sean.

On Jul 11, 2014, at 10:11 AM, Sean Owen so...@cloudera.com wrote:

 Since you can't define your own distance function, you will need to
 convert these to numeric dimensions. 1-of-n encoding can work OK,
 depending on your use case. So a dimension that takes on 3 categorical
 values, becomes 3 dimensions, of which all are 0 except one that has
 value 1.
 
 On Fri, Jul 11, 2014 at 3:07 PM, Wen Phan wen.p...@mac.com wrote:
 Hi Folks,
 
 Does any one have experience or recommendations on incorporating categorical 
 features (attributes) into k-means clustering in Spark?  In other words, I 
 want to cluster on a set of attributes that include categorical variables.
 
 I know I could probably implement some custom code to parse and calculate my 
 own similarity function, but I wanted to reach out before I did so.  I’d 
 also prefer to take advantage of the k-means\parallel initialization feature 
 of the model in MLlib, so an MLlib-based implementation would be preferred.
 
 Thanks in advance.
 
 Best,
 
 -Wen





Spark Streaming timing considerations

2014-07-11 Thread Laeeq Ahmed
Hi,

In the spark streaming paper, slack time has been suggested for delaying the 
batch creation in case of external timestamps. I don't see any such option in 
streamingcontext. Is it available in the API?


Also going through the previous posts, queueStream has been suggested for this. 
I looked into to queueStream example.


    // Create and push some RDDs into Queue
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)
      Thread.sleep(1000)
    }
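
For reference, a minimal sketch of the full wiring around that snippet, modeled on the QueueStream example (app name, batch interval and RDD contents are placeholders):

import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("QueueStream"), Seconds(1))
val rddQueue = new Queue[RDD[Int]]()

// By default, queueStream takes one RDD off the queue per batch interval.
val inputStream = ssc.queueStream(rddQueue)
inputStream.count().print()
ssc.start()

// Create and push one RDD per second, mimicking externally timed arrivals.
for (i <- 1 to 30) {
  rddQueue += ssc.sparkContext.makeRDD(1 to 10)
  Thread.sleep(1000)
}
ssc.stop()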

The only thing I am unsure about is how to make batches (basic RDDs) out of a stream
coming in on a port.

Regards,
Laeeq

Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.

2014-07-11 Thread Rohit Rai
Hi Gerard,

This was on my todos since long... i just published a Calliope snapshot
built against Hadoop 2.2.x, Take it for a spin if you get a chance -
You can get the jars from here -

   -
   
https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope_2.10/0.9.4-H2-SNAPSHOT/calliope_2.10-0.9.4-H2-SNAPSHOT.jar
   -
   
https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope-macros_2.10/0.9.4-H2-SNAPSHOT/calliope-macros_2.10-0.9.4-H2-SNAPSHOT.jar

Or to use from Maven -

<dependency>
  <groupId>com.tuplejump</groupId>
  <artifactId>calliope_2.10</artifactId>
  <version>0.9.4-H2-SNAPSHOT</version>
</dependency>

and SBT -

libraryDependencies += "com.tuplejump" %% "calliope_2.10" % "0.9.4-H2-SNAPSHOT"


It passes all the tests so I am assuming all is fine, but we haven't tested
it very extensively.

Regards,
Rohit


*Founder & CEO, **Tuplejump, Inc.*

www.tuplejump.com
*The Data Engineering Platform*


On Fri, Jun 27, 2014 at 9:31 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi Rohit,

 Thanks for your message. We are currently on Spark 0.9.1, Cassandra 2.0.6
 and Calliope GA  (Would love to try the pre-release version if you want
 beta testers :-)   Our hadoop version is CDH4.4 and of course our spark
 assembly is compiled against it.

 We have got really interesting performance results from using Calliope and
 will probably try to compile it against Hadoop 2. Compared to the DataStax
 Java driver, out of the box, the Calliope lib gives us ~4.5x insert
 performance with a higher network and cpu usage (which is what we want in
 batch insert mode = fast)

 With additional code optimizations using the DataStax driver, we were able
 to reduce that gap to 2x but still Calliope was easier and faster to use.

 Will you be attending the Spark Summit? I'll be around.

 We'll be in touch in any case :-)

 -kr, Gerard.



 On Thu, Jun 26, 2014 at 11:03 AM, Rohit Rai ro...@tuplejump.com wrote:

 Hi Gerard,

 What is the version of Spark, Hadoop, Cassandra and Calliope are you
 using. We never built Calliope to Hadoop2 as we/or our clients don't use
 Hadoop in their deployments or use it only as the Infra component for Spark
 in which case H1/H2 doesn't make a difference for them.

 I know atleast of one case where the user had built Calliope against 2.0
 and was using it happily. If you need assistance with it we are here to
 help. Feel free to reach out to me directly and we can work out a solution
 for you.

 Regards,
 Rohit


 *Founder & CEO, **Tuplejump, Inc.*
 
 www.tuplejump.com
 *The Data Engineering Platform*


 On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Thanks Nick.

 We used the CassandraOutputFormat through Calliope. The Calliope API
 makes the CassandraOutputFormat quite accessible  and is cool to work with.
  It worked fine at prototype level, but we had Hadoop version conflicts
 when we put it in our Spark environment (Using our Spark assembly compiled
 with CDH4.4). The conflict seems to be at the Cassandra-all lib level,
 which is compiled against a different hadoop version  (v1).

 We could not get round that issue. (Any pointers in that direction?)

 That's why I'm trying the direct CQLSSTableWriter way but it looks
 blocked as well.

  -kr, Gerard.




 On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 can you not use a Cassandra OutputFormat? Seems they have
 BulkOutputFormat. An example of using it with Hadoop is here:
 http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html

 Using it with Spark will be similar to the examples:
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
 and
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala


 On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 (My excuses for the cross-post from SO)

 I'm trying to create Cassandra SSTables from the results of a batch
 computation in Spark. Ideally, each partition should create the SSTable 
 for
 the data it holds in order to parallelize the process as much as possible
 (and probably even stream it to the Cassandra ring as well)

 After the initial hurdles with the CQLSSTableWriter (like requiring
 the yaml file), I'm confronted now with this issue:





 java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts
     at org.apache.cassandra.config.Schema.load(Schema.java:347)
     at org.apache.cassandra.config.Schema.load(Schema.java:112)
     at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

 I'm creating a writer on each parallel partition like this:





 def store(rdd: RDD[Message]) = {
   rdd.foreachPartition( msgIterator => {
     val writer = CQLSSTableWriter.builder()
  

Databricks demo

2014-07-11 Thread Debasish Das
Hi,

Databricks demo at spark summit was amazing... what's the frontend stack
used, specifically for rendering multiple reactive charts on the same DOM? Looks
like that's an emerging pattern for correlating different data APIs...

Thanks
Deb


Re: KMeans code is rubbish

2014-07-11 Thread Ameet Talwalkar
Hi Wanda,

As Sean mentioned, K-means is not guaranteed to find an optimal answer,
even for seemingly simple toy examples. A common heuristic to deal with
this issue is to run kmeans multiple times and choose the best answer.  You
can do this by changing the runs parameter from the default value (1) to
something larger (say 10).
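
For example, a minimal sketch using the KMeans.train overload that takes the number of runs (here `data` is assumed to already be an RDD[Vector]; k and the iteration count are placeholders):

import org.apache.spark.mllib.clustering.KMeans

// k = 2 clusters, up to 20 iterations, 10 runs; the run with the lowest cost is kept.
val model = KMeans.train(data, 2, 20, 10)
model.clusterCenters.foreach(println)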

-Ameet


On Fri, Jul 11, 2014 at 1:20 AM, Wanda Hawk wanda_haw...@yahoo.com wrote:

 I also took a look
 at 
 spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
 and ran the code in a shell.

 There is an issue here:
 val initMode = params.initializationMode match {
   case Random => KMeans.RANDOM
   case Parallel => KMeans.K_MEANS_PARALLEL
 }
 

 If I use initMode=KMeans.RANDOM everything is ok.
 If I use initMode=KMeans.K_MEANS_PARALLEL I get a wrong result. I do not
 know why. The example proposed is a really simple one that should not
 accept multiple solutions and always converge to the correct one.

 Now what can be altered in the original SparkKMeans.scala (the seed or
 something else ?) to get the correct results each and every single time ?
On Thursday, July 10, 2014 7:58 PM, Xiangrui Meng men...@gmail.com
 wrote:


 SparkKMeans is a naive implementation. Please use
 mllib.clustering.KMeans in practice. I created a JIRA for this:
 https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui

 On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das
 tathagata.das1...@gmail.com wrote:
  I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with
 your
  dataset as well, I got the expected answer. And I believe that even
 though
  initialization is done using sampling, the example actually sets the
 seed to
  a constant 42, so the result should always be the same no matter how many
  times you run it. So I am not really sure whats going on here.
 
  Can you tell us more about which version of Spark you are running? Which
  Java version?
 
 
  ==
 
  [tdas @ Xion spark2] cat input
  2 1
  1 2
  3 2
  2 3
  4 1
  5 1
  6 1
  4 2
  6 2
  4 3
  5 3
  6 3
  [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001
  2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from
  SCDynamicStore
  14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
  14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded
  14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeSystemBLAS
  14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
  com.github.fommil.netlib.NativeRefBLAS
  Finished iteration (delta = 3.0)
  Finished iteration (delta = 0.0)
  Final centers:
  DenseVector(5.0, 2.0)
  DenseVector(2.0, 2.0)
 
 
 
  On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk wanda_haw...@yahoo.com
 wrote:
 
  so this is what I am running:
  ./bin/run-example SparkKMeans ~/Documents/2dim2.txt 2 0.001
 
  And this is the input file:
  ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$
  └───#!cat ~/Documents/2dim2.txt
  2 1
  1 2
  3 2
  2 3
  4 1
  5 1
  6 1
  4 2
  6 2
  4 3
  5 3
  6 3
  
 
  This is the final output from spark:
  14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
  Getting 2 non-empty blocks out of 2 blocks
  14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
  Started 0 remote fetches in 0 ms
  14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
  maxBytesInFlight: 50331648, targetRequestSize: 10066329
  14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
  Getting 2 non-empty blocks out of 2 blocks
  14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
  Started 0 remote fetches in 0 ms
  14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is
 1433
  14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to
 driver
  14/07/10 20:05:12 INFO Executor: Finished task ID 14
  14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0)
  14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on
  localhost (progress: 1/2)
  14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is
 1433
  14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to
 driver
  14/07/10 20:05:12 INFO Executor: Finished task ID 15
  14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1)
  14/07/10 20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on
  localhost (progress: 2/2)
  14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at
  SparkKMeans.scala:75) finished in 0.008 s
  14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose
 tasks
  have all completed, from pool
  14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at
  SparkKMeans.scala:75, took 0.02472681 s
  Finished iteration (delta = 0.0)
  Final centers:
  DenseVector(2.8571428571428568, 2.0)
  

Re: RDD join, index key: composite keys

2014-07-11 Thread marspoc
I want to create an index on keyPnl's pnl_type_code, similar to an RDBMS index, so that the
group by can be done efficiently. How do I achieve that?
Currently the code below blows out of memory in Spark on 60GB of data.
keyPnl is a very large file. We have been stuck for 1 week, trying kryo,
mapValues etc. to no avail.
We want to partition on pnl_type_code but have no idea how to do that.
Please advise.


  val keyPnl = pnl.filter(_.rf_level == 0).keyBy(f => f.portfolio_code)
  val keyPosition = positions.filter(_.pl0_code == 3).keyBy(f => f.portfolio_code)

  val JoinPnlPortfolio = keyPnl.leftOuterJoin(keyPosition)

  var result = JoinPnlPortfolio.groupBy(r => (r._2._1.pnl_type_code))
    .mapValues(kv => (kv.map(mapper).fold (List[Double]()) (Vector.reduceVector _)))
    .mapValues(kv => (Var.percentile(kv, 0.99)))




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-join-composite-keys-tp8696p9423.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Bill Jay
You may try to use this one:

https://github.com/sbt/sbt-assembly

I had an issue of duplicate files in the uber jar file. But I think this
library will assemble dependencies into a single jar file.
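
For reference, a rough sketch of that setup (the plugin version and the merge-strategy incantation below are assumptions for the sbt-assembly 0.11.x era, not the poster's exact build; check the plugin's README for your version). In project/plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

and at the top of build.sbt, a crude merge strategy to get past "duplicate file" errors from overlapping META-INF entries:

import AssemblyKeys._

assemblySettings

mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard  // drop clashing metadata files
    case x => old(x)
  }
}

Running `sbt assembly` then produces a single fat jar that includes spark-streaming-kafka.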

Bill


On Fri, Jul 11, 2014 at 1:34 AM, Dilip dilip_ram...@hotmail.com wrote:

  A simple
 sbt assembly
 is not working. Is there any other way to include particular jars with
 assembly command?

 Regards,
 Dilip

 On Friday 11 July 2014 12:45 PM, Bill Jay wrote:

 I have met similar issues. The reason is probably because in Spark
 assembly, spark-streaming-kafka is not included. Currently, I am using
 Maven to generate a shaded package with all the dependencies. You may try
 to use sbt assembly to include the dependencies in your jar file.

  Bill


 On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote:

  Hi Akhil,

 Can you please guide me through this? Because the code I am running
 already has this in it:
 [java]

 SparkContext sc = new SparkContext();

 sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");


 Is there something I am missing?

 Thanks,
 Dilip


 On Friday 11 July 2014 12:02 PM, Akhil Das wrote:

  Easiest fix would be adding the kafka jars to the SparkContext while
 creating it.

  Thanks
 Best Regards


 On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote:

 Hi,

 I am trying to run a program with spark streaming using Kafka on a stand
 alone system. These are my details:

 Spark 1.0.0 hadoop2
 Scala 2.10.3

 I am trying a simple program using my custom sbt project but this is the
 error I am getting:

 Exception in thread main java.lang.NoClassDefFoundError:
 kafka/serializer/StringDecoder
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
 at
 org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
 at
 org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
 at SimpleJavaApp.main(SimpleJavaApp.java:40)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 kafka.serializer.StringDecoder
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 11 more


 here is my .sbt file:

 name := "Simple Project"

 version := "1.0"

 scalaVersion := "2.10.3"

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"

 libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"

 libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

 libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

 resolvers += "Maven Repository" at "http://central.maven.org/maven2/"


 sbt package was successful. I also tried sbt ++2.10.3 package to build
 it for my scala version. Problem remains the same.
 Can anyone help me out here? Ive been stuck on this for quite some time
 now.

 Thank You,
 Dilip








RE: SPARK_CLASSPATH Warning

2014-07-11 Thread Andrew Lee
As mentioned, deprecated in Spark 1.0+.
Try to use the --driver-class-path:
 ./bin/spark-shell --driver-class-path yourlib.jar:abc.jar:xyz.jar

Don't use glob *, specify the JAR one by one with colon.
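
Equivalently, a minimal sketch of the corresponding entries in conf/spark-defaults.conf (these are the properties named in the deprecation warning quoted below; the paths here are placeholders):

spark.driver.extraClassPath    /path/to/yourlib.jar:/path/to/abc.jar:/path/to/xyz.jar
spark.executor.extraClassPath  /path/to/yourlib.jar:/path/to/abc.jar:/path/to/xyz.jar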

Date: Wed, 9 Jul 2014 13:45:07 -0700
From: kat...@cs.pitt.edu
Subject: SPARK_CLASSPATH Warning
To: user@spark.apache.org

Hello,

I have installed Apache Spark v1.0.0 in a machine with a proprietary Hadoop 
Distribution installed (v2.2.0 without yarn). Due to the fact that the Hadoop 
Distribution that I am using, uses a list of jars , I do the following changes 
to the conf/spark-env.sh


#!/usr/bin/env bash

export HADOOP_CONF_DIR=/path-to-hadoop-conf/hadoop-conf
export SPARK_LOCAL_IP=impl41
export 
SPARK_CLASSPATH=/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/*

...

Also, to make sure that I have everything working I execute the Spark shell as 
follows:

[biadmin@impl41 spark]$ ./bin/spark-shell --jars 
/path-to-proprietary-hadoop-lib/lib/*.jar


14/07/09 13:37:28 INFO spark.SecurityManager: Changing view acls to: biadmin
14/07/09 13:37:28 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(biadmin)

14/07/09 13:37:28 INFO spark.HttpServer: Starting HTTP Server
14/07/09 13:37:29 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/09 13:37:29 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:44292

Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.0.0
  /_/

Using Scala version 2.10.4 (IBM J9 VM, Java 1.7.0)

Type in expressions to have them evaluated.
Type :help for more information.
14/07/09 13:37:36 WARN spark.SparkConf: 
SPARK_CLASSPATH was detected (set to 
'path-to-proprietary-hadoop-lib/*:/path-to-proprietary-hadoop-lib/lib/*').

This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath


14/07/09 13:37:36 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' 
to '/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/*' as 
a work-around.
14/07/09 13:37:36 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' 
to '/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/*' as 
a work-around.

14/07/09 13:37:36 INFO spark.SecurityManager: Changing view acls to: biadmin
14/07/09 13:37:36 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(biadmin)

14/07/09 13:37:37 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/07/09 13:37:37 INFO Remoting: Starting remoting
14/07/09 13:37:37 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://spark@impl41:46081]

14/07/09 13:37:37 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://spark@impl41:46081]
14/07/09 13:37:37 INFO spark.SparkEnv: Registering MapOutputTracker
14/07/09 13:37:37 INFO spark.SparkEnv: Registering BlockManagerMaster

14/07/09 13:37:37 INFO storage.DiskBlockManager: Created local directory at 
/tmp/spark-local-20140709133737-798b
14/07/09 13:37:37 INFO storage.MemoryStore: MemoryStore started with capacity 
307.2 MB.
14/07/09 13:37:38 INFO network.ConnectionManager: Bound socket to port 16685 
with id = ConnectionManagerId(impl41,16685)

14/07/09 13:37:38 INFO storage.BlockManagerMaster: Trying to register 
BlockManager
14/07/09 13:37:38 INFO storage.BlockManagerInfo: Registering block manager 
impl41:16685 with 307.2 MB RAM
14/07/09 13:37:38 INFO storage.BlockManagerMaster: Registered BlockManager

14/07/09 13:37:38 INFO spark.HttpServer: Starting HTTP Server
14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/09 13:37:38 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:21938

14/07/09 13:37:38 INFO broadcast.HttpBroadcast: Broadcast server started at 
http://impl41:21938
14/07/09 13:37:38 INFO spark.HttpFileServer: HTTP File server directory is 
/tmp/spark-91e8e040-f2ca-43dd-b574-805033f476c7

14/07/09 13:37:38 INFO spark.HttpServer: Starting HTTP Server
14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/09 13:37:38 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:52678

14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/09 13:37:38 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:4040
14/07/09 13:37:38 INFO ui.SparkUI: Started SparkUI at http://impl41:4040

14/07/09 13:37:39 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
14/07/09 13:37:39 INFO spark.SparkContext: Added JAR 
file:/opt/ibm/biginsights/IHC/lib/adaptive-mr.jar at 
http://impl41:52678/jars/adaptive-mr.jar with timestamp 1404938259526

14/07/09 13:37:39 INFO executor.Executor: Using REPL class URI: 
http://impl41:44292
14/07/09 

Re: Join two Spark Streaming

2014-07-11 Thread Tathagata Das
1. Since the RDD of the previous batch is used to create the RDD of the
next batch, the lineage of dependencies in the RDDs continues to grow
infinitely. Thats not good because of it increases fault-recover times,
task sizes, etc. Checkpointing saves the data of an RDD to HDFS and
truncates the lineage.


2. The code should have been the following. Sorry about the confusion.

var uniqueValuesRDD: RDD[Int] = ...

dstreamOfIntegers.transform(newDataRDD => {
   val newUniqueValuesRDD = newDataRDD.union(uniqueValuesRDD).distinct
   uniqueValuesRDD = newUniqueValuesRDD

   // periodically call uniqueValuesRDD.checkpoint()

   val uniqueCount = uniqueValuesRDD.count()
   newDataRDD.map(x => x / uniqueCount)
})




On Fri, Jul 11, 2014 at 12:10 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 Thanks for the solution. Actually, I will use the number of unique
 integers in the batch instead of accumulative number of unique integers.

 I do have two questions about your code:

 1. Why do we need uniqueValuesRDD?  Why do we need to call
 uniqueValuesRDD.checkpoint()?

 2. Where is distinctValues defined?

 Bill


 On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Do you want to continuously maintain the set of unique integers seen
 since the beginning of stream?

 var uniqueValuesRDD: RDD[Int] = ...

 dstreamOfIntegers.transform(newDataRDD => {
    val newUniqueValuesRDD = newDataRDD.union(distinctValues).distinct
    uniqueValuesRDD = newUniqueValuesRDD

    // periodically call uniqueValuesRDD.checkpoint()

    val uniqueCount = uniqueValuesRDD.count()
    newDataRDD.map(x => x / count)
 })





 On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am working on a pipeline that needs to join two Spark streams. The
 input is a stream of integers. And the output is the number of integer's
 appearance divided by the total number of unique integers. Suppose the
 input is:

 1
 2
 3
 1
 2
 2

 There are 3 unique integers and 1 appears twice. Therefore, the output
 for the integer 1 will be:
 1 0.67

 Since the input is from a stream, it seems we need to first join the
 appearance of the integers and the total number of unique integers and then
 do a calculation using map. I am thinking of adding a dummy key to both
 streams and use join. However, a Cartesian product matches the application
 here better. How to do this effectively? Thanks!

 Bill






RE: spark-1.0.0-rc11 2f1dc868 spark-shell not honoring --properties-file option?

2014-07-11 Thread Andrew Lee
Ok, I found it on JIRA SPARK-2390:
https://issues.apache.org/jira/browse/SPARK-2390
So it looks like this is a known issue.

From: alee...@hotmail.com
To: user@spark.apache.org
Subject: spark-1.0.0-rc11 2f1dc868 spark-shell not honoring --properties-file 
option?
Date: Tue, 8 Jul 2014 15:17:00 -0700




Build: Spark 1.0.0 rc11 (git commit tag: 
2f1dc868e5714882cf40d2633fb66772baf34789)








Hi All,
When I enabled the spark-defaults.conf for the eventLog, spark-shell broke 
while spark-submit works.
I'm trying to create a separate directory per user to keep track with their own 
Spark job event logs with the env $USER in spark-defaults.conf.
Here's the spark-defaults.conf I specified so that HistoryServer can start 
picking up these event log from HDFS.As you can see here, I was trying to 
create a directory for each user so they can store the event log on a per user 
base.However, when I launch spark-shell, it didn't pick up $USER as the current 
login user. However, this works for spark-submit.
Here's more details.
/opt/spark/ is SPARK_HOME








[test@ ~]$ cat /opt/spark/conf/spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.


# Example:
# spark.master            spark://master:7077
spark.eventLog.enabled    true
spark.eventLog.dir        hdfs:///user/$USER/spark/logs/
# spark.serializer        org.apache.spark.serializer.KryoSerializer

and I tried to create a separate config file to override the default one:







[test@ ~]$ SPARK_SUBMIT_OPTS=-XX:MaxPermSize=256m /opt/spark/bin/spark-shell --master yarn --driver-class-path /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar --properties-file /home/test/spark-defaults.conf

[test@ ~]$ cat /home/test/spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master            spark://master:7077
spark.eventLog.enabled    true
spark.eventLog.dir        hdfs:///user/test/spark/logs/
# spark.serializer        org.apache.spark.serializer.KryoSerializer
But it didn't work either; it is still looking at /opt/spark/conf/spark-defaults.conf. According to the document
http://spark.apache.org/docs/latest/configuration.html, the precedence is: hardcoded properties in
SparkConf > spark-submit / spark-shell options > conf/spark-defaults.conf

2 problems here:

1. In repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala, the SparkConf instance
didn't look for the user-specified spark-defaults.conf anywhere.
I don't see anywhere that pulls in the file from the --properties-file option; it
just uses the default location conf/spark-defaults.conf:

val conf = new SparkConf()
  .setMaster(getMaster())
  .setAppName("Spark shell")
  .setJars(jars)
  .set("spark.repl.class.uri", intp.classServer.uri)
2. The $USER isn't picked up in spark-shell. This may be another problem, and could be fixed at the
same time if spark-shell reused the way SparkSubmit.scala populates SparkConf.











  

Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread Tathagata Das
Whenever you need to do a shuffle=based operation like reduceByKey,
groupByKey, join, etc., the system is essentially redistributing the data
across the cluster and it needs to know how many parts should it divide the
data into. Thats where the default parallelism is used.
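
For example, a minimal sketch of both knobs, assuming `lines` is an RDD[String]; the value 32 is just a placeholder:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._

// Option 1: raise the default used whenever no partition count is given explicitly.
val conf = new SparkConf().setAppName("wordcount").set("spark.default.parallelism", "32")

// Option 2: pass the partition count directly to the shuffle-based operation.
val counts = lines.flatMap(_.split(" "))
                  .map(word => (word, 1L))
                  .reduceByKey(_ + _, 32)   // 32 reduce tasks instead of the default 2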

TD


On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote:

 Hi TD:

 The input file is on hdfs.

  The file is approx 2.7 GB and when the process starts, there are 11
 tasks (since hdfs block size is 256M) for processing and 2 tasks for reduce
 by key.  After the file has been processed, I see new stages with 2 tasks
 that continue to be generated. I understand this value (2) is the default
 value for spark.default.parallelism but don't quite understand how is the
 value determined for generating tasks for reduceByKey, how is it used
 besides reduceByKey and what should be the optimal value for this.

  Thanks.


   On Thursday, July 10, 2014 7:24 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:


 How are you supplying the text file?


 On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote:

 Hi Folks:

 I am working on an application which uses spark streaming (version 1.1.0
 snapshot on a standalone cluster) to process text file and save counters in
 cassandra based on fields in each row.  I am testing the application in two
 modes:

- Process each row and save the counter in cassandra.  In this
scenario after the text file has been consumed, there is no task/stages
seen in the spark UI.
- If instead I use reduce by key before saving to cassandra, the spark
UI shows continuous generation of tasks/stages even after processing the
file has been completed.

 I believe this is because the reduce by key requires merging of data from
 different partitions.  But I was wondering if anyone has any
 insights/pointers for understanding this difference in behavior and how to
 avoid generating tasks/stages when there is no data (new file) available.

 Thanks

 Mans







Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Yan Fang
Hi Praveen,

Thank you for the answer. That's interesting because if I only bring up one
executor for the Spark Streaming, it seems only the receiver is working and no
other tasks are happening, judging by the log and UI. Maybe it's just
because the receiving task eats all the resources, not because one executor
can only run one receiver?

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote:

 Here are my answers. But am just getting started with Spark Streaming - so
 please correct me if am wrong.
 1) Yes
 2) Receivers will run on executors. Its actually a job thats submitted
 where # of tasks equals # of receivers. An executor can actually run more
 than one task at the same time. Hence you could have more number of
 receivers than executors but its not recommended I think.
 3) As said in 2, the executor where receiver task is running can be used
 for map/reduce tasks. In yarn-cluster mode, the driver program is actually
 run as application master (lives in the first container thats launched) and
 this is not an executor - hence its not used for other operations.
 4) the driver runs in a separate container. I think the same executor can
 be used for receiver and the processing task also (this part am not very
 sure)


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application. But I have problem in understanding how the executors are used
 and the application is distributed.

 1. In YARN, is one executor equal one container?

 2. I saw the statement that a streaming receiver runs on one work machine
 (*note that each input DStream creates a single receiver (running on
 a worker machine) that receives a single stream of data*). Does the
 work machine mean the executor or physical machine? If I have more
 receivers than the executors, will it still work?

 3. Is the executor that holds receiver also used for other operations,
 such as map, reduce, or fully occupied by the receiver? Similarly, if I run
 in yarn-cluster mode, is the executor running driver program used by other
 operations too?

 4. So if I have a driver program (cluster mode) and streaming receiver,
 do I have to have at least 2 executors because the program and streaming
 receiver have to be on different executors?

 Thank you. Sorry for having so many questions but I do want to understand
 how the Spark Streaming distributes in order to assign reasonable
 recourse.*_* Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108





Re: writing FLume data to HDFS

2014-07-11 Thread Tathagata Das
What is the error you are getting when you say "I was trying to write the
data to hdfs..but it fails…"?

TD


On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. 
muthu.x.sundaram@sabre.com wrote:

 I am new to spark. I am trying to do the following.

Netcat -> Flume -> Spark streaming (process Flume data) -> HDFS.



 My flume config file has following set up.



 Source = netcat

 Sink=avrosink.



 Spark Streaming code:

 I am able to print data from Flume to the monitor. But I am struggling to
 create a file. In order to get the real data I need to convert each SparkFlumeEvent
 to an AvroFlumeEvent.

 JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection of
 SparkFlumeEvent. Do I need to convert it into a collection of
 JavaRDD<AvroEvent>?

 Please share any code examples… Thanks.



 Code:



  Duration batchInterval = new Duration(2000);
  SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

  flumeStream.count();

  flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, JavaRDD<SparkFlumeEvent>, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> events1, JavaRDD<SparkFlumeEvent> events2) throws Exception {
          events1.saveAsTextFile("output.txt");
          return null;
      }
  });

  /*flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
          return "Received " + in + " flume events.";
      }
  }).print();*/

  flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
          String logRecord = null;
          List<SparkFlumeEvent> events = eventsData.collect();
          Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

          long t1 = System.currentTimeMillis();
          AvroFlumeEvent avroEvent = null;
          ByteBuffer bytePayload = null;

          // All the user level data is carried as payload in the Flume Event
          while (batchedEvents.hasNext()) {
              SparkFlumeEvent flumeEvent = batchedEvents.next();
              avroEvent = flumeEvent.event();
              bytePayload = avroEvent.getBody();
              logRecord = new String(bytePayload.array());

              System.out.println("LOG RECORD = " + logRecord);

              // "I was trying to write the data to hdfs..but it fails…"
          }
          System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1)/1000 + " seconds");
          return null;
      }
  });





Re: Spark Streaming RDD to Shark table

2014-07-11 Thread patwhite
Hi,
I'm running into an identical issue running Spark 1.0.0 on Mesos 0.19. Were
you able to get it sorted? There's no real documentation for the
spark.httpBroadcast.uri except what's in the code - is this config setting
required for running on a Mesos cluster?

I'm running this in a dev environment with a simple 2 machine setup - the
driver is running on dev-1, and dev-2 (10.0.0.5 in the below stack trace)
has a mesos master, zookeeper, and mesos slave.  

Stack Trace:

14/07/11 18:00:05 INFO SparkEnv: Connecting to MapOutputTracker:
akka.tcp://spark@dev-1:58136/user/MapOutputTracker
14/07/11 18:00:06 INFO SparkEnv: Connecting to BlockManagerMaster:
akka.tcp://spark@dev-1:58136/user/BlockManagerMaster
14/07/11 18:00:06 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140711180006-dea8
14/07/11 18:00:06 INFO MemoryStore: MemoryStore started with capacity 589.2
MB.
14/07/11 18:00:06 INFO ConnectionManager: Bound socket to port 60708 with id
= ConnectionManagerId(10.0.0.5,60708)
14/07/11 18:00:06 INFO BlockManagerMaster: Trying to register BlockManager
14/07/11 18:00:06 INFO BlockManagerMaster: Registered BlockManager
java.util.NoSuchElementException: spark.httpBroadcast.uri
at
org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
at
org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at org.apache.spark.SparkConf.get(SparkConf.scala:149)
at
org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:130)
at
org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at
org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at
org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.executor.Executor.init(Executor.scala:85)
at
org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:56)
Exception in thread Thread-2 I0711 18:00:06.454962 14037 exec.cpp:412]
Deactivating the executor libprocess

If I manually set the httpBroadcastUri to http://dev-1; I get the following
error, I assume because I'm not setting the port correctly (which I don't
think I have any way of knowing?)

14/07/11 18:31:27 ERROR Executor: Exception in task ID 4
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.init(HttpClient.java:211)
at sun.net.www.http.HttpClient.New(HttpClient.java:308)
at sun.net.www.http.HttpClient.New(HttpClient.java:326)
at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:996)
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:932)
at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:850)
at
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1300)
at
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at

Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread M Singh
So, is it expected for the process to generate stages/tasks even after 
processing a file ?

Also, is there a way to figure out the file that is getting processed and when 
that process is complete ?

Thanks



On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


Whenever you need to do a shuffle=based operation like reduceByKey, groupByKey, 
join, etc., the system is essentially redistributing the data across the 
cluster and it needs to know how many parts should it divide the data into. 
Thats where the default parallelism is used. 

TD



On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote:

Hi TD:


The input file is on hdfs.  


The file is approx 2.7 GB and when the process starts, there are 11 tasks 
(since hdfs block size is 256M) for processing and 2 tasks for reduce by key.  
After the file has been processed, I see new stages with 2 tasks that continue 
to be generated. I understand this value (2) is the default value for 
spark.default.parallelism but don't quite understand how is the value 
determined for generating tasks for reduceByKey, how is it used besides 
reduceByKey and what should be the optimal value for this. 


Thanks.



On Thursday, July 10, 2014 7:24 PM, Tathagata Das 
tathagata.das1...@gmail.com wrote:
 


How are you supplying the text file? 



On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote:

Hi Folks:



I am working on an application which uses spark streaming (version 1.1.0 
snapshot on a standalone cluster) to process text file and save counters in 
cassandra based on fields in each row.  I am testing the application in two 
modes:  

  * Process each row and save the counter in cassandra.  In this scenario 
 after the text file has been consumed, there is no task/stages seen in the 
 spark UI.

  * If instead I use reduce by key before saving to cassandra, the spark 
 UI shows continuous generation of tasks/stages even after processing the 
 file has been completed. 

I believe this is because the reduce by key requires merging of data from 
different partitions.  But I was wondering if anyone has any 
insights/pointers for understanding this difference in behavior and how to 
avoid generating tasks/stages when there is no data (new file) available.


Thanks

Mans




Top N predictions

2014-07-11 Thread Rich Kroll
Hello all,
In our use case we would like to return top 10 predicted values. I've
looked at NaiveBayes  LogisticRegressionModel and cannot seem to find a
way to get the predicted values for a vector - is this possible with
mllib/spark?

Thanks,
Rich


Re: Top N predictions

2014-07-11 Thread Sean Owen
I don't believe it is. Recently when I needed to do this, I just
copied out the underlying probability / margin function and calculated
it from the model params. It's just a dot product.
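
For example, a rough sketch of recomputing the margin and probability from the model's public fields (MLlib's LogisticRegressionModel exposes weights and intercept); the one-model-per-label map used for the top-N part is hypothetical, assuming one one-vs-rest model per candidate label:

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vector

// Margin = dot product of weights and features, plus the intercept.
def margin(model: LogisticRegressionModel, features: Vector): Double = {
  val w = model.weights.toArray
  val x = features.toArray
  var dot = 0.0
  var i = 0
  while (i < w.length) { dot += w(i) * x(i); i += 1 }
  dot + model.intercept
}

def probability(model: LogisticRegressionModel, features: Vector): Double =
  1.0 / (1.0 + math.exp(-margin(model, features)))

// Rank candidate labels by score and keep the top n.
def topN(modelsByLabel: Map[String, LogisticRegressionModel], features: Vector, n: Int) =
  modelsByLabel.mapValues(m => probability(m, features)).toSeq.sortBy(-_._2).take(n)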

On Fri, Jul 11, 2014 at 7:48 PM, Rich Kroll
rich.kr...@modernizingmedicine.com wrote:
 Hello all,
 In our use case we would like to return top 10 predicted values. I've looked
 at NaiveBayes  LogisticRegressionModel and cannot seem to find a way to get
 the predicted values for a vector - is this possible with mllib/spark?

 Thanks,
 Rich



Job getting killed

2014-07-11 Thread Srikrishna S
I am trying to run Logistic Regression on the url dataset (from
libsvm) using the exact same code
as the example on a 5 node Yarn-Cluster.

I get a pretty cryptic error that just says

Killed

and nothing more.

Settings:

  --master yarn-client
  --verbose
  --driver-memory 24G
  --executor-memory 24G
  --executor-cores 8
  --num-executors 5

I set the akka.frame_size to 200MB.


Script:

def main(args: Array[String]) {

  val conf = new SparkConf()
    .setMaster("yarn-client")
    .setAppName("Logistic regression SGD fixed")
    .set("spark.akka.frameSize", "200")
  var sc = new SparkContext(conf)

  // Load and parse the data
  val dataset = args(0)
  val maxIterations = 100
  val start_time = System.nanoTime()
  val data = MLUtils.loadLibSVMFile(sc, dataset)

  // Building the model
  var solver = new LogisticRegressionWithSGD()
  solver.optimizer.setNumIterations(maxIterations)
  solver.optimizer.setRegParam(0.01)
  val model = solver.run(data)

  // Measure the accuracy. Don't measure the time taken to do this.
  val preditionsAndLabels = data.map { point =>
    val prediction = model.predict(point.features)
    (prediction, point.label)
  }

  val accuracy = (preditionsAndLabels.filter(r => r._1 == r._2).count.toDouble) / data.count
  val elapsed_time = (System.nanoTime() - start_time) / 1e9

  // Use the last known accuracy
  println(dataset + ",spark-sgd," + maxIterations + ", " + elapsed_time + "," + accuracy)
  System.exit(0)
}


MLlib feature request

2014-07-11 Thread Joseph Feng
Hi all,

My company is actively using spark machine learning library, and we would love 
to see Gradient Boosting Machine algorithm (and perhaps Adaboost algorithm as 
well) being implemented. I’d greatly appreciate it if anyone could help to move 
it forward or to elevate this request.

Thanks,
Joseph


Re: KMeans for large training data

2014-07-11 Thread Sean Owen
On Fri, Jul 11, 2014 at 7:32 PM, durin m...@simon-schaefer.net wrote:
 How would you get more partitions?

You can specify this as the second arg to methods that read your data
originally, like:
sc.textFile(..., 20)

 I ran broadcastVector.value.repartition(5), but
 broadcastVector.value.partitions.size is still 1 and no change to the
 behavior is visible.

These are immutable, so to have effect you have to do something like:
val repartitioned = broadcastVector.value.repartition(5)


 First of all, there is a gap of almost two minutes between the third to last
 and second to last line, where no activity is shown in the WebUI. Is that
 the GC at work? If yes, how would I improve this?

You mean there are a few minutes where no job is running? I assume
that's time when the driver is busy doing something. Is it thrashing?


 Also, Local KMeans++ reached the max number of iterations: 30 surprises
 me. I have ran training using

 is it possible that somehow, there are still 30 iterations executed, despite
 of the 3 I set?

Are you sure you set 3 iterations?


Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Wei Tan
Just curious: how about using scala to drive the workflow? I guess if you 
use other tools (oozie, etc) you lose the advantage of reading from RDD -- 
you have to read from HDFS.

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   k.tham kevins...@gmail.com
To: u...@spark.incubator.apache.org, 
Date:   07/10/2014 01:20 PM
Subject:Recommended pipeline automation tool? Oozie?



I'm just wondering what's the general recommendation for data pipeline
automation.

Say, I want to run Spark Job A, then B, then invoke script C, then do D, 
and
if D fails, do E, and if Job A fails, send email F, etc...

It looks like Oozie might be the best choice. But I'd like some
advice/suggestions.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: MLlib feature request

2014-07-11 Thread Ameet Talwalkar
Hi Joseph,

Thanks for your email.  Many users are requesting this functionality. While
it would be a stretch for these to appear in Spark 1.1, various people
(including Manish Amde and folks at the AMPLab, Databricks and Alpine Labs)
are actively working on developing ensembles of decision trees (random
forests, boosting).

-Ameet


On Fri, Jul 11, 2014 at 12:04 PM, Joseph Feng joseph.f...@creditkarma.com
wrote:

  Hi all,

  My company is actively using spark machine learning library, and we
 would love to see Gradient Boosting Machine algorithm (and perhaps Adaboost
 algorithm as well) being implemented. I’d greatly appreciate it if anyone
 could help to move it forward or to elevate this request.

  Thanks,
 Joseph



not getting output from socket connection

2014-07-11 Thread Walrus theCat
Hi,

I have a java application that is outputting a string every second.  I'm
running the wordcount example that comes with Spark 1.0, and running nc -lk
. When I type words into the terminal running netcat, I get counts.
However, when I write the String onto a socket on port , I don't get
counts.  I can see the strings showing up in the netcat terminal, but no
counts from Spark.  If I paste in the string, I get counts.

Any ideas?

Thanks


Re: not getting output from socket connection

2014-07-11 Thread Walrus theCat
I forgot to add that I get the same behavior if I tail -f | nc localhost
 on a log file.


On Fri, Jul 11, 2014 at 1:25 PM, Walrus theCat walrusthe...@gmail.com
wrote:

 Hi,

 I have a java application that is outputting a string every second.  I'm
 running the wordcount example that comes with Spark 1.0, and running nc -lk
 . When I type words into the terminal running netcat, I get counts.
 However, when I write the String onto a socket on port , I don't get
 counts.  I can see the strings showing up in the netcat terminal, but no
 counts from Spark.  If I paste in the string, I get counts.

 Any ideas?

 Thanks



Re: not getting output from socket connection

2014-07-11 Thread Sean Owen
netcat is listening for a connection on port . It is echoing what
you type to its console to anything that connects to  and reads.
That is what Spark streaming does.

If you yourself connect to  and write, nothing happens except that
netcat echoes it. This does not cause Spark to somehow get that data.
nc is only echoing input from the console.
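
For example, a minimal sketch of having your application play the server role itself (the role `nc -lk` normally plays), so that Spark's socketTextStream can connect to it and read lines; the port 9999, the text and the loop bounds here are placeholders:

import java.io.PrintWriter
import java.net.ServerSocket

// Listen like `nc -lk 9999`; socketTextStream("localhost", 9999) is the client side.
val server = new ServerSocket(9999)
val socket = server.accept()                              // blocks until the receiver connects
val out = new PrintWriter(socket.getOutputStream, true)   // autoflush after each println
for (i <- 1 to 1000) {
  out.println("the quick brown fox " + i)                 // one newline-terminated line per second
  Thread.sleep(1000)
}
out.close()
socket.close()
server.close()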

On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com wrote:
 Hi,

 I have a java application that is outputting a string every second.  I'm
 running the wordcount example that comes with Spark 1.0, and running nc -lk
 . When I type words into the terminal running netcat, I get counts.
 However, when I write the String onto a socket on port , I don't get
 counts.  I can see the strings showing up in the netcat terminal, but no
 counts from Spark.  If I paste in the string, I get counts.

 Any ideas?

 Thanks


Spark Questions

2014-07-11 Thread Gonzalo Zarza
Hi all,

We've been evaluating Spark for a long-term project. Although we've been
reading several topics in the forum, any hints on the following topics will be
extremely welcome:

1. Which are the data partition strategies available in Spark? How
configurable are these strategies?

2. How would be the best way to use Spark if queries can touch only 3-5
entries/records? Which strategy is the best if they want to perform a full
scan of the entries?

3. Is Spark capable of interacting with RDBMS?

Thanks a lot!

Best regards,

--
*Gonzalo Zarza* | PhD in High-Performance Computing | Big-Data Specialist |
*GLOBANT* | AR: +54 11 4109 1700 ext. 15494 | US: +1 877 215 5230 ext. 15494


Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread Tathagata Das
The model for the file stream is to pick up and process new files that are
written atomically (by move) into a directory. So your file is being processed
in a single batch, and then it's waiting for any new files to be written into
that directory.
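
A rough sketch of that model (the directory path and batch interval below are
placeholders): the stream only fires on files that newly appear, ideally moved
in atomically, in the watched directory.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: watch an HDFS directory for new files. A file is picked up in
// the batch in which it first appears (ideally moved in with `hdfs dfs -mv`);
// after that batch, nothing more is produced until the next new file arrives.
object WatchDir {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("watch-dir"), Seconds(60))
    val lines = ssc.textFileStream("hdfs:///some/input/dir")   // placeholder path
    lines.count().print()                                      // per-batch line count
    ssc.start()
    ssc.awaitTermination()
  }
}
```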

TD


On Fri, Jul 11, 2014 at 11:46 AM, M Singh mans6si...@yahoo.com wrote:

 So, is it expected for the process to generate stages/tasks even after
 processing a file ?

 Also, is there a way to figure out the file that is getting processed and
 when that process is complete ?

 Thanks


   On Friday, July 11, 2014 1:51 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:


 Whenever you need to do a shuffle-based operation like reduceByKey,
 groupByKey, join, etc., the system is essentially redistributing the data
 across the cluster and it needs to know how many parts it should divide the
 data into. That's where the default parallelism is used.

 TD


 On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote:

 Hi TD:

 The input file is on hdfs.

  The file is approx 2.7 GB, and when the process starts there are 11
 tasks (since the hdfs block size is 256M) for processing and 2 tasks for reduce
 by key.  After the file has been processed, I see new stages with 2 tasks
 that continue to be generated. I understand this value (2) is the default
 value for spark.default.parallelism, but I don't quite understand how the
 value is determined for generating tasks for reduceByKey, how it is used
 besides reduceByKey, and what the optimal value for this should be.

  Thanks.


   On Thursday, July 10, 2014 7:24 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:


 How are you supplying the text file?


 On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote:

 Hi Folks:

 I am working on an application which uses spark streaming (version 1.1.0
 snapshot on a standalone cluster) to process a text file and save counters in
 cassandra based on fields in each row.  I am testing the application in two
 modes:

- Process each row and save the counter in cassandra.  In this
scenario after the text file has been consumed, there is no task/stages
seen in the spark UI.
- If instead I use reduce by key before saving to cassandra, the spark
UI shows continuous generation of tasks/stages even after processing the
file has been completed.

 I believe this is because the reduce by key requires merging of data from
 different partitions.  But I was wondering if anyone has any
 insights/pointers for understanding this difference in behavior and how to
 avoid generating tasks/stages when there is no data (new file) available.

 Thanks

 Mans










Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Tathagata Das
The same executor can be used for both receiving and processing,
irrespective of the deployment mode (yarn, spark standalone, etc.). It boils
down to the number of cores / task slots that the executor has. Each receiver
is like a long-running task, so each of them occupies a slot. If there are
free slots in the executor then other tasks can be run on them.

So if you are finding that the other tasks are not being run, check how many
cores/task slots the executor has and whether there are more task slots
than the number of input DStreams / receivers you are launching.
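
As a small illustration of that (the topic names, ZooKeeper quorum and counts
below are made up, not from the original setup), launching N receivers means
you want more than N task slots in total, and the receiver streams can be
unioned before processing:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: four receivers => four long-running receiver tasks, each pinning a
// task slot. The executors therefore need more than four cores in total,
// otherwise no slots are left for the map/reduce tasks of each batch.
object ManyReceivers {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("many-receivers"), Seconds(60))
    val topicMap = Map("topic1" -> 1, "topic2" -> 1)                    // assumed topic names
    val streams = (1 to 4).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-group", topicMap)    // assumed ZK quorum
    }
    val unified = ssc.union(streams)                                    // process as one DStream
    unified.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```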

@Praveen  your answers were pretty much spot on, thanks for chipping in!




On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi Praveen,

 Thank you for the answer. That's interesting because if I only bring up
 one executor for Spark Streaming, it seems only the receiver is
 working and no other tasks are happening, judging by the log and UI. Maybe
 it's just because the receiving task eats all the resources, not because
 one executor can only run one receiver?

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com
 wrote:

 Here are my answers. But I am just getting started with Spark Streaming,
 so please correct me if I am wrong.
 1) Yes.
 2) Receivers will run on executors. It's actually a job that's submitted,
 where the number of tasks equals the number of receivers. An executor can
 actually run more than one task at the same time. Hence you could have more
 receivers than executors, but it's not recommended, I think.
 3) As said in 2, the executor where the receiver task is running can be used
 for map/reduce tasks. In yarn-cluster mode, the driver program is actually
 run as the application master (it lives in the first container that's launched)
 and is not an executor - hence it's not used for other operations.
 4) The driver runs in a separate container. I think the same executor can
 be used for the receiver and the processing task as well (I am not very
 sure about this part).


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application. But I have problem in understanding how the executors are used
 and the application is distributed.

 1. In YARN, is one executor equal one container?

 2. I saw the statement that a streaming receiver runs on one worker
 machine (note that each input DStream creates a single receiver
 (running on a worker machine) that receives a single stream of data).
 Does the worker machine mean the executor or the physical machine? If I have
 more receivers than executors, will it still work?

 3. Is the executor that holds receiver also used for other operations,
 such as map, reduce, or fully occupied by the receiver? Similarly, if I run
 in yarn-cluster mode, is the executor running driver program used by other
 operations too?

 4. So if I have a driver program (cluster mode) and streaming receiver,
 do I have to have at least 2 executors because the program and streaming
 receiver have to be on different executors?

 Thank you. Sorry for having so many questions, but I do want to
 understand how Spark Streaming distributes work in order to assign
 reasonable resources. Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108






Re: Number of executors change during job running

2014-07-11 Thread Tathagata Das
Can you show us the program that you are running? If you are setting the number
of partitions in the XYZ-ByKey operation as 300, then there should be 300
tasks for that stage, distributed on the 50 executors that are allocated to your
context. However, the data distribution may be skewed, in which case you can
use a repartition operation to redistribute the data more evenly (both
DStream and RDD have repartition).
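
A minimal sketch of both suggestions (assuming `pairs` is a pair DStream built
earlier in the job; the number 300 is arbitrary):

```scala
import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits (Spark 1.0 style)
import org.apache.spark.streaming.dstream.DStream

// Sketch: force an explicit partition count on the shuffle stage, and spread
// skewed input across executors first with repartition.
def groupEvenly(pairs: DStream[(String, Long)], partitions: Int = 300) = {
  val spread = pairs.repartition(partitions)   // redistribute records across executors
  spread.groupByKey(partitions)                // 300 tasks for this shuffle stage
}
```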

TD


On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I 
 checked
 the slow stage *combineByKey at ShuffledDStream.scala:42,* there
 are sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill








Re: executor failed, cannot find compute-classpath.sh

2014-07-11 Thread Andrew Or
Hi CJ,

Looks like I overlooked a few lines in the spark shell case. It appears that
spark shell explicitly overwrites spark.home to whatever SPARK_HOME is set to
(see
https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala#L955).
I have filed a JIRA to track this issue:
https://issues.apache.org/jira/browse/SPARK-2454.
Until then, you can work around this by ensuring we don't set SPARK_HOME
anywhere.

We currently do this in bin/spark-submit and bin/spark-class, so go ahead
and
remove these lines:

https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-submit#L20
https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-class#L31

In addition, make sure spark-submit no longer uses the SPARK_HOME variable
here:

https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-submit#L44

Now, as you have done before, setting spark.home to your executor's spark
home
should do the job. I have verified that this solves the problem in my own
cluster.

To verify that your configs are in fact set, you can always run
bin/spark-submit (or
spark-shell, which calls spark-submit) with the --verbose flag.
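
For completeness, a minimal sketch of setting the same property
programmatically (the path below is the executor-side Spark home from this
thread, used here as a placeholder):

```scala
import org.apache.spark.SparkConf

// Sketch: point executors at the Spark installation on the worker machines.
// Equivalent to `spark.home /root/spark-1.0.0` in conf/spark-defaults.conf.
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.home", "/root/spark-1.0.0")   // placeholder: executors' Spark home
```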

Let me know if this fixes it. I will get to fixing the root problem soon.

Andrew



2014-07-10 18:43 GMT-07:00 cjwang c...@cjwang.us:

 Andrew,

 Thanks for replying.  I did the following and the result was still the
 same.

 1. Added spark.home /root/spark-1.0.0 to local conf/spark-defaults.conf,
 where /root was the place in the cluster where I put Spark.

 2. Ran bin/spark-shell --master
 spark://sjc1-eng-float01.carrieriq.com:7077.

 3. Sighed when I still saw the same error:

 14/07/10 18:26:53 INFO AppClient$ClientActor: Executor updated:
 app-20140711012651-0007/5 is now FAILED (class java.io.IOException: Cannot
 run program /Users/cwang/spark/bin/compute-classpath.sh (in directory
 .): error=2, No such file or directory)

 /Users/cwang/spark was my local SPARK_HOME, which is wrong.

 What did I do wrong?  How do I know if the config file is taken?

 I am novice to Spark so spare with me.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/executor-failed-cannot-find-compute-classpath-sh-tp859p9378.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Yan Fang
Hi Tathagata,

Thank you. Is task slot equivalent to the core number? Or actually one core
can run multiple tasks at the same time?

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 The same executor can be used for both receiving and processing,
 irrespective of the deployment mode (yarn, spark standalone, etc.) It boils
 down to the number of cores / task slots that executor has. Each receiver
 is like a long running task, so each of them occupy a slot. If there are
 free slots in the executor then other tasks can be run on them.

 So if you are finding that the other tasks are being run, check how many
 cores/task slots the executor has and whether there are more task slots
 than the number of input dstream / receivers you are launching.

 @Praveen  your answers were pretty much spot on, thanks for chipping in!




 On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi Praveen,

 Thank you for the answer. That's interesting because if I only bring up
 one executor for the Spark Streaming, it seems only the receiver is
 working, no other tasks are happening, by checking the log and UI. Maybe
 it's just because the receiving task eats all the resource?, not because
 one executor can only run one receiver?

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com
 wrote:

 Here are my answers. But am just getting started with Spark Streaming -
 so please correct me if am wrong.
 1) Yes
 2) Receivers will run on executors. Its actually a job thats submitted
 where # of tasks equals # of receivers. An executor can actually run more
 than one task at the same time. Hence you could have more number of
 receivers than executors but its not recommended I think.
 3) As said in 2, the executor where receiver task is running can be used
 for map/reduce tasks. In yarn-cluster mode, the driver program is actually
 run as application master (lives in the first container thats launched) and
 this is not an executor - hence its not used for other operations.
 4) the driver runs in a separate container. I think the same executor
 can be used for receiver and the processing task also (this part am not
 very sure)


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com
 wrote:

 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application. But I have problem in understanding how the executors are used
 and the application is distributed.

 1. In YARN, is one executor equal one container?

 2. I saw the statement that a streaming receiver runs on one work
 machine (*n**ote that each input DStream creates a single receiver
 (running on a worker machine) that receives a single stream of data*).
 Does the work machine mean the executor or physical machine? If I have
 more receivers than the executors, will it still work?

 3. Is the executor that holds receiver also used for other operations,
 such as map, reduce, or fully occupied by the receiver? Similarly, if I run
 in yarn-cluster mode, is the executor running driver program used by other
 operations too?

 4. So if I have a driver program (cluster mode) and streaming receiver,
 do I have to have at least 2 executors because the program and streaming
 receiver have to be on different executors?

 Thank you. Sorry for having so many questions but I do want to
 understand how the Spark Streaming distributes in order to assign
 reasonable recourse.*_* Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108







pyspark sc.parallelize running OOM with smallish data

2014-07-11 Thread Mohit Jaggi
spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in
the cluster and gave 48g to executors. I also tried Kryo serialization.

Traceback (most recent call last):
  File "/mohit/./m.py", line 58, in <module>
    spark_data = sc.parallelize(spark_data_array)
  File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize
    jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
  File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
    at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)


Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

Below is my main function. I omit some filtering and data conversion
functions. These functions are just one-to-one mappings, which should not
noticeably increase running time. The only reduce function I have here is
groupByKey. There are 4 topics in my Kafka brokers and two of the topics
have 240k lines each minute. And the other two topics have less than 30k
lines per minute. The batch size is one minute and I specified 300
executors in my spark-submit script. The default parallelism is 300.


val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
  timestampToUTCUnix(data("time").toString),
  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
  data("id3").toString,
  data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

val watchTimeFields = filteredFields.map(fields => (fields._1,
  fields._2, fields._4, fields._5, fields._7))
val watchTimeTuples = watchTimeFields.map(fields =>
  getWatchtimeTuple(fields))
val programDuids = watchTimeTuples.map(fields => (fields._3,
  fields._1)).groupByKey(partition)
val programDuidNum = programDuids.map { case (key, value) => (key,
  value.toSet.size) }
programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days but have not found why there are
always 2 executors for the groupBy stage. Thanks a lot!

Bill


On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you show us the program that you are running. If you are setting
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed on the 50 executors are allocated
 to your context. However the data distribution may be skewed in which case,
 you can use a repartition operation to redistributed the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark streaming job running on yarn. It consume data from
 Kafka and group the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I 
 checked
 the slow stage 

Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread ShreyanshB
Hi,

I am trying GraphX on the LiveJournal data. I have a cluster of 17 computing
nodes, 1 master and 16 workers. I have a few questions about this.
* I built Spark from spark-master (to avoid the partitionBy error of Spark 1.0).
* I am using edgeListFile() to load the data, and I figured I need to provide
the number of partitions I want. The exact syntax I am using is the following:
val graph = GraphLoader.edgeListFile(sc,
filepath,true,64).partitionBy(PartitionStrategy.RandomVertexCut)

-- Is this the correct way to load the file to get the best performance?
-- What should the number of partitions be? Equal to the number of computing
nodes, or to the number of cores?
-- I see the following error many times in my logs:
ERROR BlockManagerWorker: Exception handling buffer message
java.io.NotSerializableException:
org.apache.spark.graphx.impl.ShippableVertexPartition
Does it suggest that my graph wasn't partitioned properly? I suspect it
affects performance?

Please suggest whether I'm following every step correctly.

Thanks in advance,
-Shreyansh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread Soumya Simanta
If you are on the 1.0.0 release you can also try converting your RDD to a
SchemaRDD and running a groupBy there. The Spark SQL optimizer may yield
better results. It's worth a try at least.
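
A rough sketch of that idea on Spark 1.0 (the Event fields, input path and
"events" table name are assumptions for illustration; registerAsTable is the
1.0-era API name):

```scala
import org.apache.spark.sql.SQLContext

// Sketch only: group records by day with Spark SQL instead of RDD.groupBy.
case class Event(day: Int, payload: String)

val sqlContext = new SQLContext(sc)            // sc: an existing SparkContext
import sqlContext.createSchemaRDD              // implicit RDD[Event] -> SchemaRDD

val events = sc.textFile("hdfs:///events").map { line =>
  val Array(day, payload) = line.split("\t", 2)
  Event(day.toInt, payload)
}
events.registerAsTable("events")
val perDay = sqlContext.sql("SELECT day, COUNT(*) FROM events GROUP BY day")
```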


On Fri, Jul 11, 2014 at 5:24 PM, Soumya Simanta soumya.sima...@gmail.com
wrote:






 Solution 2 is to map the objects into a pair RDD where the
 key is the number of the day in the interval, then group by
 key, collect, and parallelize the resulting grouped data.
 However, I worry collecting large data sets is going to be
 a serious performance bottleneck.


 Why do you have to do a collect ?  You can do a groupBy and then write
 the grouped data to disk again



Re: How to separate a subset of an RDD by day?

2014-07-11 Thread bdamos
ssimanta wrote
 Solution 2 is to map the objects into a pair RDD where the
 key is the number of the day in the interval, then group by
 key, collect, and parallelize the resulting grouped data.
 However, I worry collecting large data sets is going to be
 a serious performance bottleneck.
 Why do you have to do a collect ?  You can do a groupBy and then write
 the grouped data to disk again

I want to process the resulting data sets as RDDs,
and groupBy only returns the data as a Seq.
Thanks for the idea to write the grouped data back to disk.
I think my best option is to partition my data into directories by day
before running my Spark application, and then direct
my Spark application to load RDDs from each directory when
I want to load a date range. How does this sound?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-separate-a-subset-of-an-RDD-by-day-tp9454p9459.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
On Fri, Jul 11, 2014 at 2:23 PM, ShreyanshB shreyanshpbh...@gmail.com
 wrote:

 -- Is it a correct way to load file to get best performance?


Yes, edgeListFile should be efficient at loading the edges.

-- What should be the partition size? =computing node or =cores?


In general it should be a multiple of the number of cores to exploit all
available parallelism, but because of shuffle overhead, it might help to
use fewer partitions -- in some cases even fewer than the number of cores.
You can measure the performance with different numbers of partitions to see
what is best.
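
For instance (the counts and the edge-file path below are only illustrative),
you can vary the minEdgePartitions argument already used above and time the
same query on each setting:

```scala
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Sketch: try a few partition counts around the cluster's core count and time
// the same query on each; 16 workers x N cores is a rough starting point.
val candidates = Seq(16, 32, 64, 128)
for (parts <- candidates) {
  val graph = GraphLoader
    .edgeListFile(sc, "hdfs:///path/to/edges.txt", true, parts)   // sc: existing SparkContext
    .partitionBy(PartitionStrategy.RandomVertexCut)
  println(s"$parts partitions -> ${graph.edges.count()} edges")   // forces the load
}
```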

-- I see following error so many times in my logs [...]
 NotSerializableException


This is a known bug, and there are two possible resolutions:

1. Switch from Java serialization to Kryo serialization, which is faster
and will also resolve the problem, by setting the following Spark
properties in conf/spark-defaults.conf:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.apache.spark.graphx.GraphKryoRegistrator

2. Mark the affected classes as Serializable. I'll submit a patch with this
fix as well, but for now I'd suggest trying Kryo if possible.
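
If editing conf/spark-defaults.conf is inconvenient, the same two properties
can also be set on the SparkConf directly, e.g.:

```scala
import org.apache.spark.SparkConf

// Same settings as in conf/spark-defaults.conf above, set programmatically.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
```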

Ankur http://www.ankurdave.com/


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread ShreyanshB
Thanks a lot Ankur, I'll follow that.

One last quick question:
Does that error affect performance?

~Shreyansh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread Soumya Simanta
 I think my best option is to partition my data in directories by day
 before running my Spark application, and then direct
 my Spark application to load RDD's from each directory when
 I want to load a date range. How does this sound?

 If your upstream system can write data by day then it makes perfect sense
to do that and load (into Spark) only the data that is required for
processing. This also saves you the filter step and hopefully time and
memory. If you want to get back the bigger dataset you can always join
multiple days of data (RDDs) together.
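
A quick sketch of that layout (the directory naming scheme is an assumption):

```scala
// Sketch: data written out by day as hdfs:///data/day=<n>/...; load only the
// requested range and union the per-day RDDs back into one when needed.
def loadDays(sc: org.apache.spark.SparkContext, start: Int, end: Int) = {
  val perDay = (start to end).map(day => sc.textFile(s"hdfs:///data/day=$day"))
  sc.union(perDay)          // one RDD covering the whole date range
}
```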


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread bdamos
Sean Owen-2 wrote
 Can you not just filter the range you want, then groupBy
 timestamp/86400 ? That sounds like your solution 1 and is about as
 fast as it gets, I think. Are you thinking you would have to filter
 out each day individually from there, and that's why it would be slow?
 I don't think that's needed. You also don't need to map to pairs.

I didn't make it clear in my first message that I want to obtain an RDD
instead
of an Iterable, and will be doing map-reduce like operations on the
data by day. My problem is that groupBy returns an RDD[(K, Iterable[T])],
but I really want an RDD[(K, RDD[T])].
Is there a better approach to this?

I'm leaning towards partitioning my data by day on disk since all of my
queries will always process data per day.
However, the only problem I see with partitioning the data on disk is that
it
limits my system to cleanly work for a single timezone.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-separate-a-subset-of-an-RDD-by-day-tp9454p9464.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
Hi,

I tried out the streaming program on the Spark training web page. I created
a Twitter app as per the instructions (pointing to http://www.twitter.com).
When I run the program, my credentials get printed out correctly but
thereafter, my program just keeps waiting. It does not print out the hashtag
count etc.  My code appears below (essentially same as what is on the
training web page). I would like to know why I am not able to get a
continuous stream and the hashtag count.

thanks

   // relevant code snippet

   TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

   val ssc = new StreamingContext(new SparkConf(), Seconds(1))
   val tweets = TwitterUtils.createStream(ssc, None)
   val statuses = tweets.map(status => status.getText())
   statuses.print()

   ssc.checkpoint(checkpointDir)

   val words = statuses.flatMap(status => status.split(" "))
   val hashtags = words.filter(word => word.startsWith("#"))
   hashtags.print()

   val counts = hashtags.map(tag => (tag, 1))
     .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(1))
   counts.print()

   val sortedCounts = counts.map { case (tag, count) => (count, tag) }
     .transform(rdd => rdd.sortByKey(false))
   sortedCounts.foreach(rdd =>
     println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))

   ssc.start()
   ssc.awaitTermination()

   // end code snippet




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread Sean Owen
On Fri, Jul 11, 2014 at 10:53 PM, bdamos a...@adobe.com wrote:
 I didn't make it clear in my first message that I want to obtain an RDD
 instead
 of an Iterable, and will be doing map-reduce like operations on the
 data by day. My problem is that groupBy returns an RDD[(K, Iterable[T])],
 but I really want an RDD[(K, RDD[T])].
 Is there a better approach to this?

Yeah, you can't have an RDD of RDDs. Why does it need to be an RDD --
because a day could have a huge amount of data? Scala collections have
map and reduce methods and the like too.

I think that if you really want RDDs you can just make a series of
them, with some code like

(start/86400 to end/86400).map(day => (day, rdd.filter(rec =>
  rec.time >= day*86400 && rec.time < (day+1)*86400)))

I think that's your solution 1. I don't imagine it's that bad if this
is what you need to do.


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
I don't think it should affect performance very much, because GraphX
doesn't serialize ShippableVertexPartition in the fast path of
mapReduceTriplets execution (instead it calls
ShippableVertexPartition.shipVertexAttributes and serializes the result). I
think it should only get serialized for speculative execution, if you have
that enabled.

By the way, here's the fix: https://github.com/apache/spark/pull/1376

Ankur http://www.ankurdave.com/


Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Li Pu
I like the idea of using Scala to drive the workflow. Spark already comes
with a scheduler, so why not write a plugin to schedule other types of tasks
(copy a file, send an email, etc.)? Scala could handle any logic required by
the pipeline. Passing objects (including RDDs) between tasks is also easier. I
don't know if this is an overuse of the Spark scheduler, but it sounds like a
good tool. The only issue would be releasing resources that are not used at
intermediate steps.
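
As a toy sketch of driving such a pipeline from plain Scala (jobA, jobB,
scriptC and sendEmail below are placeholders, not a real API):

```scala
import scala.util.{Failure, Success, Try}

// Toy sketch of "run A, then B, then C; on any failure send an email",
// expressed directly in Scala.
def jobA(): Unit = println("running Spark job A")   // e.g. reuse the driver's SparkContext here
def jobB(): Unit = println("running Spark job B")
def scriptC(): Unit = println("invoking script C")
def sendEmail(msg: String): Unit = println(s"email: $msg")

Try { jobA(); jobB(); scriptC() } match {
  case Success(_) => println("pipeline finished")
  case Failure(e) => sendEmail(s"pipeline failed: ${e.getMessage}")
}
```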

On Fri, Jul 11, 2014 at 12:05 PM, Wei Tan w...@us.ibm.com wrote:

 Just curious: how about using scala to drive the workflow? I guess if you
 use other tools (oozie, etc) you lose the advantage of reading from RDD --
 you have to read from HDFS.

 Best regards,
 Wei

 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 *http://researcher.ibm.com/person/us-wtan*
 http://researcher.ibm.com/person/us-wtan



 From:k.tham kevins...@gmail.com
 To:u...@spark.incubator.apache.org,
 Date:07/10/2014 01:20 PM
 Subject:Recommended pipeline automation tool? Oozie?
 --



 I'm just wondering what's the general recommendation for data pipeline
 automation.

 Say, I want to run Spark Job A, then B, then invoke script C, then do D,
 and
 if D fails, do E, and if Job A fails, send email F, etc...

 It looks like Oozie might be the best choice. But I'd like some
 advice/suggestions.

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
Li
@vrilleup


Re: Using HQL is terribly slow: Potential Performance Issue

2014-07-11 Thread Zongheng Yang
Hey Jerry,

When you ran these queries using different methods, did you see any
discrepancy in the returned results (i.e. the counts)?

On Thu, Jul 10, 2014 at 5:55 PM, Michael Armbrust
mich...@databricks.com wrote:
 Yeah, sorry.  I think you are seeing some weirdness with partitioned tables
 that I have also seen elsewhere. I've created a JIRA and assigned someone at
 databricks to investigate.

 https://issues.apache.org/jira/browse/SPARK-2443


 On Thu, Jul 10, 2014 at 5:33 PM, Jerry Lam chiling...@gmail.com wrote:

 Hi Michael,

 Yes the table is partitioned on 1 column. There are 11 columns in the
 table and they are all String type.

 I understand that SerDes contributes to some overheads but using pure
 Hive, we could run the query about 5 times faster than SparkSQL. Given that
 Hive also has the same SerDes overhead, then there must be something
 additional that SparkSQL adds to the overall overheads that Hive doesn't
 have.

 Best Regards,

 Jerry



 On Thu, Jul 10, 2014 at 7:11 PM, Michael Armbrust mich...@databricks.com
 wrote:

 On Thu, Jul 10, 2014 at 2:08 PM, Jerry Lam chiling...@gmail.com wrote:

 For the curious mind, the dataset is about 200-300GB and we are using 10
 machines for this benchmark. Given the env is equal between the two
 experiments, why pure spark is faster than SparkSQL?


 There is going to be some overhead to parsing data using the Hive SerDes
 instead of the native Spark code, however, the slow down you are seeing here
 is much larger than I would expect. Can you tell me more about the table?
 What does the schema look like?  Is it partitioned?

 By the way, I also try hql(select * from m).count. It is terribly slow
 too.


 FYI, this query is actually identical to the one where you write out
 COUNT(*).





Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread Tathagata Das
You don't get any exception from twitter.com, saying credential error or
something?

I have seen this happen when someone was behind a VPN to their office, and
Twitter was probably blocked in their office.
You could be having a similar issue.

TD


On Fri, Jul 11, 2014 at 2:57 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I tried out the streaming program on the Spark training web page. I created
 a Twitter app as per the instructions (pointing to http://www.twitter.com
 ).
 When I run the program, my credentials get printed out correctly but
 thereafter, my program just keeps waiting. It does not print out the
 hashtag
 count etc.  My code appears below (essentially same as what is on the
 training web page). I would like to know why I am not able to get a
 continuous stream and the hashtag count.

 thanks

// relevant code snippet



 TutorialHelper.configureTwitterCredentials(apiKey,apiSecret,accessToken,accessTokenSecret)

  val ssc = new StreamingContext(new SparkConf(), Seconds(1))
  val tweets = TwitterUtils.createStream(ssc, None)
  val statuses = tweets.map(status = status.getText())
  statuses.print()

  ssc.checkpoint(checkpointDir)

  val words = statuses.flatMap(status = status.split( ))
  val hashtags = words.filter(word = word.startsWith(#))
  hashtags.print()

  val counts = hashtags.map(tag = (tag, 1))
   .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 *
 5), Seconds(1))
  counts.print()

  val sortedCounts = counts.map { case(tag, count) = (count, tag) }
  .transform(rdd = rdd.sortByKey(false))
  sortedCounts.foreach(rdd =
  println(\nTop 10 hashtags:\n +
 rdd.take(10).mkString(\n)))

  ssc.start()
  ssc.awaitTermination()

 //end code snippet




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



try JDBC server

2014-07-11 Thread Nan Zhu
Hi, all 

I would like to give the JDBC server (which is supposed to be released in
1.1) a try.

Where can I find the documentation about it?

Best, 

-- 
Nan Zhu



Re: Some question about SQL and streaming

2014-07-11 Thread Tathagata Das
Yes, even though we don't have immediate plans, I would definitely like to
see it happen some time in the not-so-distant future.

TD


On Thu, Jul 10, 2014 at 7:55 PM, Shao, Saisai saisai.s...@intel.com wrote:

  No specific plans to do so, since there is some functional loss, like the
 time-based windowing function, which is important for streaming SQL. Also,
 keeping compatible with the fast-growing Spark SQL is quite hard. So there
 are no clear plans to submit it upstream.



 -Jerry



 *From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
 *Sent:* Friday, July 11, 2014 10:47 AM

 *To:* user@spark.apache.org
 *Subject:* Re: Some question about SQL and streaming



 Hi,



 On Fri, Jul 11, 2014 at 11:38 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Actually we have a POC project which shows the power of combining Spark
 Streaming and Catalyst, it can manipulate SQL on top of Spark Streaming and
 get SchemaDStream. You can take a look at it:
 https://github.com/thunderain-project/StreamSQL



 Wow, that looks great! Any plans to get this code (or functionality)
 merged into Spark?



 Tobias





Re: Generic Interface between RDD and DStream

2014-07-11 Thread Tathagata Das
I totally agree that if we are able to do this it will be very cool.
However, this requires having a common trait / interface between RDD and
DStream, which we don't have as of now. It would be very cool though. On my
wish list for sure.

TD


On Thu, Jul 10, 2014 at 11:53 AM, mshah shahmaul...@gmail.com wrote:

 I wanted to get a perspective on how to share code between Spark batch
 processing and Spark Streaming.

 For example, I want to get the unique tweets stored in an HDFS file, in both
 Spark Batch and Spark Streaming. Currently I have to do the following:

 Tweet {
 String tweetText;
 String userId;
 }

 Spark Batch:
 tweets = sparkContext.newHadoopApiAsFile(tweet);

 def getUniqueTweets(tweets: RDD[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet))
     .groupByKey()
     .map { case (tweetText, _) => tweetText }
 }

 Spark Streaming:

 tweets = streamingContext.fileStream(tweet);

 def getUniqueTweets(tweets: DStream[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet))
     .groupByKey()
     .map { case (tweetText, _) => tweetText }
 }


 The above example shows that I am doing the same thing but have to replicate
 the code, as there is no common abstraction between DStream and RDD, or
 between SparkContext and StreamingContext.

 If there were a common abstraction it would have been much simpler:

 tweets = context.read("tweet", Stream or Batch)

 def getUniqueTweets(tweets: SparkObject[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet))
     .groupByKey()
     .map { case (tweetText, _) => tweetText }
 }

 I would appreciate thoughts on it. Is it already available? Is there any
 plan to add this support? Is it intentionally not supported because of
 design choice?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: try JDBC server

2014-07-11 Thread Nan Zhu
nvm 

for others with the same question:

https://github.com/apache/spark/commit/8032fe2fae3ac40a02c6018c52e76584a14b3438 

-- 
Nan Zhu


On Friday, July 11, 2014 at 7:02 PM, Nan Zhu wrote:

 Hi, all 
 
 I would like to give a try on JDBC server (which is supposed to be released 
 in 1.1)
 
 where can I find the document about that?
 
 Best, 
 
 -- 
 Nan Zhu
 
 
 




Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
I don't get any exceptions or error messages.

I tried it both with and without VPN and had the same outcome. But  I can
try again without VPN later today and report back.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9477.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi folks,

I just ran another job that only received data from Kafka, did some
filtering, and then saved the results as text files in HDFS. There was no
reducing work involved. Surprisingly, the number of executors for the
saveAsTextFiles stage was also 2, although I specified 300 executors in the
job submission.
As a result, the simple save file action took more than 2 minutes. Do you
have any idea how Spark determined the number of executors
for different stages?

Thanks!

Bill


On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just a one-to-one mapping, which may not
 possible increase running time. The only reduce function I have here is
 groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. And the other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val parition = 300
 val zkQuorum = zk1,zk2,zk3
 val group = my-group- + currentTime.toString
 val topics = topic1,topic2,topic3,topic4
 val numThreads = 4
 val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + checkpoint)
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)

 .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data = (data(id).toString,
 timestampToUTCUnix(data(time).toString),

  timestampToUTCUnix(data(local_time).toString), data(id2).toString,
data(id3).toString,
 data(log_type).toString, data(sub_log_type).toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field = abs(field._2 - field._3)
 = timeDiff)

 val watchTimeFields = filteredFields.map(fields = (fields._1,
 fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields =
 getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields = (fields._3,
 fields._1)).groupByKey(partition)
 val programDuidNum = programDuids.map{case(key, value) = (key,
 value.toSet.size)}
 programDuidNum.saveAsTextFiles(hadoopOutput+result)

 I have been working on this for several days. No findings why there are
 always 2 executors for the groupBy stage. Thanks a lot!

 Bill


 On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you show us the program that you are running. If you are setting
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed on the 50 executors are allocated
 to your context. However the data distribution may be skewed in which case,
 you can use a repartition operation to redistributed the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I 
 further
 observed that most executors take less than 20 seconds but two of them 
 take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the
 DStream.ByKey operations? If the reduce by key is not set, 

Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread Soumya Simanta
Do you have a proxy server?

If yes, you need to set the proxy for twitter4j.

 On Jul 11, 2014, at 7:06 PM, SK skrishna...@gmail.com wrote:
 
 I dont get any exceptions or error messages.
 
 I tried it both with and without VPN and had the same outcome. But  I can
 try again without VPN later today and report back.
 
 thanks.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9477.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Generic Interface between RDD and DStream

2014-07-11 Thread andy petrella
A while ago, I wrote this:
```

package com.virdata.core.compute.common.api

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext

sealed trait SparkEnvironment extends Serializable {
  type Context

  type Wagon[A]
}
object Batch extends SparkEnvironment {
  type Context = SparkContext
  type Wagon[A] = RDD[A]
}
object Streaming extends SparkEnvironment{
  type Context = StreamingContext
  type Wagon[A] = DStream[A]
}

```
Then I can produce code like this (just an example)

```

package com.virdata.core.compute.common.api

import org.apache.spark.Logging

trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends
Logging { self =>

  def run(in:M[E#Wagon[In]])(implicit context:E#Context):N[E#Wagon[Out]]

  def pipe[Q[_],U](follow:Process[N,Out,Q,U,E]):Process[M,In,Q,U,E] =
new Process[M,In,Q,U,E] {
override def run(in: M[E#Wagon[In]])(implicit context: E#Context):
Q[E#Wagon[U]] = {
  val run1: N[E#Wagon[Out]] = self.run(in)
  follow.run(run1)
}
  }
}

```

It's not resolving the whole thing, because we'll still have to duplicate
the code (for Batch and Streaming).
However, when the common traits are there I'll only have to remove half of
the implementations -- without touching the calling side (using them),
and thus keeping my plain old backward compat' ^^.

I know it's just an intermediate hack, but still ;-)

greetz,


  aℕdy ℙetrella
about.me/noootsab


On Sat, Jul 12, 2014 at 12:57 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 I totally agree that doing if we are able to do this it will be very cool.
 However, this requires having a common trait / interface between RDD and
 DStream, which we dont have as of now. It would be very cool though. On my
 wish list for sure.

 TD


 On Thu, Jul 10, 2014 at 11:53 AM, mshah shahmaul...@gmail.com wrote:

 I wanted to get a perspective on how to share code between Spark batch
 processing and Spark Streaming.

 For example, I want to get unique tweets stored in a HDFS file then in
 both
 Spark Batch and Spark Streaming. Currently I will have to do following
 thing:

 Tweet {
 String tweetText;
 String userId;
 }

 Spark Batch:
 tweets = sparkContext.newHadoopApiAsFile(tweet);

 def getUniqueTweets(tweets: RDD[Tweet])= {
  tweets.map(tweet=(tweetText,
 tweet).groupByKey(tweetText).map((tweetText, _) =tweetText)
 }

 Spark Streaming:

 tweets = streamingContext.fileStream(tweet);

 def getUniqueTweets(tweets: DStream[Tweet])= {
  tweets.map(tweet=(tweetText,
 tweet).groupByKey(tweetText).map((tweetText, _) =tweetText)
 }


 Above example shows I am doing the same thing but I have to replicate the
 code as there is no common abstraction between DStream and RDD,
 SparkContext
 and Streaming Context.

 If there was a common abstraction it would have been much simlper:

 tweets = context.read(tweet, Stream or Batch)

 def getUniqueTweets(tweets: SparkObject[Tweet])= {
  tweets.map(tweet=(tweetText,
 tweet).groupByKey(tweetText).map((tweetText, _) =tweetText)
 }

 I would appreciate thoughts on it. Is it already available? Is there any
 plan to add this support? Is it intentionally not supported because of
 design choice?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
I don't have a proxy server.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Linkage error - duplicate class definition

2014-07-11 Thread _soumya_
Facing a funny issue with the Spark class loader. Testing out basic
functionality on a Vagrant VM with Spark running - it looks like it's
attempting to ship the jar to a remote instance (in this case local) and
somehow encountering the jar twice?

14/07/11 23:27:59 INFO DAGScheduler: Got job 0 (count at
GenerateSEOContent.java:75) with 1 output partitions (allowLocal=false)
14/07/11 23:27:59 INFO DAGScheduler: Final stage: Stage 0(count at
GenerateSEOContent.java:75)
14/07/11 23:27:59 INFO DAGScheduler: Parents of final stage: List()
14/07/11 23:27:59 INFO DAGScheduler: Missing parents: List()
14/07/11 23:27:59 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map
at GenerateSEOContent.java:67), which has no missing parents
14/07/11 23:27:59 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0
(MappedRDD[1] at map at GenerateSEOContent.java:67)
14/07/11 23:27:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/07/11 23:27:59 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor localhost: localhost (PROCESS_LOCAL)
14/07/11 23:27:59 INFO TaskSetManager: Serialized task 0.0:0 as 3287 bytes
in 4 ms
14/07/11 23:27:59 INFO Executor: Running task ID 0
14/07/11 23:27:59 INFO Executor: Fetching
http://10.141.141.10:36365/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar with
timestamp 1405121278732
14/07/11 23:27:59 INFO Utils: Fetching
http://10.141.141.10:36365/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar to
/tmp/fetchFileTemp2298196547032055523.tmp
14/07/11 23:27:59 INFO Executor: Adding
file:/tmp/spark-defa5d35-1853-492f-b8e0-e7ac30a370b1/rickshaw-spark-0.0.1-SNAPSHOT.jar
to class loader
14/07/11 23:27:59 ERROR Executor: Exception in task ID 0
java.lang.LinkageError: loader (instance of 
org/apache/spark/executor/ChildExecutorURLClassLoader$userClassLoader$):
attempted  duplicate class definition for name:
com/evocalize/rickshaw/spark/util/HdfsUtil
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at
org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:42)
at
org.apache.spark.executor.ChildExecutorURLClassLoader.findClass(ExecutorURLClassLoader.scala:50)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Linkage-error-duplicate-class-definition-tp9482.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread ShreyanshB
Great! Thanks a lot.
Hate to say this, but I promise this is the last quickie.

I looked at the configurations but didn't find any parameter to tune for
network bandwidth, i.e. is there any way to tell GraphX (Spark) that I'm using
a 1G network, a 10G network, or InfiniBand? Does it figure this out on its own
and speed up message passing accordingly?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9483.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


spark ui on yarn

2014-07-11 Thread Koert Kuipers
I just tested a long lived application (that we normally run in standalone
mode) on yarn in client mode.

It looks to me like cached RDDs are missing in the Storage tab of the UI.

Accessing the RDD storage information via the Spark context shows the RDDs as
fully cached, but they are missing on the Storage page.

spark 1.0.0


ML classifier and data format for dataset with variable number of features

2014-07-11 Thread SK
Hi,

I need to perform binary classification on an image dataset. Each image is a
data point described by a Json object. The feature set for each image is a
set of feature vectors, each feature vector corresponding to a distinct
object in the image. For example, if an image has 5 objects, its feature set
will have 5 feature vectors, whereas an image that has 3 objects will have a
feature set consisting of 3 feature vectors. So  the number of feature
vectors  may be different for different images, although  each feature
vector has the same number of attributes. The classification depends on the
features of the individual objects, so I cannot aggregate them all into a
flat vector.

I have looked through the Mllib examples and it appears that the libSVM data
format and the LabeledData format that Mllib uses, require  all the points
to have the same number of features and they read in a flat feature vector.
I would like to know if any of the Mllib supervised learning classifiers can
be used with json data format and whether they can be used to classify
points with different number of features as described above.

thanks
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
Spark just opens up inter-slave TCP connections for message passing during
shuffles (I think the relevant code is in ConnectionManager). Since TCP
automatically determines the optimal sending rate
(http://en.wikipedia.org/wiki/TCP_congestion-avoidance_algorithm), Spark
doesn't need any configuration parameters for this.

Ankur http://www.ankurdave.com/


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread ShreyanshB
Perfect! Thanks Ankur.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9488.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ML classifier and data format for dataset with variable number of features

2014-07-11 Thread Xiangrui Meng
You can load the dataset as an RDD of JSON objects and use a flatMap to
extract feature vectors at the object level. Then you can filter the
training examples you want for binary classification. If you want to
try multiclass, check out DB's PR at
https://github.com/apache/spark/pull/1379
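
A rough sketch of that approach (assuming each line is one JSON object with a
label and a list of per-object feature arrays; the field names and the
classifier choice below are only illustrative):

import scala.util.parsing.json.JSON

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object JsonFeatureExtraction {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "json-features")

    // one JSON object per line, e.g. {"label": 1.0, "objects": [[0.1, 0.2], [0.3, 0.4]]}
    val lines = sc.textFile("hdfs:///path/to/images.json")  // path is illustrative

    // flatMap: one LabeledPoint per object in the image, all sharing the image's label
    val points = lines.flatMap { line =>
      JSON.parseFull(line) match {
        case Some(json) =>
          val m = json.asInstanceOf[Map[String, Any]]
          val label = m("label").asInstanceOf[Double]
          val objects = m("objects").asInstanceOf[List[List[Double]]]
          objects.map(fv => LabeledPoint(label, Vectors.dense(fv.toArray)))
        case None => Nil
      }
    }

    // keep the two classes you care about and train any binary classifier
    val binary = points.filter(p => p.label == 0.0 || p.label == 1.0)
    val model = LogisticRegressionWithSGD.train(binary, 20)  // 20 iterations
    println(model.weights)
    sc.stop()
  }
}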

Best,
Xiangrui

On Fri, Jul 11, 2014 at 5:12 PM, SK skrishna...@gmail.com wrote:
 Hi,

 I need to perform binary classification on an image dataset. Each image is a
 data point described by a Json object. The feature set for each image is a
 set of feature vectors, each feature vector corresponding to a distinct
 object in the image. For example, if an image has 5 objects, its feature set
 will have 5 feature vectors, whereas an image that has 3 objects will have a
 feature set consisting of 3 feature vectors. So  the number of feature
 vectors  may be different for different images, although  each feature
 vector has the same number of attributes. The classification depends on the
 features of the individual objects, so I cannot aggregate them all into a
 flat vector.

 I have looked through the Mllib examples and it appears that the libSVM data
 format and the LabeledData format that Mllib uses, require  all the points
 to have the same number of features and they read in a flat feature vector.
 I would like to know if any of the Mllib supervised learning classifiers can
 be used with json data format and whether they can be used to classify
 points with different number of features as described above.

 thanks




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Announcing Spark 1.0.1

2014-07-11 Thread Patrick Wendell
I am happy to announce the availability of Spark 1.0.1! This release
includes contributions from 70 developers. Spark 1.0.1 includes fixes
across several areas of Spark, including the core API, PySpark, and
MLlib. It also includes new features in Spark's (alpha) SQL library,
including support for JSON data and performance and stability fixes.

Visit the release notes[1] to read about this release or download[2]
the release today.

[1] http://spark.apache.org/releases/spark-release-1-0-1.html
[2] http://spark.apache.org/downloads.html


Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread Soumya Simanta
Try running a simple standalone program if you are using Scala and see if
you are getting any data. I use this to debug any connection/twitter4j
issues.


import twitter4j._


//put your keys and creds here
object Util {
  val config = new twitter4j.conf.ConfigurationBuilder()
.setOAuthConsumerKey()
.setOAuthConsumerSecret()
.setOAuthAccessToken()
.setOAuthAccessTokenSecret()
.build
}


/**
 *   Add this to your build.sbt
 *   "org.twitter4j" % "twitter4j-stream" % "3.0.3",

 */
object SimpleStreamer extends App {


  def simpleStatusListener = new StatusListener() {
def onStatus(status: Status) {
  println(status.getUserMentionEntities.length)
}

def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}

def onTrackLimitationNotice(numberOfLimitedStatuses: Int) {}

def onException(ex: Exception) {
  ex.printStackTrace
}

def onScrubGeo(arg0: Long, arg1: Long) {}

def onStallWarning(warning: StallWarning) {}
  }

  val keywords = List("dog", "cat")
  val twitterStream = new TwitterStreamFactory(Util.config).getInstance
  twitterStream.addListener(simpleStatusListener)
  twitterStream.filter(new FilterQuery().track(keywords.toArray))

}



On Fri, Jul 11, 2014 at 7:19 PM, SK skrishna...@gmail.com wrote:

 I dont have a proxy server.

 thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread Tathagata Das
Does nothing get printed on the screen? If you are not getting any tweets
but Spark Streaming is running successfully, you should at least see counts
being printed every batch (which would be zero). If they are not being
printed either, check the Spark web UI to see whether stages are running or
not. If they are not, you may not have enough cores to run.
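
For example, with the Twitter receiver the master needs at least 2 cores, one
for the receiver and one for the processing tasks (a quick sketch; the batch
interval is arbitrary):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[1] would give the receiver the only slot and the counts would never be computed
val ssc = new StreamingContext("local[2]", "twitter-debug", Seconds(10))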

TD


On Fri, Jul 11, 2014 at 7:09 PM, Soumya Simanta soumya.sima...@gmail.com
wrote:

 Try running a simple standalone program if you are using Scala and see if
 you are getting any data. I use this to debug any connection/twitter4j
 issues.


 import twitter4j._


 //put your keys and creds here
 object Util {
   val config = new twitter4j.conf.ConfigurationBuilder()
 .setOAuthConsumerKey()
 .setOAuthConsumerSecret()
 .setOAuthAccessToken()
 .setOAuthAccessTokenSecret()
 .build
 }


 /**
  *   Add this to your build.sbt
  *   "org.twitter4j" % "twitter4j-stream" % "3.0.3",

  */
 object SimpleStreamer extends App {


   def simpleStatusListener = new StatusListener() {
 def onStatus(status: Status) {
   println(status.getUserMentionEntities.length)
 }

 def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}

 def onTrackLimitationNotice(numberOfLimitedStatuses: Int) {}

 def onException(ex: Exception) {
   ex.printStackTrace
 }

 def onScrubGeo(arg0: Long, arg1: Long) {}

 def onStallWarning(warning: StallWarning) {}
   }

   val keywords = List("dog", "cat")
   val twitterStream = new TwitterStreamFactory(Util.config).getInstance
   twitterStream.addListener(simpleStatusListener)
   twitterStream.filter(new FilterQuery().track(keywords.toArray))

 }



 On Fri, Jul 11, 2014 at 7:19 PM, SK skrishna...@gmail.com wrote:

 I dont have a proxy server.

 thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Tathagata Das
A task slot is equivalent to a core. So one core can only run one task
at a time.
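
For example, each receiver started in the sketch below is a long-running task
that permanently holds one of those slots, so the application needs more cores
than receivers before any processing tasks can run (the Kafka/ZooKeeper details
are only illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// illustrative values
val ssc = new StreamingContext("local[8]", "receivers-example", Seconds(60))
val zkQuorum = "zk1:2181,zk2:2181"
val group = "my-consumer-group"
val topicMap = Map("topic1" -> 1, "topic2" -> 1)

val numReceivers = 4
// each createStream call starts one receiver; each receiver occupies one core/slot
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}
// with 8 cores and 4 receivers, 4 slots remain free for the processing tasks
val unified = ssc.union(kafkaStreams)
unified.count().print()
ssc.start()
ssc.awaitTermination()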

TD


On Fri, Jul 11, 2014 at 1:57 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi Tathagata,

 Thank you. Is task slot equivalent to the core number? Or actually one
 core can run multiple tasks at the same time?

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 The same executor can be used for both receiving and processing,
 irrespective of the deployment mode (YARN, Spark standalone, etc.). It boils
 down to the number of cores / task slots that the executor has. Each receiver
 is like a long-running task, so each of them occupies a slot. If there are
 free slots in the executor then other tasks can be run on them.

 So if you are finding that the other tasks are not being run, check how many
 cores/task slots the executor has and whether there are more task slots
 than the number of input DStreams / receivers you are launching.

 @Praveen  your answers were pretty much spot on, thanks for chipping in!




 On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi Praveen,

 Thank you for the answer. That's interesting because if I only bring up
 one executor for Spark Streaming, it seems only the receiver is
 working and no other tasks are happening, judging by the log and UI. Maybe
 it's just because the receiving task eats all the resources, not because
 one executor can only run one receiver?

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com
 wrote:

 Here are my answers, but I am just getting started with Spark Streaming -
 so please correct me if I am wrong.
 1) Yes
 2) Receivers will run on executors. It's actually a job that's submitted,
 where the # of tasks equals the # of receivers. An executor can actually run
 more than one task at the same time. Hence you could have more receivers
 than executors, but I think it's not recommended.
 3) As said in 2, the executor where the receiver task is running can be
 used for map/reduce tasks. In yarn-cluster mode, the driver program is
 actually run as the application master (it lives in the first container
 that's launched) and this is not an executor - hence it's not used for other
 operations.
 4) The driver runs in a separate container. I think the same executor
 can be used for the receiver and the processing tasks as well (I am not
 very sure about this part).


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com
 wrote:

 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application. But I have problems understanding how the executors are used
 and how the application is distributed.

 1. In YARN, is one executor equal to one container?

 2. I saw the statement that a streaming receiver runs on one worker
 machine (note that each input DStream creates a single receiver
 (running on a worker machine) that receives a single stream of data).
 Does the worker machine mean the executor or the physical machine? If I have
 more receivers than executors, will it still work?

 3. Is the executor that holds the receiver also used for other operations,
 such as map and reduce, or is it fully occupied by the receiver? Similarly,
 if I run in yarn-cluster mode, is the executor running the driver program
 used by other operations too?

 4. So if I have a driver program (cluster mode) and streaming
 receiver, do I have to have at least 2 executors because the program and
 streaming receiver have to be on different executors?

 Thank you. Sorry for having so many questions, but I do want to
 understand how Spark Streaming distributes work in order to assign
 reasonable resources. Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108








Re: Number of executors change during job running

2014-07-11 Thread Tathagata Das
Aah, I get it now. That is because the input data stream is replicated on
two machines, so by locality the data is processed on those two machines.
So the map stage on the data uses 2 executors, but the reduce stage
(after groupByKey) and the saveAsTextFiles would use 300 tasks. And the default
parallelism takes effect only when the data is explicitly shuffled
around.

You can fix this by explicitly repartitioning the data.

inputDStream.repartition(partitions)

This is covered in the streaming tuning guide
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
.
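
For example (a rough sketch; lines stands for the DStream created from Kafka,
and 300 matches the parallelism you mentioned):

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

def process(lines: DStream[String]): Unit = {
  val repartitioned = lines.repartition(300)  // spread the received blocks across the cluster
  // shuffle-based operations can also be given an explicit partition count
  val counts = repartitioned.map(record => (record, 1)).reduceByKey(_ + _, 300)
  counts.print()
}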

TD



On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just one-to-one mappings, which cannot
 possibly increase the running time much. The only reduce function I have here
 is groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. The other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val partition = 300
 val zkQuorum = "zk1,zk2,zk3"
 val group = "my-group-" + currentTime.toString
 val topics = "topic1,topic2,topic3,topic4"
 val numThreads = 4
 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + "checkpoint")
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)
   .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data => (data("id").toString,
   timestampToUTCUnix(data("time").toString),
   timestampToUTCUnix(data("local_time").toString), data("id2").toString,
   data("id3").toString,
   data("log_type").toString, data("sub_log_type").toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

 val watchTimeFields = filteredFields.map(fields => (fields._1,
   fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields => (fields._3,
   fields._1)).groupByKey(partition)
 val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
 programDuidNum.saveAsTextFiles(hadoopOutput + "result")

 I have been working on this for several days but still have no idea why
 there are always 2 executors for the groupBy stage. Thanks a lot!

 Bill


 On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you show us the program that you are running? If you are setting the
 number of partitions in the XYZ-ByKey operation to 300, then there should
 be 300 tasks for that stage, distributed over the 50 executors allocated
 to your context. However, the data distribution may be skewed, in which case
 you can use a repartition operation to redistribute the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD



Re: pyspark sc.parallelize running OOM with smallish data

2014-07-11 Thread Mohit Jaggi
I put the same dataset into Scala (using spark-shell) and it acts weird. I
cannot do a count on it; the executors seem to hang. The WebUI shows 0/96
in the status bar and shows details about the worker nodes, but there is no
progress.
sc.parallelize does finish (it takes too long for the data size) in Scala.


On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com wrote:

 spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
 in the cluster and gave 48g to the executors. I also tried Kryo serialization.

 traceback (most recent call last):

   File /mohit/./m.py, line 58, in module

 spark_data = sc.parallelize(spark_data_array)

   File /mohit/spark/python/pyspark/context.py, line 265, in parallelize

 jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__

   File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line
 300, in get_return_value

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.

 : java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)

 at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

 at py4j.Gateway.invoke(Gateway.java:259)

 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

 at py4j.commands.CallCommand.execute(CallCommand.java:79)

 at py4j.GatewayConnection.run(GatewayConnection.java:207)

 at java.lang.Thread.run(Thread.java:745)



Re: Spark Streaming timing considerations

2014-07-11 Thread Tathagata Das
This is not in the current streaming API.

Queue stream is useful for testing with generated RDDs, but not for actual
data. For an actual data stream, the slack time can be implemented by doing
DStream.window on a larger window that takes the slack time into consideration,
and then filtering out the required application-time-based window of data.
For example, if you want a slack time of 1 minute and batches of 10
seconds, then do a window operation of 70 seconds, and in each RDD filter
out the records with the desired application time and process them.
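
A rough sketch of that pattern (the Event case class and the 60s slack / 10s
batch numbers are only illustrative):

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

case class Event(eventTime: Long, payload: String)  // eventTime in ms

// events stands for the already-created input DStream
def processWithSlack(events: DStream[Event]): Unit = {
  // 10s batches + 60s slack => look at a 70s window on every batch
  val windowed = events.window(Seconds(70), Seconds(10))
  windowed.foreachRDD { (rdd, time) =>
    val upper = time.milliseconds - 60 * 1000  // end of the interval whose slack has passed
    val lower = upper - 10 * 1000
    val settled = rdd.filter(e => e.eventTime >= lower && e.eventTime < upper)
    println(s"batch at $time: ${settled.count()} settled records")
  }
}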

TD


On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 In the spark streaming paper, slack time has been suggested for delaying
 the batch creation in case of external timestamps. I don't see any such
 option in streamingcontext. Is it available in the API?

 Also going through the previous posts, queueStream has been suggested for
 this. I looked into to queueStream example.

 // Create and push some RDDs into the queue
 for (i <- 1 to 30) {
   rddQueue += ssc.sparkContext.makeRDD(1 to 10)
   Thread.sleep(1000)
 }

 The only thing I am unsure about is how to make batches (basic RDDs) out of a
 stream coming in on a port.

 Regards,
 Laeeq




Re: Generic Interface between RDD and DStream

2014-07-11 Thread Tathagata Das
Hey Andy,

That's pretty cool!! Is there a github repo where you can share this piece
of code for us to play around with? If we can come up with a simple enough
general pattern, that can be very useful!

TD


On Fri, Jul 11, 2014 at 4:12 PM, andy petrella andy.petre...@gmail.com
wrote:

 A while ago, I wrote this:
 ```


 package com.virdata.core.compute.common.api

 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.StreamingContext

 sealed trait SparkEnvironment extends Serializable {
   type Context
   type Wagon[A]
 }
 object Batch extends SparkEnvironment {
   type Context = SparkContext
   type Wagon[A] = RDD[A]
 }
 object Streaming extends SparkEnvironment {
   type Context = StreamingContext
   type Wagon[A] = DStream[A]
 }

 ```
 Then I can produce code like this (just an example)

 ```



 package com.virdata.core.compute.common.api

 import org.apache.spark.Logging

 trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends Logging { self =>

   def run(in: M[E#Wagon[In]])(implicit context: E#Context): N[E#Wagon[Out]]

   def pipe[Q[_], U](follow: Process[N, Out, Q, U, E]): Process[M, In, Q, U, E] =
     new Process[M, In, Q, U, E] {
       override def run(in: M[E#Wagon[In]])(implicit context: E#Context): Q[E#Wagon[U]] = {
         val run1: N[E#Wagon[Out]] = self.run(in)
         follow.run(run1)
       }
     }
 }

 ```

 It's not resolving the whole thing, because we'll still have to duplicate
 both code (for Batch and Streaming).
 However, when the common traits will be there I'll have to remove half of
 the implementations only -- without touching the calling side (using them),
 and thus keeping my plain old backward compat' ^^.

 I know it's just an intermediate hack, but still ;-)

 greetz,


   aℕdy ℙetrella
 about.me/noootsab

 http://about.me/noootsab


 On Sat, Jul 12, 2014 at 12:57 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

  I totally agree that if we are able to do this it will be very
  cool. However, this requires having a common trait / interface between RDD
  and DStream, which we don't have as of now. It would be very cool though. On
  my wish list for sure.

 TD


 On Thu, Jul 10, 2014 at 11:53 AM, mshah shahmaul...@gmail.com wrote:

 I wanted to get a perspective on how to share code between Spark batch
 processing and Spark Streaming.

 For example, I want to get unique tweets stored in a HDFS file then in
 both
 Spark Batch and Spark Streaming. Currently I will have to do following
 thing:

 Tweet {
 String tweetText;
 String userId;
 }

 Spark Batch:
 tweets = sparkContext.newHadoopApiAsFile(tweet);

 def getUniqueTweets(tweets: RDD[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet)).groupByKey()
     .map { case (tweetText, _) => tweetText }
 }

 Spark Streaming:

 tweets = streamingContext.fileStream(tweet);

 def getUniqueTweets(tweets: DStream[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet)).groupByKey()
     .map { case (tweetText, _) => tweetText }
 }


 Above example shows I am doing the same thing but I have to replicate the
 code as there is no common abstraction between DStream and RDD,
 SparkContext
 and Streaming Context.

 If there were a common abstraction, it would be much simpler:

 tweets = context.read(tweet, Stream or Batch)

 def getUniqueTweets(tweets: SparkObject[Tweet]) = {
   tweets.map(tweet => (tweet.tweetText, tweet)).groupByKey()
     .map { case (tweetText, _) => tweetText }
 }

 I would appreciate thoughts on it. Is it already available? Is there any
 plan to add this support? Is it intentionally not supported because of
 design choice?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

Do you mean that the data is not shuffled until the reduce stage? That
means groupBy still only uses 2 machines?

I think I used repartition(300) after I read the data from Kafka into a
DStream. It seems that this did not guarantee that the map or reduce stages
would run on 300 machines. I am currently trying to create 100 DStreams
with KafkaUtils.createStream and union them. Now the reduce stages have
around 80 machines for all the batches. However, this method introduces many
DStreams. It would be good if we could control the number of executors in the
groupBy operation, because the calculation needs to finish within 1 minute
for different sizes of input data based on our production needs.

Thanks!


Bill


On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Aah, I get it now. That is because the input data stream is replicated on
 two machines, so by locality the data is processed on those two machines.
 So the map stage on the data uses 2 executors, but the reduce stage
 (after groupByKey) and the saveAsTextFiles would use 300 tasks. And the default
 parallelism takes effect only when the data is explicitly shuffled
 around.

 You can fix this by explicitly repartitioning the data.

 inputDStream.repartition(partitions)

 This is covered in the streaming tuning guide
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
 .

 TD



 On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi folks,

 I just ran another job that only received data from Kafka, did some
 filtering, and then save as text files in HDFS. There was no reducing work
 involved. Surprisingly, the number of executors for the saveAsTextFiles
 stage was also 2 although I specified 300 executors in the job submission.
 As a result, the simple save file action took more than 2 minutes. Do you
 have any idea how Spark determined the number of executors
 for different stages?

 Thanks!

 Bill


 On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Below is my main function. I omit some filtering and data conversion
 functions. These functions are just one-to-one mappings, which cannot
 possibly increase the running time much. The only reduce function I have here
 is groupByKey. There are 4 topics in my Kafka brokers and two of the topics
 have 240k lines each minute. The other two topics have less than 30k
 lines per minute. The batch size is one minute and I specified 300
 executors in my spark-submit script. The default parallelism is 300.


 val partition = 300
 val zkQuorum = "zk1,zk2,zk3"
 val group = "my-group-" + currentTime.toString
 val topics = "topic1,topic2,topic3,topic4"
 val numThreads = 4
 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 ssc = new StreamingContext(conf, Seconds(batch))
 ssc.checkpoint(hadoopOutput + "checkpoint")
 val lines = lines1
 lines.cache()
 val jsonData = lines.map(JSON.parseFull(_))
 val mapData = jsonData.filter(_.isDefined)
   .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
 val validMapData = mapData.filter(isValidData(_))
 val fields = validMapData.map(data => (data("id").toString,
   timestampToUTCUnix(data("time").toString),
   timestampToUTCUnix(data("local_time").toString), data("id2").toString,
   data("id3").toString,
   data("log_type").toString, data("sub_log_type").toString))
 val timeDiff = 3600L
 val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)

 val watchTimeFields = filteredFields.map(fields => (fields._1,
   fields._2, fields._4, fields._5, fields._7))
 val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
 val programDuids = watchTimeTuples.map(fields => (fields._3,
   fields._1)).groupByKey(partition)
 val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
 programDuidNum.saveAsTextFiles(hadoopOutput + "result")

 I have been working on this for several days. No findings why there are
 always 2 executors for the groupBy stage. Thanks a lot!

 Bill


 On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Can you show us the program that you are running? If you are setting the
 number of partitions in the XYZ-ByKey operation to 300, then there should
 be 300 tasks for that stage, distributed over the 50 executors allocated
 to your context. However, the data distribution may be skewed, in which case
 you can use a repartition operation to redistribute the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the numbers of executors is around
 50 instead of 300, which 

Re: Announcing Spark 1.0.1

2014-07-11 Thread Henry Saputra
Congrats to the Spark community !

On Friday, July 11, 2014, Patrick Wendell pwend...@gmail.com wrote:

 I am happy to announce the availability of Spark 1.0.1! This release
 includes contributions from 70 developers. Spark 1.0.1 includes fixes
 across several areas of Spark, including the core API, PySpark, and
 MLlib. It also includes new features in Spark's (alpha) SQL library,
 including support for JSON data and performance and stability fixes.

 Visit the release notes[1] to read about this release or download[2]
 the release today.

 [1] http://spark.apache.org/releases/spark-release-1-0-1.html
 [2] http://spark.apache.org/downloads.html



RE: EC2 Cluster script. Shark install fails

2014-07-11 Thread Jason H
Thanks Michael.
Missed that point, as well as the integration of SQL within the Scala shell
(by setting up the SQLContext). Looking forward to feature parity in future
releases (Shark -> Spark SQL).
Cheers.

From: mich...@databricks.com
Date: Thu, 10 Jul 2014 16:20:20 -0700
Subject: Re: EC2 Cluster script. Shark install fails
To: user@spark.apache.org

There is no version of Shark that is compatible with Spark 1.0, however, Spark 
SQL does come included automatically.  More information here:
http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html


http://spark.apache.org/docs/latest/sql-programming-guide.html




On Thu, Jul 10, 2014 at 5:51 AM, Jason H jas...@developer.net.nz wrote:





Hi
Just going through the process of installing Spark 1.0.0 on EC2 and noticed that
the script throws an error when installing Shark:

Unpacking Spark
~/spark-ec2
Initializing shark
~ ~/spark-ec2
ERROR: Unknown Shark version


The install completes in the end but Shark is completely missing. Looking for
info on the best way to manually add it now that the cluster is set up. Is
there no Shark version compatible with 1.0.0 or this script?




Any suggestions appreciated.
  

  

Re: One question about RDD.zip function when trying Naive Bayes

2014-07-11 Thread x
I tried my test case with Spark 1.0.1 and saw the same result (27 pairs
become 25 pairs after zip).
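
For reference, I am using the usual predict-then-zip pattern, roughly like this
(a stripped-down sketch; variable names are illustrative):

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// training and test are RDD[LabeledPoint] built as in the quoted mail below
def labelAndPrediction(training: RDD[LabeledPoint], test: RDD[LabeledPoint]): RDD[(Double, Double)] = {
  val model = NaiveBayes.train(training, 1.0)  // lambda = 1.0
  val predictions = test.map(p => model.predict(p.features))
  test.map(_.label).zip(predictions)  // expect exactly one pair per test point
}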

Could someone please check it?

Regards,
xj

On Thu, Jul 3, 2014 at 2:31 PM, Xiangrui Meng men...@gmail.com wrote:

 This is due to a bug in sampling, which was fixed in 1.0.1 and latest
 master. See https://github.com/apache/spark/pull/1234 . -Xiangrui

 On Wed, Jul 2, 2014 at 8:23 PM, x wasedax...@gmail.com wrote:
  Hello,
 
  I am a newbie to Spark MLlib and ran into a curious case when following the
  instructions at the page below.
 
  http://spark.apache.org/docs/latest/mllib-naive-bayes.html
 
  I ran a test program on my local machine using some data.
 
  val spConfig = (new SparkConf).setMaster("local").setAppName("SparkNaiveBayes")
  val sc = new SparkContext(spConfig)
 
  The test data was as follows and there were three labeled categories I
  wanted to predict.
 
   1  LabeledPoint(0.0, [4.9,3.0,1.4,0.2])
   2  LabeledPoint(0.0, [4.6,3.4,1.4,0.3])
   3  LabeledPoint(0.0, [5.7,4.4,1.5,0.4])
   4  LabeledPoint(0.0, [5.2,3.4,1.4,0.2])
   5  LabeledPoint(0.0, [4.7,3.2,1.6,0.2])
   6  LabeledPoint(0.0, [4.8,3.1,1.6,0.2])
   7  LabeledPoint(0.0, [5.1,3.8,1.9,0.4])
   8  LabeledPoint(0.0, [4.8,3.0,1.4,0.3])
   9  LabeledPoint(0.0, [5.0,3.3,1.4,0.2])
  10  LabeledPoint(1.0, [6.6,2.9,4.6,1.3])
  11  LabeledPoint(1.0, [5.2,2.7,3.9,1.4])
  12  LabeledPoint(1.0, [5.6,2.5,3.9,1.1])
  13  LabeledPoint(1.0, [6.4,2.9,4.3,1.3])
  14  LabeledPoint(1.0, [6.6,3.0,4.4,1.4])
  15  LabeledPoint(1.0, [6.0,2.7,5.1,1.6])
  16  LabeledPoint(1.0, [5.5,2.6,4.4,1.2])
  17  LabeledPoint(1.0, [5.8,2.6,4.0,1.2])
  18  LabeledPoint(1.0, [5.7,2.9,4.2,1.3])
  19  LabeledPoint(1.0, [5.7,2.8,4.1,1.3])
  20  LabeledPoint(2.0, [6.3,2.9,5.6,1.8])
  21  LabeledPoint(2.0, [6.5,3.0,5.8,2.2])
  22  LabeledPoint(2.0, [6.5,3.0,5.5,1.8])
  23  LabeledPoint(2.0, [6.7,3.3,5.7,2.1])
  24  LabeledPoint(2.0, [7.4,2.8,6.1,1.9])
  25  LabeledPoint(2.0, [6.3,3.4,5.6,2.4])
  26  LabeledPoint(2.0, [6.0,3.0,4.8,1.8])
  27  LabeledPoint(2.0, [6.8,3.2,5.9,2.3])
 
  The predicted results via NaiveBayes are below. Compared to the test data,
  only two predicted results (#11 and #15) were different.
 
   1  0.0
   2  0.0
   3  0.0
   4  0.0
   5  0.0
   6  0.0
   7  0.0
   8  0.0
   9  0.0
  10  1.0
  11  2.0
  12  1.0
  13  1.0
  14  1.0
  15  2.0
  16  1.0
  17  1.0
  18  1.0
  19  1.0
  20  2.0
  21  2.0
  22  2.0
  23  2.0
  24  2.0
  25  2.0
  26  2.0
  27  2.0
 
  After grouping test RDD and predicted RDD via zip I got this.
 
   1  (0.0,0.0)
   2  (0.0,0.0)
   3  (0.0,0.0)
   4  (0.0,0.0)
   5  (0.0,0.0)
   6  (0.0,0.0)
   7  (0.0,0.0)
   8  (0.0,0.0)
   9  (0.0,1.0)
  10  (0.0,1.0)
  11  (0.0,1.0)
  12  (1.0,1.0)
  13  (1.0,1.0)
  14  (2.0,1.0)
  15  (1.0,1.0)
  16  (1.0,2.0)
  17  (1.0,2.0)
  18  (1.0,2.0)
  19  (1.0,2.0)
  20  (2.0,2.0)
  21  (2.0,2.0)
  22  (2.0,2.0)
  23  (2.0,2.0)
  24  (2.0,2.0)
  25  (2.0,2.0)
 
  I expected there to be 27 pairs but I saw that two results were lost.
  Could someone please point out what I am missing here?
 
  Regards,
  xj