ClassNotFoundException with Spark/Mesos (spark-shell works fine)

2014-05-21 Thread Tobias Pfeiffer
Hi,

I have set up a cluster with Mesos (backed by Zookeeper) with three
master and three slave instances. I set up Spark (git HEAD) for use
with Mesos according to this manual:
http://people.apache.org/~pwendell/catalyst-docs/running-on-mesos.html

Using the spark-shell, I can connect to this cluster and do simple RDD
operations, but the same code in a Scala class and executed via sbt
run-main works only partially. (That is, count() works, count() after
flatMap() does not.)

Here is my code: https://gist.github.com/tgpfeiffer/7d20a4d59ee6e0088f91
The file SparkExamplesScript.scala, when pasted into spark-shell,
outputs the correct count() for the parallelized list comprehension,
as well as for the flatMapped RDD.

The file SparkExamplesMinimal.scala contains exactly the same code,
and also the MASTER configuration and the Spark Executor are the same.
However, while the count() for the parallelized list is displayed
correctly, I receive the following error when asking for the count()
of the flatMapped RDD:

-

14/05/21 09:47:49 INFO scheduler.DAGScheduler: Submitting Stage 1
(FlatMappedRDD[1] at flatMap at SparkExamplesMinimal.scala:34), which
has no missing parents
14/05/21 09:47:49 INFO scheduler.DAGScheduler: Submitting 8 missing
tasks from Stage 1 (FlatMappedRDD[1] at flatMap at
SparkExamplesMinimal.scala:34)
14/05/21 09:47:49 INFO scheduler.TaskSchedulerImpl: Adding task set
1.0 with 8 tasks
14/05/21 09:47:49 INFO scheduler.TaskSetManager: Starting task 1.0:0
as TID 8 on executor 20140520-102159-2154735808-5050-1108-1: mesos9-1
(PROCESS_LOCAL)
14/05/21 09:47:49 INFO scheduler.TaskSetManager: Serialized task 1.0:0
as 1779147 bytes in 37 ms
14/05/21 09:47:49 WARN scheduler.TaskSetManager: Lost TID 8 (task 1.0:0)
14/05/21 09:47:49 WARN scheduler.TaskSetManager: Loss was due to
java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: spark.SparkExamplesMinimal$$anonfun$2
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

-

Can anyone explain to me where this comes from or how I might further
track the problem down?

Thanks,
Tobias


Re: I want to filter a stream by a subclass.

2014-05-21 Thread Tobias Pfeiffer
On Thu, May 22, 2014 at 8:07 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:
 records.filter { _.isInstanceOf[Orange] } .map { _.asInstanceOf[Orange] }

I think a Scala-ish way would be

records.flatMap(_ match {
  case i: Int => Some(i)
  case _      => None
})
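
A self-contained sketch of that flatMap pattern on a plain RDD (Fruit,
Orange and Apple are made-up classes here, and sc is an existing
SparkContext; the same pattern match works inside DStream.flatMap):

sealed trait Fruit
case class Orange(size: Int) extends Fruit
case class Apple(size: Int) extends Fruit

val fruits = sc.parallelize(Seq[Fruit](Orange(1), Apple(2), Orange(3)))
// keep only the Oranges, already typed as RDD[Orange]
val oranges = fruits.flatMap {
  case o: Orange => Some(o)
  case _         => None
}
oranges.count() // 2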


Re: RDD union of a window in Dstream

2014-05-21 Thread Tobias Pfeiffer
Hi,

On Wed, May 21, 2014 at 9:42 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
 I want to do union of all RDDs in each window of DStream.

A window *is* a union of all RDDs in the respective time interval.

The documentation says a DStream is represented as a sequence of
RDDs. However, data from a certain time interval will always be
contained in *one* RDD, not a sequence of RDDs (AFAIK).
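
For illustration, a minimal sketch (durations are arbitrary examples,
dstream is some existing DStream): every RDD of the windowed stream
already holds the union of the last 30 seconds of data, recomputed
every 10 seconds.

import org.apache.spark.streaming.Seconds

val windowed = dstream.window(Seconds(30), Seconds(10))
windowed.foreachRDD { rdd =>
  println("this window contains " + rdd.count() + " items")
}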

Regards
Tobias


Re: Using Spark to analyze complex JSON

2014-05-21 Thread Tobias Pfeiffer
Hi,

as far as I understand, if you create an RDD with a relational
structure from your JSON, you should be able to do much of that
already today. For example, take lift-json's deserializer and do
something like

  val json_table: RDD[MyCaseClass] = json_data.flatMap(json =>
    json.extractOpt[MyCaseClass])

then I guess you can use Spark SQL on that. (Something like your
likes[2] query won't work, though, I guess.)
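
A rough sketch of that approach (lift-json usage is from memory, and
MyCaseClass as well as json_data: RDD[String] are just assumptions for
illustration):

import org.apache.spark.rdd.RDD
import net.liftweb.json._

case class MyCaseClass(name: String, x: Double, y: Double)
implicit val formats = DefaultFormats

// drop records that don't deserialize into the case class
val json_table: RDD[MyCaseClass] =
  json_data.flatMap(line => parse(line).extractOpt[MyCaseClass])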

Regards
Tobias


On Thu, May 22, 2014 at 5:32 AM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 Looking forward to that update!

 Given a table of JSON objects like this one:

 {
   "name": "Nick",
   "location": {
     "x": 241.6,
     "y": -22.5
   },
   "likes": ["ice cream", "dogs", "Vanilla Ice"]
 }

 It would be SUPER COOL if we could query that table in a way that is as
 natural as follows:

 SELECT DISTINCT name
 FROM json_table;

 SELECT MAX(location.x)
 FROM json_table;

 SELECT likes[2] -- Ice Ice Baby
 FROM json_table
 WHERE name = 'Nick';

 Of course, this is just a hand-wavy suggestion of how I’d like to be able to
 query JSON (particularly that last example) using SQL. I’m interested in
 seeing what y’all come up with.

 A large part of what my team does is make it easy for analysts to explore
 and query JSON data using SQL. We have a fairly complex home-grown process
 to do that and are looking to replace it with something more out of the box.
 So if you’d like more input on how users might use this feature, I’d be glad
 to chime in.

 Nick



 On Wed, May 21, 2014 at 11:21 AM, Michael Armbrust mich...@databricks.com
 wrote:

 You can already extract fields from json data using Hive UDFs.  We have an
 intern working on on better native support this summer.  We will be sure to
 post updates once there is a working prototype.

 Michael


 On Tue, May 20, 2014 at 6:46 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 The Apache Drill home page has an interesting heading: Liberate Nested
 Data.

 Is there any current or planned functionality in Spark SQL or Shark to
 enable SQL-like querying of complex JSON?

 Nick


 
 View this message in context: Using Spark to analyze complex JSON
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Spark Streaming on Mesos, various questions

2014-05-22 Thread Tobias Pfeiffer
Hi,

with the hints from Gerard I was able to get my locally working Spark
code running on Mesos. Thanks!

Basically, on my local dev machine, I use sbt assembly to create a
fat jar (which is actually not so fat since I use ... % 'provided'
in my sbt file for the Spark dependencies), upload it to my cluster
and run it using
  java -cp myApplicationCode.jar:spark-assembly-1.0.0-SNAPSHOT.jar
mypackage.MainClass
I can see in my Mesos master web interface how the tasks are added and
distributed to the slaves and in the driver program I can see the
final results, that is very nice.

Now, as the next step, I wanted to get Spark Streaming running. That
worked out by now, but I have various questions. I'd be happy if
someone could help me out with some answers.

1. I wrongly assumed that when using ssc.socketTextStream(), the
driver would connect to the specified server. It does not; apparently
one of the slaves does ;-) Does that mean that before any DStream
processing can be done, all the received data needs to be sent to the
other slaves? What about the extreme case dstream.filter(x => false);
would all the data be transferred to other hosts, just to be discarded
there?

2. How can I reduce the logging? It seems like for every chunk
received from the socketTextStream, I get a line INFO
BlockManagerInfo: Added input-0-1400739888200 in memory on ...,
that's very noisy. Also, when the foreachRDD() is processed every N
seconds, I get a lot of output.

3. In my (non-production) cluster, I have six slaves, two of which
have 2G of RAM, the other four just 512M. So far, I have not seen
Mesos ever give a job to one of the four low-mem machines. Is 512M
just not enough for *any* task, or is there a rationale like they are
not cool enough to play with the Big Guys built into Mesos?

4. I don't have any HDFS or shared disk space. What does this mean for
Spark Streaming's default storage level MEMORY_AND_DISK_SER_2?

5. My prototype example for Spark Streaming is a simple word count:
  val wordCounts = ssc.socketTextStream(...).flatMap(_.split(" "))
    .map((_, 1)).reduceByKey(_ + _)
  wordCounts.print()
However, (with a batchDuration of five seconds) this only works
correctly if I run the application in Mesos coarse mode. In the
default fine-grained mode, I will always receive 0 as word count
(that is, a wrong result), and a lot of warnings like
  W0522 06:57:23.578400 12824 sched.cpp:901] Attempting to launch task
7 with an unknown offer 20140520-102159-2154735808-5050-1108-7891
Can anyone explain this behavior?

Thanks,
Tobias


Re: A single build.sbt file to start Spark REPL?

2014-06-03 Thread Tobias Pfeiffer
Hi,

I guess it should be possible to dig through the scripts
bin/spark-shell, bin/spark-submit etc. and convert them to a long sbt
command that you can run. I just tried

  sbt run-main org.apache.spark.deploy.SparkSubmit spark-shell
--class org.apache.spark.repl.Main

but that fails with

Failed to initialize compiler: object scala.runtime in compiler mirror
not found.
** Note that as of 2.8 scala does not assume use of the java classpath.
** For the old behavior pass -usejavacp to scala, or if using a Settings
** object programatically, settings.usejavacp.value = true.

Would be happy to learn about a way to do that, too.
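
As a partial workaround, a minimal build.sbt along these lines (the
versions are assumptions) at least gives a plain Scala REPL with
spark-core on the classpath via "sbt console", although it is not the
actual Spark REPL with its special closure handling:

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

// fork `run` so that Spark calling System.exit does not kill sbt
fork in run := true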

Tobias

On Tue, Jun 3, 2014 at 11:56 AM, Alexy Khrabrov al...@scalable.pro wrote:
 The usual way to use Spark with SBT is to package a Spark project using sbt 
 package (e.g. per Quick Start) and submit it to Spark using the bin/ scripts 
 from Spark distribution.  For a plain Scala project, you don’t need to download 
 anything, you can just get a build.sbt file with dependencies and e.g. say 
 “console” which will start a Scala REPL with the dependencies on the class 
 path.  Is there a way to avoid downloading Spark tarball completely, by 
 defining the spark-core dependency in build.sbt, and using `run` or `console` 
 to invoke Spark REPL from sbt?  I.e. the goal is: create a single build.sbt 
 file, such that if you run sbt in its directory, and then say run/console 
 (with optional parameters), it will download all Spark dependencies and start 
 the REPL.  Should work on a fresh machine where Spark tarball had never been 
 untarred.

 A+


How to shut down Spark Streaming with Kafka properly?

2014-06-05 Thread Tobias Pfeiffer
Hi,

I am trying to use Spark Streaming with Kafka, which works like a
charm -- except for shutdown. When I run my program with sbt
run-main, sbt will never exit, because there are two non-daemon
threads left that don't die.

I created a minimal example at
https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala.
It starts a StreamingContext and does nothing more than connecting to
a Kafka server and printing what it receives. Using the `future { ...
}` construct, I shut down the StreamingContext after some seconds and
then print the difference between the threads at start time and at end
time. The output can be found at
https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1.
There are a number of threads remaining that will prevent sbt from
exiting.
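
The `future { ... }` construct looks roughly like this (a sketch; the
sleep duration is arbitrary and ssc is the StreamingContext):

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

future {
  Thread.sleep(10000)
  ssc.stop(stopSparkContext = true)
}
ssc.start()
ssc.awaitTermination()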

When I replace `KafkaUtils.createStream(...)` with a call that does
exactly the same, except that it calls `consumerConnector.shutdown()`
in `KafkaReceiver.onStop()` (which it should, IMO), the output is as
shown at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2.

Does anyone have *any* idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both kafka
0.8.0 and 0.8.1.1, by the way.

Thanks
Tobias


Re: How to shut down Spark Streaming with Kafka properly?

2014-06-05 Thread Tobias Pfeiffer
Sean,

your patch fixes the issue, thank you so much! (This is the second
time within one week I run into network libraries not shutting down
threads properly, I'm really glad your code fixes the issue.)

I saw your pull request is closed, but not merged yet. Can I do
anything to get your fix into Spark? Open an issue, send a pull
request myself etc.?

Thanks
Tobias


Re: Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

2014-06-08 Thread Tobias Pfeiffer
Gaurav,

I am not sure that the * expands to what you expect. Bash normally
expands * to a space-separated list, not a colon-separated one. Try
specifying all the jars manually, maybe?

Tobias

On Thu, Jun 5, 2014 at 6:45 PM, Gaurav Dasgupta gaurav.d...@gmail.com wrote:
 Hi,

 I have written my own custom Spark streaming code which connects to Kafka
 server and fetch data. I have tested the code on local mode and it is
 working fine. But when I am executing the same code on YARN mode, I am
 getting KafkaReceiver class not found exception. I am providing the Spark
 Kafka jar in the classpath and ensured that the path is correct for all the
 nodes in my cluster.

 I am using Spark 0.9.1 hadoop pre-built and is deployed on all the nodes (10
 node cluster) in the YARN cluster.
 I am using the following command to run my code on YARN mode:

 SPARK_YARN_MODE=true
 SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
 SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar java -cp
 /usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/
 SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic
 NewTestTable 1

 Below is the error message I am getting:

 14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set
 2.0 with 1 tasks
 14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID
 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
 14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as
 2971 bytes in 2 ms
 14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
 14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ClassNotFoundException
 java.lang.ClassNotFoundException:
 org.apache.spark.streaming.kafka.KafkaReceiver
 at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:247)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
 at
 java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
 at
 org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
 at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
 at
 org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
 at
 java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
 at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
 at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at
 

Re: Are scala.MatchError messages a problem?

2014-06-08 Thread Tobias Pfeiffer
Jeremy,

On Mon, Jun 9, 2014 at 10:22 AM, Jeremy Lee
unorthodox.engine...@gmail.com wrote:
 When you use match, the match must be exhaustive. That is, a match error
 is thrown if the match fails.

 Ahh, right. That makes sense. Scala is applying its strong typing rules
 here instead of no ceremony... but isn't the idea that type errors should
 get picked up at compile time? I suppose the compiler can't tell there's not
 complete coverage, but it seems strange to throw that at runtime when it is
 literally the 'default case'.

You can use subclasses of sealed traits to get a compiler warning
for non-exhaustive matches:
http://stackoverflow.com/questions/11203268/what-is-a-sealed-trait
I don't know if it can be applied for regular expression matching, though...
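
A minimal sketch of what I mean (made-up classes):

sealed trait Event
case class Click(x: Int, y: Int) extends Event
case class KeyPress(c: Char) extends Event

def describe(e: Event): String = e match {
  // compiler warns "match may not be exhaustive" because KeyPress is missing
  case Click(x, y) => "click at " + x + "/" + y
}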

Tobias


Re: Classpath errors with Breeze

2014-06-08 Thread Tobias Pfeiffer
Hi,

I had a similar problem; I was using `sbt assembly` to build a jar
containing all my dependencies, but since my file system has a problem
with long file names (due to disk encryption), some class files (which
correspond to functions in Scala) were not included in the jar I
uploaded. Although, thinking about it, that would result in a
ClassNotFound exception, not NoSuchMethod. Have you built your code
against a different version of the library than the jar you use in
EC2?

Tobias

On Mon, Jun 9, 2014 at 1:52 PM, dlaw dieterich.law...@gmail.com wrote:
 I'm having some trouble getting a basic matrix multiply to work with Breeze.
 I'm pretty sure it's related to my classpath. My setup is a cluster on AWS
 with 8 m3.xlarges. To create the cluster I used the provided ec2 scripts and
 Spark 1.0.0.

 I've made a gist with the relevant pieces of my app:

 https://gist.github.com/dieterichlawson/e5e3ab158a09429706e0

 The app was created as detailed in the quick start guide.

 When I run it I get an error that says the method to multiply a dense matrix
 by a dense matrix does not exist:

 14/06/09 04:49:09 WARN scheduler.TaskSetManager: Lost TID 90 (task 0.0:13)
 14/06/09 04:49:09 INFO scheduler.TaskSetManager: Loss was due to
 java.lang.NoSuchMethodError:
 breeze.linalg.DenseMatrix$.implOpMulMatrix_DMD_DMD_eq_DMD()Lbreeze/linalg/operators/DenseMatrixMultiplyStuff$implOpMulMatrix_DMD_DMD_eq_DMD$;
 [duplicate 46]

 I've tried a bunch of different things, including playing with the CLASSPATH
 and ADD_JARS environment variables, the --jars option on spark-submit, the
 version of breeze and scala, etc...

 I've also tried it in the spark-shell. It works there, so I don't really
 know what's going on. Any thoughts?






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Classpath-errors-with-Breeze-tp7220.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark 1.0.0 Maven dependencies problems.

2014-06-09 Thread Tobias Pfeiffer
Hi,

I remembered I saw this as well and found this ugly comment in my
build.sbt file:


On Mon, Jun 9, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote:
 Looks like this crept in again from the shaded Akka dependency. I'll
 propose a PR to remove it. I believe that remains the way we have to
 deal with the differing Netty/Jetty dependencies floating around.

 On Mon, Jun 9, 2014 at 9:53 AM, toivoa toivo@gmail.com wrote:
 I am using Maven from Eclipse

 dependency:tree shows


 [INFO] +- org.apache.spark:spark-core_2.10:jar:1.0.0:compile
 [INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:runtime
 [INFO] |  +- org.apache.curator:curator-recipes:jar:2.4.0:compile
 [INFO] |  |  +- org.apache.curator:curator-framework:jar:2.4.0:compile
 [INFO] |  |  |  \- org.apache.curator:curator-client:jar:2.4.0:compile
 [INFO] |  |  \- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
 [INFO] |  | \- jline:jline:jar:0.9.94:compile
 [INFO] |  +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:compile
 [INFO] |  |  +-
 org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:compile
 [INFO] |  |  +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:compile
 [INFO] |  |  |  +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:compile
 [INFO] |  |  |  \-
 org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:compile
 [INFO] |  |  \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:compile
 [INFO] |  | \-
 org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:compile
 [INFO] |  |\-
 org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:compile
 [INFO] |  +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:compile
 [INFO] |  +- org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:compile
 [INFO] |  |  +-
 org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
 [INFO] |  |  +-
 org.eclipse.jetty:jetty-continuation:jar:8.1.14.v20131031:compile
 [INFO] |  |  \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:compile
 [INFO] |  | \- org.eclipse.jetty:jetty-io:jar:8.1.14.v20131031:compile
 [INFO] |  +- com.google.guava:guava:jar:14.0.1:compile
 [INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
 [INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
 [INFO] |  +- org.slf4j:slf4j-api:jar:1.7.5:compile
 [INFO] |  +- org.slf4j:jul-to-slf4j:jar:1.7.5:compile
 [INFO] |  +- org.slf4j:jcl-over-slf4j:jar:1.7.5:compile
 [INFO] |  +- log4j:log4j:jar:1.2.17:compile
 [INFO] |  +- org.slf4j:slf4j-log4j12:jar:1.7.5:compile
 [INFO] |  +- com.ning:compress-lzf:jar:1.0.0:compile
 [INFO] |  +- org.xerial.snappy:snappy-java:jar:1.0.5:compile
 [INFO] |  +- com.twitter:chill_2.10:jar:0.3.6:compile
 [INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
 [INFO] |  | +-
 com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
 [INFO] |  | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
 [INFO] |  | \- org.objenesis:objenesis:jar:1.2:compile
 [INFO] |  +- com.twitter:chill-java:jar:0.3.6:compile
 [INFO] |  +- commons-net:commons-net:jar:2.2:compile
 [INFO] |  +-
 org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf:compile
 [INFO] |  |  +-
 org.spark-project.akka:akka-actor_2.10:jar:2.2.3-shaded-protobuf:compile
 [INFO] |  |  |  \- com.typesafe:config:jar:1.0.2:compile
 [INFO] |  |  +- io.netty:netty:jar:3.6.6.Final:compile
 [INFO] |  |  +-
 org.spark-project.protobuf:protobuf-java:jar:2.4.1-shaded:compile
 [INFO] |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
 [INFO] |  +-
 org.spark-project.akka:akka-slf4j_2.10:jar:2.2.3-shaded-protobuf:compile
 [INFO] |  +- org.scala-lang:scala-library:jar:2.10.4:compile
 [INFO] |  +- org.json4s:json4s-jackson_2.10:jar:3.2.6:compile
 [INFO] |  |  +- org.json4s:json4s-core_2.10:jar:3.2.6:compile
 [INFO] |  |  |  +- org.json4s:json4s-ast_2.10:jar:3.2.6:compile
 [INFO] |  |  |  +- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
 [INFO] |  |  |  \- org.scala-lang:scalap:jar:2.10.0:compile
 [INFO] |  |  | \- org.scala-lang:scala-compiler:jar:2.10.0:compile
 [INFO] |  |  |\- org.scala-lang:scala-reflect:jar:2.10.0:compile
 [INFO] |  |  \-
 com.fasterxml.jackson.core:jackson-databind:jar:2.3.0:compile
 [INFO] |  | +-
 com.fasterxml.jackson.core:jackson-annotations:jar:2.3.0:compile
 [INFO] |  | \- com.fasterxml.jackson.core:jackson-core:jar:2.3.0:compile
 [INFO] |  +- colt:colt:jar:1.2.0:compile
 [INFO] |  |  \- concurrent:concurrent:jar:1.3.4:compile
 [INFO] |  +- org.apache.mesos:mesos:jar:shaded-protobuf:0.18.1:compile
 [INFO] |  +- io.netty:netty-all:jar:4.0.17.Final:compile
 [INFO] |  +- com.clearspring.analytics:stream:jar:2.5.1:compile
 [INFO] |  +- com.codahale.metrics:metrics-core:jar:3.0.0:compile
 [INFO] |  +- com.codahale.metrics:metrics-jvm:jar:3.0.0:compile
 [INFO] |  +- com.codahale.metrics:metrics-json:jar:3.0.0:compile
 [INFO] |  +- com.codahale.metrics:metrics-graphite:jar:3.0.0:compile
 [INFO] |  +- 

Re: Spark 1.0.0 Maven dependencies problems.

2014-06-09 Thread Tobias Pfeiffer
Hi,

(Apparently Google Mail is quite eager to send out mails when
Ctrl+Enter is hit by accident. Sorry for the previous email.)

I remembered I saw this as well and found this ugly comment in my
build.sbt file:
/*
 * [...], there is still a problem with some classes
 * in javax.servlet (from org.eclipse.jetty.orbit) being signed, while others
 * (from org.mortbay.jetty) are not. When using sbt run, this is not a
 * problem, but IntelliJ IDEA refuses to run the main classes, raising a
 * SecurityException. In order to deal with that, remove the signature from
 * 
org.eclipse.jetty.orbit/javax.servlet/jars/javax.servlet-3.0.0.v201112011016.jar
 * file manually.
 */

Tobias

On Tue, Jun 10, 2014 at 10:47 AM, Tobias Pfeiffer t...@preferred.jp wrote:
 Hi,

 I remembered I saw this as well and found this ugly comment in my
 build.sbt file:


 On Mon, Jun 9, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote:
 Looks like this crept in again from the shaded Akka dependency. I'll
 propose a PR to remove it. I believe that remains the way we have to
 deal with the differing Netty/Jetty dependencies floating around.

 On Mon, Jun 9, 2014 at 9:53 AM, toivoa toivo@gmail.com wrote:
 I am using Maven from Eclipse

 dependency:tree shows


 [INFO] +- org.apache.spark:spark-core_2.10:jar:1.0.0:compile
 [INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:runtime
 [INFO] |  +- org.apache.curator:curator-recipes:jar:2.4.0:compile
 [INFO] |  |  +- org.apache.curator:curator-framework:jar:2.4.0:compile
 [INFO] |  |  |  \- org.apache.curator:curator-client:jar:2.4.0:compile
 [INFO] |  |  \- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
 [INFO] |  | \- jline:jline:jar:0.9.94:compile
 [INFO] |  +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:compile
 [INFO] |  |  +-
 org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v201105210645:compile
 [INFO] |  |  +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:compile
 [INFO] |  |  |  +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:compile
 [INFO] |  |  |  \-
 org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:compile
 [INFO] |  |  \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:compile
 [INFO] |  | \-
 org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v201005082020:compile
 [INFO] |  |\-
 org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105071233:compile
 [INFO] |  +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:compile
 [INFO] |  +- org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:compile
 [INFO] |  |  +-
 org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
 [INFO] |  |  +-
 org.eclipse.jetty:jetty-continuation:jar:8.1.14.v20131031:compile
 [INFO] |  |  \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:compile
 [INFO] |  | \- org.eclipse.jetty:jetty-io:jar:8.1.14.v20131031:compile
 [INFO] |  +- com.google.guava:guava:jar:14.0.1:compile
 [INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
 [INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
 [INFO] |  +- org.slf4j:slf4j-api:jar:1.7.5:compile
 [INFO] |  +- org.slf4j:jul-to-slf4j:jar:1.7.5:compile
 [INFO] |  +- org.slf4j:jcl-over-slf4j:jar:1.7.5:compile
 [INFO] |  +- log4j:log4j:jar:1.2.17:compile
 [INFO] |  +- org.slf4j:slf4j-log4j12:jar:1.7.5:compile
 [INFO] |  +- com.ning:compress-lzf:jar:1.0.0:compile
 [INFO] |  +- org.xerial.snappy:snappy-java:jar:1.0.5:compile
 [INFO] |  +- com.twitter:chill_2.10:jar:0.3.6:compile
 [INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
 [INFO] |  | +-
 com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
 [INFO] |  | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
 [INFO] |  | \- org.objenesis:objenesis:jar:1.2:compile
 [INFO] |  +- com.twitter:chill-java:jar:0.3.6:compile
 [INFO] |  +- commons-net:commons-net:jar:2.2:compile
 [INFO] |  +-
 org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf:compile
 [INFO] |  |  +-
 org.spark-project.akka:akka-actor_2.10:jar:2.2.3-shaded-protobuf:compile
 [INFO] |  |  |  \- com.typesafe:config:jar:1.0.2:compile
 [INFO] |  |  +- io.netty:netty:jar:3.6.6.Final:compile
 [INFO] |  |  +-
 org.spark-project.protobuf:protobuf-java:jar:2.4.1-shaded:compile
 [INFO] |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
 [INFO] |  +-
 org.spark-project.akka:akka-slf4j_2.10:jar:2.2.3-shaded-protobuf:compile
 [INFO] |  +- org.scala-lang:scala-library:jar:2.10.4:compile
 [INFO] |  +- org.json4s:json4s-jackson_2.10:jar:3.2.6:compile
 [INFO] |  |  +- org.json4s:json4s-core_2.10:jar:3.2.6:compile
 [INFO] |  |  |  +- org.json4s:json4s-ast_2.10:jar:3.2.6:compile
 [INFO] |  |  |  +- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
 [INFO] |  |  |  \- org.scala-lang:scalap:jar:2.10.0:compile
 [INFO] |  |  | \- org.scala-lang:scala-compiler:jar:2.10.0:compile
 [INFO] |  |  |\- org.scala-lang:scala-reflect:jar:2.10.0:compile
 [INFO] |  |  \-
 com.fasterxml.jackson.core:jackson-databind:jar

Re: Kafka client - specify offsets?

2014-06-16 Thread Tobias Pfeiffer
Hi,

there are apparently helpers to tell you the offsets
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example#id-0.8.0SimpleConsumerExample-FindingStartingOffsetforReads,
but I have no idea how to pass that to the Kafka stream consumer. I am
interested in that as well.

Tobias

On Thu, Jun 12, 2014 at 5:53 AM, Michael Campbell
michael.campb...@gmail.com wrote:
 Is there a way in the Apache Spark Kafka Utils to specify an offset to start
 reading?  Specifically, from the start of the queue, or failing that, a
 specific point?


Re: Spark SQL: No function to evaluate expression

2014-06-17 Thread Tobias Pfeiffer
The error message *means* that there is no column called c_address.
However, maybe it's a bug with Spark SQL not understanding the
a.c_address syntax. Can you double-check the column name is correct?

Thanks
Tobias

On Wed, Jun 18, 2014 at 5:02 AM, Zuhair Khayyat
zuhair.khay...@gmail.com wrote:
 Dear all,

 I am trying to run the following query on Spark SQL using some custom TPC-H
 tables with standalone Spark cluster configuration:

 SELECT * FROM history a JOIN history b ON a.o_custkey = b.o_custkey WHERE
 a.c_address <> b.c_address;

 Unfortunately I get the following error during execution:

 java.lang.reflect.InvocationTargetException

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at
 org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)

 at
 org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)

 Caused by: org.apache.spark.SparkException: Job aborted due to stage
 failure: Task 0.0:2 failed 4 times, most recent failure: Exception failure
 in TID 12 on host kw2260.kaust.edu.sa:
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function
 to evaluate expression. type: UnresolvedAttribute, tree: 'a.c_address


 org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.eval(unresolved.scala:59)


 org.apache.spark.sql.catalyst.expressions.Equals.eval(predicates.scala:147)


 org.apache.spark.sql.catalyst.expressions.Not.eval(predicates.scala:74)


 org.apache.spark.sql.catalyst.expressions.And.eval(predicates.scala:100)


 Is this a bug or am I doing something wrong?


 Regards,

 Zuhair Khayyat


Re: Kafka client - specify offsets?

2014-06-24 Thread Tobias Pfeiffer
Michael,

apparently, the parameter auto.offset.reset has a different meaning
in Spark's Kafka implementation than what is described in the
documentation.

The Kafka docs at https://kafka.apache.org/documentation.html
specify the effect of auto.offset.reset as:
 What to do when there is no initial offset in ZooKeeper or if an offset is 
 out of range:
 * smallest : automatically reset the offset to the smallest offset
 * largest : automatically reset the offset to the largest offset
 * anything else: throw exception to the consumer

However, Spark's implementation seems to drop the "when there is no
initial offset" part, as can be seen in
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala#L102
-- it will just wipe the stored offset from Zookeeper. I guess it's
actually a bug, because the parameter's effect is different from what
is documented, but then it's good for you (and me) because it allows
you to specify "I want all that I can get" or "I want to start reading
right now", even if there is an offset stored in Zookeeper.
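
In code, that means passing the parameter via kafkaParams, roughly
like this (a sketch; topic name, group id and Zookeeper address are
placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "zookeeper.connect" -> "zookeeper:2181",
  "group.id" -> "my-group",
  "auto.offset.reset" -> "smallest" // or "largest" for "start reading right now"
)
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)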

Tobias

On Sun, Jun 15, 2014 at 11:27 PM, Tobias Pfeiffer t...@preferred.jp wrote:
 Hi,

 there are apparently helpers to tell you the offsets
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example#id-0.8.0SimpleConsumerExample-FindingStartingOffsetforReads,
 but I have no idea how to pass that to the Kafka stream consumer. I am
 interested in that as well.

 Tobias

 On Thu, Jun 12, 2014 at 5:53 AM, Michael Campbell
 michael.campb...@gmail.com wrote:
 Is there a way in the Apache Spark Kafka Utils to specify an offset to start
 reading?  Specifically, from the start of the queue, or failing that, a
 specific point?


Re: Changing log level of spark

2014-06-25 Thread Tobias Pfeiffer
I have a log4j.xml in src/main/resources with

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
  [...]
  <root>
    <priority value="warn" />
    <appender-ref ref="Console" />
  </root>
</log4j:configuration>

and that is included in the jar I package with `sbt assembly`. That
works fine for me, at least on the driver.
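
As an alternative sketch, the levels can also be set programmatically
at the start of the driver program (log4j 1.2 API), without shipping a
config file:

import org.apache.log4j.{Level, Logger}

Logger.getRootLogger.setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)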

Tobias

On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com wrote:
 Hi!

 According to
 https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging,
 changing log-level is just a matter of creating a log4j.properties (which is
 in the classpath of spark) and changing log level there for the root logger.
 I did this steps on every node in the cluster (master and worker nodes).
 However, after restart there is still no debug output as desired, but only
 the default info log level.


Distribute data from Kafka evenly on cluster

2014-06-27 Thread Tobias Pfeiffer
Hi,

I have a number of questions using the Kafka receiver of Spark
Streaming. Maybe someone has some more experience with that and can
help me out.

I have set up an environment for getting to know Spark, consisting of
- a Mesos cluster with 3 only-slaves and 3 master-and-slaves,
- 2 Kafka nodes,
- 3 Zookeeper nodes providing service to both Kafka and Mesos.

My Kafka cluster has only one topic with one partition (replicated to
both nodes). When I start my Kafka receiver, it successfully connects
to Kafka and does the processing, but it seems as if the (expensive)
function in the final foreachRDD(...) is only executed on one node of
my cluster, which is not what I had in mind when setting up the
cluster ;-)

So first, I was wondering about the parameter `topics: Map[String,
Int]` to KafkaUtils.createStream(). Apparently it controls how many
connections are made from my cluster nodes to Kafka. The Kafka doc at
https://kafka.apache.org/documentation.html#introduction says "each
message published to a topic is delivered to one consumer instance
within each subscribing consumer group" and "If all the consumer
instances have the same consumer group, then this works just like a
traditional queue balancing load over the consumers."

The Kafka docs *also* say: "Note however that there cannot be more
consumer instances than partitions." This seems to imply that with
only one partition, increasing the number in my Map should have no
effect.

However, if I increase the number of streams for my one topic in my
`topics` Map, I actually *do* see that the task in my foreachRDD(...)
call is now executed on multiple nodes. Maybe it's more of a Kafka
question than a Spark one, but can anyone explain this to me? Should I
always have more Kafka partitions than Mesos cluster nodes?

So, assuming that changing the number in that Map is not what I want
(although I don't know if it is), I tried to use
.repartition(numOfClusterNodes) (which doesn't seem right if I want to
add and remove Mesos nodes on demand). This *also* did spread the
foreachRDD(...) action evenly – however, the function never seems to
terminate, so I never get to process the next interval in the stream.
A similar behavior can be observed when running locally, not on the
cluster, then the program will not exit but instead hang after
everything else has shut down. Any hints concerning this issue?

Thanks
Tobias


Re: Could not compute split, block not found

2014-06-30 Thread Tobias Pfeiffer
Bill,

let's say the processing time is t' and the window size t. Spark does not
*require* t' < t. In fact, for *temporary* peaks in your streaming data, I
think the way Spark handles it is very nice, in particular since 1) it does
not mix up the order in which items arrived in the stream, so items from a
later window will always be processed later, and 2) because an increase in
data will not be punished with high load and unresponsive systems, but with
disk space consumption instead.

However, if all of your windows require t' > t processing time (and it's
not because you are waiting, but because you actually do some computation),
then you are in bad luck, because if you start processing the next window
while the previous one is still processed, you have less resources for each
and processing will take even longer. However, if you are only waiting
(e.g., for network I/O), then maybe you can employ some asynchronous
solution where your tasks return immediately and deliver their result via a
callback later?

Tobias



On Tue, Jul 1, 2014 at 2:26 AM, Bill Jay bill.jaypeter...@gmail.com wrote:

 Tobias,

 Your suggestion is very helpful. I will definitely investigate it.

 Just curious. Suppose the batch size is t seconds. In practice, does Spark
 always require the program to finish processing the data of t seconds
 within t seconds' processing time? Can Spark begin to consume the new batch
 before finishing processing the next batch? If Spark can do them together,
 it may save the processing time and solve the problem of data piling up.

 Thanks!

 Bill




 On Mon, Jun 30, 2014 at 4:49 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 ​​If your batch size is one minute and it takes more than one minute to
 process, then I guess that's what causes your problem. The processing of
 the second batch will not start after the processing of the first is
 finished, which leads to more and more data being stored and waiting for
 processing; check the attached graph for a visualization of what I think
 may happen.

 Can you maybe do something hacky like throwing away a part of the data so
 that processing time gets below one minute, then check whether you still
 get that error?

 Tobias


 ​​


 On Mon, Jun 30, 2014 at 1:56 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Tobias,

 Thanks for your help. I think in my case, the batch size is 1 minute.
 However, it takes my program more than 1 minute to process 1 minute's
 data. I am not sure whether it is because the unprocessed data pile up.
 Do you have an suggestion on how to check it and solve it? Thanks!

 Bill


 On Sun, Jun 29, 2014 at 7:18 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 were you able to process all information in time, or did maybe some
 unprocessed data pile up? I think when I saw this once, the reason
 seemed to be that I had received more data than would fit in memory,
 while waiting for processing, so old data was deleted. When it was
 time to process that data, it didn't exist any more. Is that a
 possible reason in your case?

 Tobias

 On Sat, Jun 28, 2014 at 5:59 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:
  Hi,
 
  I am running a spark streaming job with 1 minute as the batch size.
 It ran
  around 84 minutes and was killed because of the exception with the
 following
  information:
 
  java.lang.Exception: Could not compute split, block
 input-0-1403893740400
  not found
 
 
  Before it was killed, it was able to correctly generate output for
 each
  batch.
 
  Any help on this will be greatly appreciated.
 
  Bill
 







Re: spark streaming rate limiting from kafka

2014-07-01 Thread Tobias Pfeiffer
Hi,

On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


Please see the post at
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
Kafka's auto.offset.reset parameter may be what you are looking for.

Tobias


Visualize task distribution in cluster

2014-07-02 Thread Tobias Pfeiffer
Hi,

I am using Mesos to run my Spark tasks. I would be interested to see how
Spark distributes the tasks in the cluster (nodes, partitions) and which
nodes are more or less active and do what kind of tasks, and how long the
transfer of data and jobs takes. Is there any way to get this information
from Spark?

Thanks
Tobias


Re: Kafka - streaming from multiple topics

2014-07-03 Thread Tobias Pfeiffer
Sergey,


On Fri, Jul 4, 2014 at 1:06 AM, Sergey Malov sma...@collective.com wrote:

 On the other hand, under the hood the KafkaInputDStream which is created with
 this KafkaUtils call calls ConsumerConnector.createMessageStream, which
 returns a Map[String, List[KafkaStream]] keyed by topic. It is, however, not
 exposed.


I wonder if this is a bug. After all, KafkaUtils.createStream() returns a
DStream[(String, String)], which pretty much looks like it should be a
(topic -> message) mapping. However, for me, the key is always null. Maybe
you could consider filing a bug/wishlist report?

Tobias


Re: Distribute data from Kafka evenly on cluster

2014-07-04 Thread Tobias Pfeiffer
Hi,

unfortunately, when I go the above approach, I run into this problem:

http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E
That is, a NoNode error in Zookeeper when rebalancing. The Kafka receiver
will retry again and again, but will eventually fail, leading to
unprocessed data and, worse, the task never terminating. There is nothing
exotic about my setup; one Zookeeper node, one Kafka broker, so I am
wondering if other people have seen this error before and, more important,
how to fix it. When I don't use the approach of multiple kafkaStreams, I
don't get this error, but also work is never distributed in my cluster...

Thanks
Tobias


On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Thank you very much for the link, that was very helpful!

 So, apparently the `topics: Map[String, Int]` parameter controls the
 number of partitions that the data is initially added to; the number N in

   val kafkaInputs = (1 to N).map { _ =>
     ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1))
   }
   val union = ssc.union(kafkaInputs)

 controls how many connections are made to Kafka. Note that the number of
 Kafka partitions for that topic must be at least N for this to work.

 Thanks
 Tobias



Re: Which is the best way to get a connection to an external database per task in Spark Streaming?

2014-07-07 Thread Tobias Pfeiffer
Juan,

I am doing something similar, just not insert into SQL database, but
issue some RPC call. I think mapPartitions() may be helpful to you. You
could do something like

dstream.mapPartitions(iter => {
  val db = new DbConnection()
  // maybe only do the above if !iter.isEmpty
  iter.map(item => {
    db.call(...)
    // do some cleanup if !iter.hasNext here
    item
  })
}).count() // force output
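
A variant sketch using foreachRDD plus foreachPartition, which avoids
the count() trick to force output (DbConnection, db.call and db.close
are placeholders from the snippet above, not a real API):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    if (iter.nonEmpty) {
      val db = new DbConnection()
      try {
        iter.foreach(item => db.call(item))
      } finally {
        db.close()
      }
    }
  }
}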

Keep in mind though that the whole idea about RDDs is that operations are
idempotent and in theory could be run on multiple hosts (to take the result
from the fastest server) or multiple times (to deal with failures/timeouts)
etc., which is maybe something you want to deal with in your SQL.

Tobias



On Tue, Jul 8, 2014 at 3:40 AM, Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com wrote:

 Hi list,

 I'm writing a Spark Streaming program that reads from a kafka topic,
 performs some transformations on the data, and then inserts each record in
 a database with foreachRDD. I was wondering which is the best way to handle
 the connection to the database so each worker, or even each task, uses a
 different connection to the database, and then database inserts/updates
 would be performed in parallel.
 - I understand that using a final variable in the driver code is not a
 good idea because then the communication with the database would be
 performed in the driver code, which leads to a bottleneck, according to
 http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/
 - I think creating a new connection in the call() method of the Function
 passed to foreachRDD is also a bad idea, because then I wouldn't be reusing
 the connection to the database for each batch RDD in the DStream
 - I'm not sure that a broadcast variable with the connection handler is a
 good idea in case the target database is distributed, because if the same
 handler is used for all the nodes of the Spark cluster then than could have
 a negative effect in the data locality of the connection to the database.
 - From
 http://apache-spark-user-list.1001560.n3.nabble.com/Database-connection-per-worker-td1280.html
 I understand that by using an static variable and referencing it in the
 call() method of the Function passed to foreachRDD we get a different
 connection per Spark worker, I guess it's because there is a different JVM
 per worker. But then all the tasks in the same worker would share the same
 database handler object, am I right?
 - Another idea is using updateStateByKey() using the database handler as
 the state, but I guess that would only work for Serializable database
 handlers, and for example not for an org.apache.hadoop.hbase.client.HTable
 object.

 So my question is, which is the best way to get a connection to an
 external database per task in Spark Streaming? Or at least per worker. In
 http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-td1343.html
 there is a partial solution to this question, but there the database
 handler object is missing. This other question
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shared-hashmaps-td3247.html
 is closer to mine, but there is no answer for it yet

 Thanks in advance,

 Greetings,

 Juan




Re: Spark-streaming-kafka error

2014-07-08 Thread Tobias Pfeiffer
Bill,

have you packaged "org.apache.spark" % "spark-streaming-kafka_2.10" %
"1.0.0" into your application jar? If I remember correctly, it's not
bundled with the downloadable compiled version of Spark.
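
In sbt terms, something along these lines should work (a sketch,
versions are assumptions): keep Spark itself "provided", but bundle
the Kafka connector.

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
)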

Tobias


On Wed, Jul 9, 2014 at 8:18 AM, Bill Jay bill.jaypeter...@gmail.com wrote:

 Hi all,

 I used sbt to package a code that uses spark-streaming-kafka. The
 packaging succeeded. However, when I submitted to yarn, the job ran for 10
 seconds and there was an error in the log file as follows:

 Caused by: java.lang.NoClassDefFoundError:
 org/apache/spark/streaming/kafka/KafkaUtils$


 Does anyone know the reason for this? Thanks!


 Bill



Re: Use Spark Streaming to update result whenever data come

2014-07-08 Thread Tobias Pfeiffer
Bill,

do the additional 100 nodes receive any tasks at all? (I don't know which
cluster you use, but with Mesos you could check client logs in the web
interface.) You might want to try something like repartition(N) or
repartition(N*2) (with N the number of your nodes) after you receive your
data.

Tobias


On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com wrote:

 Hi Tobias,

 Thanks for the suggestion. I have tried to add more nodes from 300 to 400.
 It seems the running time did not get improved.


 On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 can't you just add more nodes in order to speed up the processing?

 Tobias


 On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a problem of using Spark Streaming to accept input data and
 update a result.

 The input of the data is from Kafka and the output is to report a map
 which is updated by historical data in every minute. My current method is
 to set batch size as 1 minute and use foreachRDD to update this map and
 output the map at the end of the foreachRDD function. However, the current
 issue is the processing cannot be finished within one minute.

 I am thinking of updating the map whenever the new data come instead of
 doing the update when the whoe RDD comes. Is there any idea on how to
 achieve this in a better running time? Thanks!

 Bill






Re: Use Spark Streaming to update result whenever data come

2014-07-09 Thread Tobias Pfeiffer
Bill,

I haven't worked with Yarn, but I would try adding a repartition() call
after you receive your data from Kafka. I would be surprised if that didn't
help.


On Thu, Jul 10, 2014 at 6:23 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tobias,

 I was using Spark 0.9 before and the master I used was yarn-standalone. In
 Spark 1.0, the master will be either yarn-cluster or yarn-client. I am not
 sure whether it is the reason why more machines do not provide better
 scalability. What is the difference between these two modes in terms of
 efficiency? Thanks!


 On Tue, Jul 8, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 do the additional 100 nodes receive any tasks at all? (I don't know which
 cluster you use, but with Mesos you could check client logs in the web
 interface.) You might want to try something like repartition(N) or
 repartition(N*2) (with N the number of your nodes) after you receive your
 data.

 Tobias


 On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 Thanks for the suggestion. I have tried to add more nodes from 300 to
 400. It seems the running time did not get improved.


 On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 can't you just add more nodes in order to speed up the processing?

 Tobias


 On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a problem of using Spark Streaming to accept input data and
 update a result.

 The input of the data is from Kafka and the output is to report a map
 which is updated by historical data in every minute. My current method is
 to set batch size as 1 minute and use foreachRDD to update this map and
 output the map at the end of the foreachRDD function. However, the current
 issue is the processing cannot be finished within one minute.

 I am thinking of updating the map whenever the new data come instead
 of doing the update when the whoe RDD comes. Is there any idea on how to
 achieve this in a better running time? Thanks!

 Bill








Re: Some question about SQL and streaming

2014-07-09 Thread Tobias Pfeiffer
Siyuan,

I do it like this:

// get data from Kafka
val ssc = new StreamingContext(...)
val kvPairs = KafkaUtils.createStream(...)
// we need to wrap the data in a case class for registerAsTable() to succeed
val lines = kvPairs.map(_._2).map(s => StringWrapper(s))
val result = lines.transform((rdd, time) => {
  // execute statement
  rdd.registerAsTable("data")
  sqlc.sql(query)
})
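
For completeness, a sketch of the pieces referenced above (Spark 1.0
style; the query string is just an example):

case class StringWrapper(line: String)

val sqlc = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
import sqlc.createSchemaRDD // implicit RDD[case class] => SchemaRDD conversion
val query = "SELECT line FROM data"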

Don't know if it is the best way, but it works.

Tobias


On Thu, Jul 10, 2014 at 4:21 AM, hsy...@gmail.com hsy...@gmail.com wrote:

 Hi guys,

 I'm a new user to spark. I would like to know is there an example of how
 to user spark SQL and spark streaming together? My use case is I want to do
 some SQL on the input stream from kafka.
 Thanks!

 Best,
 Siyuan



Re: Use Spark Streaming to update result whenever data come

2014-07-09 Thread Tobias Pfeiffer
Bill,

good to know you found your bottleneck. Unfortunately, I don't know how to
solve this; until know, I have used Spark only with embarassingly parallel
operations such as map or filter. I hope someone else might provide more
insight here.

Tobias


On Thu, Jul 10, 2014 at 9:57 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tobias,

 Now I did the re-partition and ran the program again. I find a bottleneck
 of the whole program. In the streaming, there is a stage marked as
 *combineByKey at ShuffledDStream.scala:42* in the Spark UI. This stage is repeatedly
 executed. However, during some batches, the number of executors allocated
 to this step is only 2 although I used 300 workers and specified the
 partition number as 300. In this case, the program is very slow although
 the data that are processed are not big.

 Do you know how to solve this issue?

 Thanks!


 On Wed, Jul 9, 2014 at 5:51 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 I haven't worked with Yarn, but I would try adding a repartition() call
 after you receive your data from Kafka. I would be surprised if that didn't
 help.


 On Thu, Jul 10, 2014 at 6:23 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 I was using Spark 0.9 before and the master I used was yarn-standalone.
 In Spark 1.0, the master will be either yarn-cluster or yarn-client. I am
 not sure whether it is the reason why more machines do not provide better
 scalability. What is the difference between these two modes in terms of
 efficiency? Thanks!


 On Tue, Jul 8, 2014 at 5:26 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 do the additional 100 nodes receive any tasks at all? (I don't know
 which cluster you use, but with Mesos you could check client logs in the
 web interface.) You might want to try something like repartition(N) or
 repartition(N*2) (with N the number of your nodes) after you receive your
 data.

 Tobias


 On Wed, Jul 9, 2014 at 3:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tobias,

 Thanks for the suggestion. I have tried to add more nodes from 300 to
 400. It seems the running time did not get improved.


 On Wed, Jul 2, 2014 at 6:47 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 can't you just add more nodes in order to speed up the processing?

 Tobias


 On Thu, Jul 3, 2014 at 7:09 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a problem of using Spark Streaming to accept input data and
 update a result.

 The input of the data is from Kafka and the output is to report a
 map which is updated by historical data every minute. My current method
 is to set the batch size to 1 minute and use foreachRDD to update this map
 and output the map at the end of the foreachRDD function. However, the
 current issue is that the processing cannot be finished within one minute.

 I am thinking of updating the map whenever new data comes in instead
 of doing the update when the whole RDD arrives. Are there any ideas on how to
 achieve this with a better running time? Thanks!

 Bill










Re: Some question about SQL and streaming

2014-07-10 Thread Tobias Pfeiffer
Hi,

I think it would be great if we could do the string parsing only once and
then just apply the transformation for each interval (reducing the
processing overhead for short intervals).

Also, one issue with the approach above is that transform() has the
following signature:

  def transform(transformFunc: RDD[T] => RDD[U]): DStream[U]

and therefore, in my example

val result = lines.transform((rdd, time) => {
   // execute statement
   rdd.registerAsTable("data")
   sqlc.sql(query)
 })


the variable `result` is of type DStream[Row]. That is, the
meta-information from the SchemaRDD is lost and, from what I understand,
there is then no way to learn about the column names of the returned data,
as this information is only encoded in the SchemaRDD. I would love to see a
fix for this.
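
One possible workaround, assuming you know the output columns of your query in
advance, is to map the Rows back into a case class by position (Person and the
column types here are just an example):

  case class Person(name: String, age: Int)

  // `result` is the DStream[Row] produced by transform() above
  val typed = result.map(row => Person(row.getString(0), row.getInt(1)))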

Thanks
Tobias


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-13 Thread Tobias Pfeiffer
Hi,

I experienced exactly the same problem when using a SparkContext with a
"local[1]" master specification: one thread is used for receiving data and
the remaining threads for processing, but with local[1] there is only one
thread, so no processing will take place. Once you shut down the connection,
the receiver thread will be used for processing.

Any chance you run into the same issue?

Tobias



On Mon, Jul 14, 2014 at 11:45 AM, kytay kaiyang@gmail.com wrote:

 Hi Akhil Das

 Thanks.

 I tried the codes. and it works.

 There's a problem with my socket code: it is not flushing the content
 out, and for the test tool, Hercules, I have to close the socket connection
 to flush the content out.

 I am going to troubleshoot why nc works, and my code and the test tool don't.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9576.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Announcing Spark 1.0.1

2014-07-14 Thread Tobias Pfeiffer
Hi,

congratulations on the release! I'm always pleased to see how features pop
up in new Spark versions that I had added for myself in a very hackish way
before (such as JSON support for Spark SQL).

I am wondering if there is any good way to learn early about what is going
to be in upcoming versions, except than tracking JIRA...?

Tobias


On Tue, Jul 15, 2014 at 12:50 AM, Philip Ogren philip.og...@oracle.com
wrote:

 Hi Patrick,

 This is great news but I nearly missed the announcement because it had
 scrolled off the folder view that I have Spark users list messages go to.
  40+ new threads since you sent the email out on Friday evening.

 You might consider having someone on your team create a spark-announcement
 list so that it is easier to disseminate important information like this
 release announcement.

 Thanks again for all your hard work.  I know you and the rest of the team
 are getting a million requests a day

 Philip



 On 07/11/2014 07:35 PM, Patrick Wendell wrote:

 I am happy to announce the availability of Spark 1.0.1! This release
 includes contributions from 70 developers. Spark 1.0.1 includes fixes
 across several areas of Spark, including the core API, PySpark, and
 MLlib. It also includes new features in Spark's (alpha) SQL library,
 including support for JSON data and performance and stability fixes.

 Visit the release notes[1] to read about this release or download[2]
 the release today.

 [1] http://spark.apache.org/releases/spark-release-1-0-1.html
 [2] http://spark.apache.org/downloads.html





Re: can't print DStream after reduce

2014-07-15 Thread Tobias Pfeiffer
Hi,

thanks for creating the issue. It feels like in the last week, more or less
half of the questions about Spark Streaming were rooted in setting the master
to "local" ;-)

Tobias


On Wed, Jul 16, 2014 at 11:03 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Aah, right, copied from the wrong browser tab i guess. Thanks!

 TD


 On Tue, Jul 15, 2014 at 5:57 PM, Michael Campbell 
 michael.campb...@gmail.com wrote:

 I think you typo'd the jira id; it should be
 https://issues.apache.org/jira/browse/SPARK-2475  "Check whether #cores >
 #receivers in local mode"


 On Mon, Jul 14, 2014 at 3:57 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 The problem is not really for local[1] or local. The problem arises when
 there are more input streams than there are cores.
 But I agree, for people who are just beginning to use it by running it
 locally, there should be a check addressing this.

 I made a JIRA for this.
 https://issues.apache.org/jira/browse/SPARK-2464

 TD


 On Sun, Jul 13, 2014 at 4:26 PM, Sean Owen so...@cloudera.com wrote:

 How about a PR that rejects a context configured for local or local[1]?
 As I understand it is not intended to work and has bitten several people.
 On Jul 14, 2014 12:24 AM, Michael Campbell 
 michael.campb...@gmail.com wrote:

 This almost had me not using Spark; I couldn't get any output.  It is
 not at all obvious what's going on here to the layman (and to the best of
 my knowledge, not documented anywhere), but now you know you'll be able to
 answer this question for the numerous people that will also have it.


 On Sun, Jul 13, 2014 at 5:24 PM, Walrus theCat walrusthe...@gmail.com
  wrote:

 Great success!

 I was able to get output to the driver console by changing the
 construction of the Streaming Spark Context from:

  val ssc = new StreamingContext("local" /**TODO change once a cluster
 is up **/,
 "AppName", Seconds(1))


 to:

 val ssc = new StreamingContext("local[2]" /**TODO change once a
 cluster is up **/,
 "AppName", Seconds(1))


 I found something that tipped me off that this might work by digging
 through this mailing list.


 On Sun, Jul 13, 2014 at 2:00 PM, Walrus theCat 
 walrusthe...@gmail.com wrote:

 More strange behavior:

 lines.foreachRDD(x => println(x.first)) // works
 lines.foreachRDD(x => println((x.count, x.first))) // no output is
 printed to driver console




 On Sun, Jul 13, 2014 at 11:47 AM, Walrus theCat 
 walrusthe...@gmail.com wrote:


 Thanks for your interest.

 lines.foreachRDD(x => println(x.count))

  And I got 0 every once in a while (which I think is strange,
 because lines.print prints the input I'm giving it over the socket.)


 When I tried:

 lines.map(_ -> 1).reduceByKey(_ + _).foreachRDD(x => println(x.count))

 I got no count.

 Thanks


 On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Try doing DStream.foreachRDD and then printing the RDD count and
 further inspecting the RDD.
  On Jul 13, 2014 1:03 AM, Walrus theCat walrusthe...@gmail.com
 wrote:

 Hi,

 I have a DStream that works just fine when I say:

 dstream.print

 If I say:

 dstream.map((_, 1)).print

 that works, too.  However, if I do the following:

 dstream.reduce { case (x, y) => x }.print

 I don't get anything on my console.  What's going on?

 Thanks











Re: Include permalinks in mail footer

2014-07-17 Thread Tobias Pfeiffer

 On Jul 17, 2014, at 12:59 PM, Nick Chammas nicholas.cham...@gmail.com
 wrote:

 I often find myself wanting to reference one thread from another, or from
 a JIRA issue. Right now I have to google the thread subject and find the
 link that way.


+1


Re: spark streaming rate limiting from kafka

2014-07-17 Thread Tobias Pfeiffer
Bill,

are you saying, after repartition(400), you have 400 partitions on one host
and the other hosts receive nothing of the data?

Tobias


On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there is always a single executor working on this job. Even if I use
 repartition, it seems that there is still a single executor. Does anyone
 have an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song





Re: Distribute data from Kafka evenly on cluster

2014-07-18 Thread Tobias Pfeiffer
Hi,

as far as I know, rebalance is triggered from Kafka in order to distribute
partitions evenly. That is, to achieve the opposite of what you are seeing.
I think it would be interesting to check the Kafka logs for the result of
the rebalance operation and why you see what you are seeing. I know that in
the client logs it says which partitions of a topic were assigned to this
particular consumer, maybe you can have a look.

Tobias


On Fri, Jul 18, 2014 at 11:42 PM, Chen Song chen.song...@gmail.com wrote:

 Speaking of this, I have another related question.

 In my spark streaming job, I set up multiple consumers to receive data
 from Kafka, with each worker from one partition.

 Initially, Spark is intelligent enough to associate each worker with one
 partition, to make data consumption distributed. After running for a while,
 consumers rebalance themselves and some workers start reading partitions
 that previously belonged to others. This leads to a situation where some
 workers read from multiple partitions and some don't read at all. Because of
 the data volume, this causes heap pressure on some workers.

 Any thoughts on why rebalance is triggered and how to monitor to avoid
 that?




 On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 unfortunately, when I go the above approach, I run into this problem:

 http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E
 That is, a NoNode error in Zookeeper when rebalancing. The Kafka
 receiver will retry again and again, but will eventually fail, leading to
 unprocessed data and, worse, the task never terminating. There is nothing
 exotic about my setup; one Zookeeper node, one Kafka broker, so I am
 wondering if other people have seen this error before and, more important,
 how to fix it. When I don't use the approach of multiple kafkaStreams, I
 don't get this error, but also work is never distributed in my cluster...

 Thanks
 Tobias


 On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Thank you very much for the link, that was very helpful!

 So, apparently the `topics: Map[String, Int]` parameter controls the
 number of partitions that the data is initially added to; the number N in

   val kafkaInputs = (1 to N).map { _ =>
     ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1))
   }
   val union = ssc.union(kafkaInputs)

 controls how many connections are made to Kafka. Note that the number of
 Kafka partitions for that topic must be at least N for this to work.

 Thanks
 Tobias





 --
 Chen Song




Re: spark streaming rate limiting from kafka

2014-07-21 Thread Tobias Pfeiffer
Bill,

numPartitions means the number of Spark partitions that the data received
from Kafka will be split to. It has nothing to do with Kafka partitions, as
far as I know.

If you create multiple Kafka consumers, it doesn't seem like you can
specify which consumer will consume which Kafka partitions. Instead, Kafka
(at least with the interface that is exposed by the Spark Streaming API)
will do something called rebalance and assign Kafka partitions to consumers
evenly, you can see this in the client logs.
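
To illustrate the difference, a rough sketch (variable names are assumptions):
the Map passed to createStream() controls the number of consumer threads inside
the single receiver, while repartition() spreads the received data over the
cluster:

  // two consumer threads for this topic, all inside one receiver
  val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, Map("mytopic" -> 2))
  // Spark-side parallelism: split the received data into N partitions for processing
  val spread = kafkaStream.repartition(numPartitions)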

When using multiple Kafka consumers with auto.offset.reset = true, please
expect to run into this one:
https://issues.apache.org/jira/browse/SPARK-2383

Tobias


On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 I am currently creating multiple DStreams to consume from different topics.
 How can I let each consumer consume from different partitions? I find the
 following parameters from Spark API:

 createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
     jssc: JavaStreamingContext,
     keyTypeClass: Class[K], valueTypeClass: Class[V],
     keyDecoderClass: Class[U], valueDecoderClass: Class[T],
     kafkaParams: Map[String, String],
     topics: Map[String, Integer],
     storageLevel: StorageLevel): JavaPairReceiverInputDStream[K, V]

 Create an input stream that pulls messages from a Kafka Broker.




 The topics parameter is:
 "Map of (topic_name -> numPartitions) to consume. Each partition is
 consumed in its own thread"

 Does numPartitions mean the total number of partitions to consume from
 topic_name, or the index of the partition? How can we specify for each
 createStream which partition of the Kafka topic to consume? If that were
 possible, I would get a lot of parallelism from the source of the data. Thanks!

 Bill

 On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 You can create multiple Kafka streams to partition your topics across
 them, which will run multiple receivers on multiple executors. This is
 covered in the Spark Streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. It's in the master branch, and will be
 available in Spark 1.1. It basically sets a limit at the receiver level
 (so it applies to all sources) on the max records per second that
 will be received by the receiver.

 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there is always a single executor working on this job. Even if I use
 repartition, it seems that there is still a single executor. Does anyone
 have an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song








Re: How to do an interactive Spark SQL

2014-07-22 Thread Tobias Pfeiffer
Hi,

as far as I know, after the Streaming Context has started, the processing
pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL
statement is transformed into RDD operations when the Streaming Context
starts, I think there is no way to change the statement that is executed on
the current stream after the StreamingContext has started.

Tobias


On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com hsy...@gmail.com wrote:

 For example, this is what I tested and it works in local mode. What it does is
 get both the data and the sql query from Kafka, do sql on each RDD, and output
 the result back to Kafka again.
 I defined a var called sqlS. In the streaming part, as you can see, I
 change the sql statement if it consumes a sql message from Kafka; the next
 time you do sql(sqlS), it executes the updated sql query.

 But this code doesn't work in a cluster because sqlS is not updated on all
 the workers, from what I understand.

 So my question is: how do I change the sqlS value at runtime and make all
 the workers pick up the latest value?


  var sqlS = "select count(*) from records"
  val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
  val sparkConf = new SparkConf().setAppName("KafkaSpark")
  val sc = new SparkContext(sparkConf)
  val ssc = new StreamingContext(sc, Seconds(2))
  val sqlContext = new SQLContext(sc)

  // Importing the SQL context gives access to all the SQL functions and
  // implicit conversions.
  import sqlContext._
  import sqlContext.createSchemaRDD

  //val tt = Time(5000)
  val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2)
  val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
    .window(Seconds(4))
    .filter(t => { if (t._1 == "sql") { sqlS = t._2; false } else true })
    .map(t => getRecord(t._2.split("#")))

  val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer)

  val brokerString =
    ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")

  KafkaSpark.props.put("metadata.broker.list", brokerString)
  val config = new ProducerConfig(KafkaSpark.props)
  val producer = new Producer[String, String](config)

  val result = recordsStream.foreachRDD((recRDD) => {
    val schemaRDD = sqlContext.createSchemaRDD(recRDD)
    schemaRDD.registerAsTable("tName")
    val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) => { s + r.mkString(",") + "\n" })
    producer.send(new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
  })
  ssc.start()
  ssc.awaitTermination()


 On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com
 wrote:

 Can you paste a small code example to illustrate your questions?

 On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:
   Sorry, typo. What I mean is sharing. If the sql is changing at runtime,
   how do I broadcast the sql to all workers that are doing the sql analysis?
 
  Best,
  Siyuan
 
 
  On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com
 wrote:
 
   Do you mean that the texts of the SQL queries are hardcoded in the
   code? What do you mean by "cannot share the sql to all workers"?
 
  On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com
  wrote:
   Hi guys,
  
    I'm able to run some Spark SQL examples but the sql is static in the
    code. I would like to know: is there a way to read the sql from somewhere
    else (a shell, for example)?
  
    I could read the sql statement from kafka/zookeeper, but I cannot share
    the sql with all workers. broadcast does not seem to work for updating
    values.
  
    Moreover, if I use some non-serializable class (DataInputStream etc.) to
    read the sql from another source, I always get "Task not serializable:
    java.io.NotSerializableException"
  
  
   Best,
   Siyuan
 
 





Re: Get Spark Streaming timestamp

2014-07-23 Thread Tobias Pfeiffer
Bill,

Spark Streaming's DStream provides overloaded methods for transform() and
foreachRDD() that allow you to access the timestamp of a batch:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream

I think the timestamp is the end of the batch, not the beginning. For
example, I compute the runtime by taking the difference between now() and the
time I get as a parameter in foreachRDD().
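
As a sketch (assuming a DStream called `stream`), that could look like:

  stream.foreachRDD { (rdd, time) =>
    // `time` is an org.apache.spark.streaming.Time; time.milliseconds is the batch timestamp
    val lagMs = System.currentTimeMillis() - time.milliseconds
    println(s"batch $time: ${rdd.count()} records, finished $lagMs ms after the batch time")
  }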

Tobias



On Thu, Jul 24, 2014 at 6:39 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi all,

 I have a question regarding Spark streaming. When we use the
 saveAsTextFiles function and my batch is 60 seconds, Spark will generate a
 series of files such as:

 result-140614896, result-140614802, result-140614808, etc.

 I think this is the timestamp for the beginning of each batch. How can we
 extract the variable and use it in our code? Thanks!

 Bill



Re: Spark as a application library vs infra

2014-07-27 Thread Tobias Pfeiffer
Mayur,

I don't know if I exactly understand the context of what you are asking,
but let me just mention issues I had with deploying.

* As my application is a streaming application, it doesn't read any files
from disk, so therefore I have no Hadoop/HDFS in place and I there is no
need for it, either. There should be no dependency on Hadoop or HDFS, since
you can perfectly run Spark applications without it.
* I use Mesos and so far I have always had the downloaded Spark distribution
accessible to all machines (e.g., via HTTP) and then added my application
code by uploading a jar built with `sbt assembly`. As the Spark code itself
must not be contained in that jar file, I had to mark the Spark dependencies
as "provided" in the sbt file (see the build.sbt sketch after this list), which
in turn prevented me from running the application locally from IntelliJ IDEA
(it would not find the libraries marked as "provided"), so I always had to use
`sbt run`.
* When using Mesos, on the Spark slaves the Spark jar is loaded before the
application jar, and so the log4j file from the Spark jar is used instead
of my custom one (that is different when running locally), so I had to edit
that file in the Spark distribution jar to customize logging of my Spark
nodes.
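
The build.sbt change mentioned in the second point looks roughly like this (the
version and module name are just examples):

  // build.sbt: mark Spark as "provided" so that `sbt assembly` does not bundle it
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.2" % "provided"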

I wonder if the two latter problems would vanish if the Spark libraries
were bundled together with the application. (That would be your approach
#1, I guess.)

Tobias


Re: How true is this about spark streaming?

2014-07-29 Thread Tobias Pfeiffer
Hi,

that quoted statement doesn't make too much sense for me, either. Maybe if
you had a link for us that shows the context (Google doesn't reveal
anything but this conversation), we could evaluate that statement better.

Tobias


On Tue, Jul 29, 2014 at 5:53 PM, Sean Owen so...@cloudera.com wrote:

 I'm not sure I understand this, maybe because the context is missing.
 An RDD is immutable, so there is no such thing as writing to an RDD.
 I'm not sure which aspect is being referred to as single-threaded. Is
 this the Spark Streaming driver?

 What is the difference between streaming into Spark and reading
 from the stream? Streaming data into Spark means Spark reads the
 stream.

 A mini batch of data is exposed as an RDD, but the stream processing
 continues while it is operated on. Saving the RDDs is one of the most
 basic operations exposed by streaming:

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations
  No, you do not stop the stream processing to persist it. In fact you
 couldn't.

 On that basis, no, this sounds fairly wrong.

 On Tue, Jul 29, 2014 at 1:37 AM, Rohit Pujari rpuj...@hortonworks.com
 wrote:
  Hello folks:
 
  I came across a thread that said
 
  A Spark RDD read/write access is driven by a context object and is
 single
  threaded.  You cannot stream into Spark and read from the stream at the
 same
  time.  You have to stop the stream processing, snapshot the RDD and
  continue
 
  Can you please offer some insights?
 
 
  Thanks,
  Rohit Pujari
  Solutions Engineer, Hortonworks
  rpuj...@hortonworks.com
  716-430-6899
 



Fwd: Trying to make sense of the actual executed code

2014-08-06 Thread Tobias Pfeiffer
(Forgot to include the mailing list in my reply. Here it is.)


Hi,

On Thu, Aug 7, 2014 at 7:55 AM, Tom thubregt...@gmail.com wrote:

 When I look at the output, I see that there are several stages, and several
 tasks per stage. The tasks have a TID, but I do not see such a thing for a
 stage.


They should have. In my logs, for example, I see something like

INFO  scheduler.DAGScheduler - Submitting Stage 1 (MapPartitionsRDD[4] at
reduceByKey at SimpleSpark.scala:21), which has no missing parents
INFO  scheduler.DAGScheduler - Submitting Stage 0 (MapPartitionsRDD[6] at
reduceByKey at SimpleSpark.scala:21), which is now runnable


 But what I really want to know is the following:
 Which maps, shuffles and reduces are performed in which order, and where can I
 see the actual executed code per task/stage? The in-between files/RDDs would be
 a bonus!


I would also be interested in that, although I think it's quite hard to
understand what is actually being executed. I dug a bit into that
yesterday, and even the simple WordCount (flatMap, map, reduceByKey, max)
is already quite tough to understand. For example, reduceByKey consists of
three transformations (local reduceByKey, repartition by key, another local
reduceByKey), one of which happens in one stage, the other two in a
different stage. I would love to see a good visualization of that (I wonder
how the developers got their head around that without such a tool), but I
am not aware of any.

Tobias


Re: [spark-streaming] kafka source and flow control

2014-08-11 Thread Tobias Pfeiffer
Hi,

On Mon, Aug 11, 2014 at 9:41 PM, Gwenhael Pasquiers 
gwenhael.pasqui...@ericsson.com wrote:

 We intend to apply other operations on the data later in the same spark
 context, but our first step is to archive it.



 Our goal is something like this:

 Step 1 : consume kafka
 Step 2 : archive to hdfs AND send to step 3
 Step 3 : transform data

 Step 4 : save transformed data to HDFS as input for M/R


I see. Well I think Spark Streaming may be well suited for that purpose.


 To us it looks like a great flaw if, in streaming mode, spark-streaming
 cannot slow down its consumption depending on the available resources.


On Mon, Aug 11, 2014 at 10:10 PM, Gwenhael Pasquiers 
gwenhael.pasqui...@ericsson.com wrote:

 I think the kind of self-regulating system you describe would be too
 difficult to implement and probably unreliable (even more with the fact
 that we have multiple slaves).


Isn't "slow down its consumption depending on the available resources" a
self-regulating system? I don't see how you can adapt to available
resources without measuring your execution time and then changing how much
you consume. Did you have any particular form of adaptation in mind?

Tobias


Fwd: Task closures and synchronization

2014-08-12 Thread Tobias Pfeiffer
Uh, for some reason I don't seem to automatically reply to the list any
more.
Here is again my message to Tom.

-- Forwarded message --

Tom,

On Wed, Aug 13, 2014 at 5:35 AM, Tom Vacek minnesota...@gmail.com wrote:

 This is a back-to-basics question.  How do we know when Spark will clone
 an object and distribute it with task closures versus synchronizing access
 to it?

 For example, the old rookie mistake of random number generation:

 import scala.util.Random
 val randRDD = sc.parallelize(0 until 1000).map(ii => Random.nextGaussian)

 One can check to see that each partition contains a different set of
 random numbers, so the RNG obviously was not cloned, but access was
 synchronized.


In this case, Random is a singleton object; Random.nextGaussian is like a
static method of a Java class. The access is not synchronized (unless I
misunderstand synchronized), but each Spark worker will use a JVM-local
instance of the Random object. You don't actually close over the Random
object in this case. In fact, this is one way to have node-local state
(e.g., for DB connection pooling).


 However:

 val myMap = collection.mutable.Map.empty[Int, Int]
 sc.parallelize(0 until 100).mapPartitions(it => { it.foreach(ii => myMap(ii) = ii);
   Array(myMap).iterator }).collect


 This shows that each partition got a copy of the empty map and filled it
 in with its portion of the rdd.


In this case, myMap is an instance of the Map class, so it will be
serialized and shipped around. In fact, if you did `val Random = new
scala.util.Random()` in your code above, then this object would also be
serialized and treated just like myMap. (NB: apparently it is not; Spark hangs
for me when I do this and doesn't return anything...)

Tobias


Re: Spark Streaming example on your mesos cluster

2014-08-12 Thread Tobias Pfeiffer
Hi,

On Wed, Aug 13, 2014 at 4:24 AM, Zia Syed xia.s...@gmail.com wrote:

 I don't particularly see any errors in my logs, either on the console or on
 the slaves. I see the slave downloads the spark-1.0.2-bin-hadoop1.tgz file and
 unpacks it as well. The Mesos master shows quite a lot of tasks created and
 finished. I don't see any output of the word counts on my console, like
 I get in the Spark version.

 Any suggestions/ideas how i can make it work?


You have to check the logs on the Mesos slaves in
 /tmp/mesos/slaves/***/frameworks/ -- I guess that you are missing the jar
that your application is packed in.

Tobias


Re: spark streaming : what is the best way to make a driver highly available

2014-08-13 Thread Tobias Pfeiffer
Hi,

On Thu, Aug 14, 2014 at 5:49 AM, salemi alireza.sal...@udo.edu wrote:

 what is the best way to make a spark streaming driver highly available.


I would also be interested in that. In particular for Streaming
applications where the Spark driver is running for a long time, this might
be important, I think.

Thanks
Tobias


Re: [Spar Streaming] How can we use consecutive data points as the features ?

2014-08-17 Thread Tobias Pfeiffer
Hi,

On Sat, Aug 16, 2014 at 3:29 AM, Yan Fang yanfang...@gmail.com wrote:

 If all consecutive data points are in one batch, it's not complicated
 except that the order of data points is not guaranteed in the batch and so
 I have to use the timestamp in the data point to reach my goal. However,
 when the consecutive data points spread in two or more batches, how can I
 do this?


You *could* use window operations. If there is an upper limit to how many
batches you might want to look at, you can instead consider a window that
is large enough and thereby avoid using updateStateByKey.
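
For example, a rough sketch (the batch interval, window length, and the
timestamp field on the data points are assumptions):

  // with a 10-second batch interval, look at the last 5 batches in every interval
  val windowed = stream.window(Seconds(50), Seconds(10))
  windowed.foreachRDD { rdd =>
    // order by the timestamp embedded in each data point and inspect consecutive points
    val ordered = rdd.collect().sortBy(_.timestamp)
  }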

Tobias


Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Tobias Pfeiffer
Hi Wei,

On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu wei@stellarloyalty.com
wrote:

 Since our application cannot tolerate losing customer data, I am wondering
 what is the best way for us to address this issue.
 1) We are thinking writing application specific logic to address the data
 loss. To us, the problem seems to be caused by that Kinesis receivers
 advanced their checkpoint before we know for sure the data is replicated.
 For example, we can do another checkpoint ourselves to remember the kinesis
 sequence number for data that has been processed by spark streaming. When
 Kinesis receiver is restarted due to worker failures, we restarted it from
 the checkpoint we tracked.


This sounds pretty much to me like the way Kafka does it. So, I am not
saying that the stock KafkaReceiver does what you want (it may or may not),
but it should be possible to update the offset (which corresponds to the
sequence number) in Zookeeper only after data has been replicated successfully.
I guess replacing Kinesis with Kafka is not an option for you, but you may
consider pulling Kinesis data into Kafka before processing it with Spark?

Tobias


Re: type issue: found RDD[T] expected RDD[A]

2014-08-19 Thread Tobias Pfeiffer
Hi,

On Tue, Aug 19, 2014 at 7:01 PM, Patrick McGloin mcgloin.patr...@gmail.com
wrote:

 I think the type of the data contained in your RDD needs to be a known
 case class and not abstract for createSchemaRDD.  This makes sense when
 you think it needs to know about the fields in the object to create the
 schema.


Exactly this. The actual message pointing to that is:

inferred type arguments [T] do not conform to method createSchemaRDD's
type parameter bounds [A <: Product]

All case classes are automatically subclasses of Product, but otherwise you
will have to extend Product and add the required methods yourself.
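
A minimal sketch of the difference (Person is just an example; assuming a
SparkContext `sc` and a SQLContext `sqlContext` are in scope):

  case class Person(name: String, age: Int)      // a case class, hence a Product

  import sqlContext.createSchemaRDD
  val people = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))
  people.registerAsTable("people")               // works; with an abstract type T it would not compile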

Tobias


Re: Trying to run SparkSQL over Spark Streaming

2014-08-20 Thread Tobias Pfeiffer
Hi,


On Thu, Aug 21, 2014 at 2:19 PM, praveshjain1991 praveshjain1...@gmail.com
wrote:

 Using Spark SQL with batch data works fine so I'm thinking it has to do
 with
 how I'm calling streamingcontext.start(). Any ideas what is the issue? Here
 is the code:


Please have a look at


http://apache-spark-user-list.1001560.n3.nabble.com/Some-question-about-SQL-and-streaming-tp9229p9254.html

If you want to issue an SQL statement on streaming data, you must have both
the registerAsTable() and the sql() call *within* the foreachRDD(...)
block, or -- as you experienced -- the table name will be unknown.
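
As a sketch (assuming a case class Record, a SQLContext `sqlc` whose implicits
are imported, and a DStream[Record] called `stream`):

  stream.foreachRDD { rdd =>
    rdd.registerAsTable("records")      // the table is only visible inside this block
    val counts = sqlc.sql("SELECT COUNT(*) FROM records")
    counts.collect().foreach(println)
  }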

Tobias


Re: Trying to run SparkSQL over Spark Streaming

2014-08-21 Thread Tobias Pfeiffer
Hi,


On Thu, Aug 21, 2014 at 3:11 PM, praveshjain1991 praveshjain1...@gmail.com
wrote:

 The part that you mentioned: "the variable `result` is of type
 DStream[Row]. That is, the meta-information from the SchemaRDD is lost and,
 from what I understand, there is then no way to learn about the column names
 of the returned data, as this information is only encoded in the SchemaRDD"


Because a DStream[Row] is basically like DStream[Array[Object]]. You lose
all the information about data types in your result and there is no way to
recover it, once the schema is inaccessible. If you want to process the
data later on, you will have to check types and make assertions about the
statements that were issued before etc.

Tobias


Re: Spark Stream + HDFS Append

2014-08-24 Thread Tobias Pfeiffer
Hi,


On Mon, Aug 25, 2014 at 9:56 AM, Dean Chen deanch...@gmail.com wrote:

  We are using HDFS for log storage where logs are flushed to HDFS every
 minute, with a new file created for each hour. We would like to consume
 these logs using spark streaming.

  The docs state that new HDFS files will be picked up, but does Spark Streaming
  support HDFS appends?


I don't think so. The docs at
http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.streaming.StreamingContext
say that even for new files, Files must be written to the monitored
directory by 'moving' them from another location within the same file
system. So I don't think you can just append to your files.

Tobias


Re: multiple windows from the same DStream ?

2014-08-24 Thread Tobias Pfeiffer
Hi,

computations are triggered by an output operation. No output operation, no
computation. Therefore in your code example,

On Thu, Aug 21, 2014 at 11:58 PM, Josh J joshjd...@gmail.com wrote:

  JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

  Duration windowLength = new Duration(3);
  Duration slideInterval = new Duration(3);
  JavaPairDStream<String, String> windowMessages1 =
      messages.window(windowLength, slideInterval);
  JavaPairDStream<String, String> windowMessages2 =
      messages.window(windowLength, slideInterval);


nothing would actually happen. However, if you add output operations, you
can use the same window multiple times (in which case caching the data
might make sense). So if your windowLength and slideInterval are the same,
then there would be no point in having two of them, you could just say:

  windowMessages1.saveAsHadoopFiles(...)  // output operation 1
  windowMessages1.print()  // output operation 2
  windowMessages1.map(someOtherFancyOperation).print()  // output operation
3 after processing

By default, these output operations are processed one after another. There
is an undocumented parameter spark.streaming.concurrentJobs (cf.
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-questions-td1494.html)
that allows running output operations in parallel. I haven't used it, though.

Tobias


Re: Trying to run SparkSQL over Spark Streaming

2014-08-25 Thread Tobias Pfeiffer
Hi,


On Mon, Aug 25, 2014 at 7:11 PM, praveshjain1991 praveshjain1...@gmail.com
wrote:

 If you want to issue an SQL statement on streaming data, you must have
 both
 the registerAsTable() and the sql() call *within* the foreachRDD(...)
 block,
 or -- as you experienced -- the table name will be unknown

 Since this is the case then is there any way to run join over data received
 from two different streams?


Couldn't you do dstream1.join(dstream2).foreachRDD(...)?

Tobias


Re: Trying to run SparkSQL over Spark Streaming

2014-08-25 Thread Tobias Pfeiffer
Hi again,

On Tue, Aug 26, 2014 at 10:13 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 On Mon, Aug 25, 2014 at 7:11 PM, praveshjain1991 
 praveshjain1...@gmail.com wrote:

  If you want to issue an SQL statement on streaming data, you must have
 both
 the registerAsTable() and the sql() call *within* the foreachRDD(...)
 block,
 or -- as you experienced -- the table name will be unknown

 Since this is the case then is there any way to run join over data
 received
 from two different streams?


 Couldn't you do dstream1.join(dstream2).foreachRDD(...)?


Ah, I guess you meant something like "SELECT * FROM dstream1 JOIN dstream2
WHERE ..."? I don't know if that is possible. It doesn't seem easy to me; I
don't think that's doable with the current codebase...

Tobias


Re: Spark Streaming - how to implement multiple calculation using the same data set

2014-09-02 Thread Tobias Pfeiffer
Hi,

On Wed, Sep 3, 2014 at 6:54 AM, salemi alireza.sal...@udo.edu wrote:

 I was able to calculate the individual measures separately and now I have
 to merge them, and spark streaming doesn't support outer join yet.


Can't you assign some dummy key (e.g., index) before your processing and
then join on that key using a function from
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions
?
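
For example, something along these lines might work (a sketch with made-up
stream names; zipWithIndex provides the dummy key):

  val keyedA = measureStreamA.transform(rdd => rdd.zipWithIndex.map(_.swap))
  val keyedB = measureStreamB.transform(rdd => rdd.zipWithIndex.map(_.swap))
  // join on the artificial index key, then drop it again
  val merged = keyedA.join(keyedB).map { case (_, (a, b)) => (a, b) }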

Tobias


Re: RDDs

2014-09-03 Thread Tobias Pfeiffer
Hello,


On Wed, Sep 3, 2014 at 6:02 PM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Can someone tell me what kind of operations can be performed on a
 replicated rdd?? What are the use-cases of a replicated rdd.


I suggest you read

https://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds
as an introduction, it lists a lot of the transformations and output
operations you can use.
Personally, I also found it quite helpful to read the paper about RDDs:
  http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf


 One basic doubt that has been bothering me for a long time: what is the
 difference between an application and a job in Spark parlance? I am
 confused because of the Hadoop jargon.


OK, someone else might answer that. I am myself confused with application,
job, task, stage etc. ;-)

Tobias


Multi-tenancy for Spark (Streaming) Applications

2014-09-03 Thread Tobias Pfeiffer
Hi,

I am not sure if multi-tenancy is the right word, but I am thinking about
a Spark application where multiple users can, say, log into some web
interface and specify a data processing pipeline with streaming source,
processing steps, and output.

Now as far as I know, there can be only one StreamingContext per JVM and
also I cannot add sources or processing steps once it has been started. Are
there any ideas/suggestions for how to achieve dynamic adding and
removing of input sources and processing pipelines? Do I need a separate
'java' process per user?
Also, can I realize such a thing when using YARN for dynamic allocation?

Thanks
Tobias


Re: advice on spark input development - python or scala?

2014-09-04 Thread Tobias Pfeiffer
Hi,

On Thu, Sep 4, 2014 at 11:49 PM, Johnny Kelsey jkkel...@semblent.com
wrote:

 As a concrete example, we have a python class (part of a fairly large
 class library) which, as part of its constructor, also creates a record of
 itself in the cassandra key space. So we get an initialised class and a row
 in a table on the cluster. My problem is this: should we even be doing this?


I think the problem you describe is not related to any programming
language. This is a design decision and/or good/bad programming, but it has
nothing to do with Python or Scala, if I am not mistaken.

Personally, I am a big fan of Scala because it's concise and provides me
with type checking at compile time. However, Scala might be harder to learn
than Python (in particular if you are already using Python), and while
execution of Scala code may be faster, the compiler is quite heavy (in
terms of hardware requirements) and compile time is a bit lengthy, I'd say.

Tobias


Re: Recursion

2014-09-07 Thread Tobias Pfeiffer
Hi,

On Fri, Sep 5, 2014 at 6:16 PM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Does Spark support recursive calls?


Can you give an example of which kind of recursion you would like to use?

Tobias


Re: How to list all registered tables in a sql context?

2014-09-07 Thread Tobias Pfeiffer
Hi,

On Sat, Sep 6, 2014 at 1:40 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Err... there's no such feature?


The problem is that the SQLContext's `catalog` member is protected, so you
can't access it from outside. If you subclass SQLContext, and make sure
that `catalog` is always a `SimpleCatalog`, you can check `catalog.tables`
(which is a HashMap).
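
A rough sketch of that idea (this uses internal, version-dependent APIs, so the
class and field names may differ in your Spark version):

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.catalyst.analysis.SimpleCatalog

  class InspectableSQLContext(sc: SparkContext) extends SQLContext(sc) {
    // `catalog` is protected, so it is only reachable from a subclass like this one
    def tableNames: Seq[String] = catalog match {
      case c: SimpleCatalog => c.tables.keys.toSeq
      case _                => Seq.empty
    }
  }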

Tobias


Re: Spark Streaming and database access (e.g. MySQL)

2014-09-08 Thread Tobias Pfeiffer
Hi,

On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen so...@cloudera.com wrote:

  if (rdd.take(1).size == 1) {
    rdd foreachPartition { iterator =>


I was wondering: Since take() is an output operation, isn't it computed
twice (once for the take(1), once during the iteration)? Or will only one
single element be computed for take(1)?

Thanks
Tobias


Re: Multi-tenancy for Spark (Streaming) Applications

2014-09-08 Thread Tobias Pfeiffer
Hi,

On Thu, Sep 4, 2014 at 10:33 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 In the current state of Spark Streaming, creating separate Java processes
 each having a streaming context is probably the best approach to
 dynamically adding and removing of input sources. All of these should be
 able to to use a YARN cluster for resource allocation.


So, for example, I would write a server application that accepts a command
like createNewInstance and then calls spark-submit, pushing my actual
application to the YARN cluster? Or could I use spark-jobserver?

Thanks
Tobias


Re: Spark streaming for synchronous API

2014-09-08 Thread Tobias Pfeiffer
Ron,

On Tue, Sep 9, 2014 at 11:27 AM, Ron's Yahoo! zlgonza...@yahoo.com.invalid
 wrote:

   I’m trying to figure out how I can run Spark Streaming like an API.
   The goal is to have a synchronous REST API that runs the spark data flow
 on YARN.


I guess I *may* develop something similar in the future.

By a synchronous REST API, do you mean that submitting the job is
synchronous and you would fetch the processing results via a different
call? Or do you want to submit a job and get the processed data back as an
HTTP stream?

To begin with, is it even possible to have Spark Streaming run as a yarn
 job?


I think it is very much possible to run Spark Streaming as a YARN job; at
least it worked well with Mesos.

Tobias


Re: Spark streaming for synchronous API

2014-09-08 Thread Tobias Pfeiffer
Hi,

On Tue, Sep 9, 2014 at 12:59 PM, Ron's Yahoo! zlgonza...@yahoo.com wrote:

  I want to create a synchronous REST API that will process some data that
 is passed in as some request.
  I would imagine that the Spark Streaming Job on YARN is a long
 running job that waits on requests from something. What that something is
 is still not clear to me, but I would imagine that it’s some queue.
 The goal is to be able to push a message onto a queue with some id, and
 then  get the processed results back from Spark Streaming.


That is not exactly a Spark Streaming use case, I think. Spark Streaming
pulls data from some source (like a queue), then processes all data
collected in a certain interval in a mini-batch, and stores that data
somewhere. It is not well suited for handling request-response cycles in a
synchronous way; you might consider using plain Spark (without Streaming)
for that.

For example, you could use the unfiltered library
(http://unfiltered.databinder.net/Unfiltered.html) and, within request
handling, do some RDD operation, returning the output as the HTTP response.
This works fine as multiple threads can submit Spark jobs concurrently
(https://spark.apache.org/docs/latest/job-scheduling.html). You could also
check https://github.com/adobe-research/spindle -- that seems to be similar
to what you are doing.

 The goal is for the REST API be able to respond to lots of calls with low
 latency.
  Hope that clarifies things...


Note that low latency for lots of calls is maybe not something that
Spark was built for. Even if you do close to nothing data processing, you
may not get below 200ms or so due to the overhead of submitting jobs etc.,
from my experience.

Tobias


Re: Spark streaming for synchronous API

2014-09-08 Thread Tobias Pfeiffer
Hi,

On Tue, Sep 9, 2014 at 2:02 PM, Ron's Yahoo! zlgonza...@yahoo.com wrote:

   So I guess where I was coming from was the assumption that starting up a
 new job to be listening on a particular queue topic could be done
 asynchronously.


No, with the current state of Spark Streaming, all data sources and the
processing pipeline must be fixed when you start your StreamingContext. You
cannot add new data sources dynamically at the moment, see
http://apache-spark-user-list.1001560.n3.nabble.com/Multi-tenancy-for-Spark-Streaming-Applications-td13398.html


   For example, let’s say there’s a particular topic T1 in a Kafka queue.
 If I have a new set of requests coming from a particular client A, I was
 wondering if I could create a partition A.
   The streaming job is submitted to listen to T1.A and will write to a
 topic T2.A, which the REST endpoint would be listening on.


That doesn't seem like a good way to use Kafka. It may be possible, but I
am pretty sure you should create a new topic T_A instead of a partition A
in an existing topic. With some modifications of Spark Streaming's
KafkaReceiver you *might* be able to get it to work as you imagine, but it
was not meant to be that way, I think.

Also, you will not get low latency, because Spark Streaming processes
data in batches of fixed interval length (say, 1 second) and in the worst
case your query will wait up to 1 second before processing even starts.

If I understand correctly what you are trying to do (which I am not sure
about), I would probably recommend to choose a bit of a different
architecture; in particular given that you cannot dynamically add data
sources.

Tobias


Re: Multi-tenancy for Spark (Streaming) Applications

2014-09-11 Thread Tobias Pfeiffer
Hi,

by now I understood maybe a bit better how spark-submit and YARN play
together and how Spark driver and slaves play together on YARN.

Now for my usecase, as described on 
https://spark.apache.org/docs/latest/submitting-applications.html, I would
probably have a end-user-facing gateway that submits my Spark (Streaming)
application to the YARN cluster in yarn-cluster mode.

I have a couple of questions regarding that setup:
* That gateway does not need to be written in Scala or Java, it actually
has no contact with the Spark libraries; it is just executing a program on
the command line (./spark-submit ...), right?
* Since my application is a streaming application, it won't finish by
itself. What is the best way to terminate the application on the cluster
from my gateway program? Can I just send SIGTERM to the spark-submit
program? Is it recommended?
* I guess there are many possibilities to achieve that, but what is a good
way to send commands/instructions to the running Spark application? If I
want to push some commands from the gateway to the Spark driver, I guess I
need to get its IP address - how? If I want the Spark driver to pull its
instructions, what is a good way to do so? Any suggestions?

Thanks,
Tobias


Re: Announcing Spark 1.1.0!

2014-09-11 Thread Tobias Pfeiffer
Hi,

On Fri, Sep 12, 2014 at 9:12 AM, Patrick Wendell pwend...@gmail.com wrote:

 I am happy to announce the availability of Spark 1.1.0! Spark 1.1.0 is
 the second release on the API-compatible 1.X line. It is Spark's
 largest release ever, with contributions from 171 developers!


Great, congratulations!! The release notes read great!
Seems like if I wait long enough for new Spark releases, my applications
will build themselves in the end ;-)

Tobias


spark-submit: fire-and-forget mode?

2014-09-18 Thread Tobias Pfeiffer
Hi,

I am wondering: Is it possible to run spark-submit in a mode where it will
start an application on a YARN cluster (i.e., driver and executors run on
the cluster) and then forget about it in the sense that the Spark
application is completely independent from the host that ran the
spark-submit command and will not be affected if that controlling machine
shuts down etc.? I was using spark-submit with YARN in cluster mode, but
spark-submit stayed in the foreground and as far as I understood, it
terminated the application on the cluster when spark-submit was Ctrl+C'ed.

Thanks
Tobias


Re: spark-submit: fire-and-forget mode?

2014-09-18 Thread Tobias Pfeiffer
Hi,

thanks for everyone's replies!

 On Thu, Sep 18, 2014 at 7:37 AM, Sandy Ryza sandy.r...@cloudera.com
wrote:
 YARN cluster mode should have the behavior you're looking for.  The
client
 process will stick around to report on things, but should be able to be
 killed without affecting the application.  If this isn't the behavior
you're
 observing, and your application isn't failing for a different reason,
 there's a bug.

Sandy, yes, you are right; I must have mis-interpreted some
results/behavior when I was trying this before.

On Thu, Sep 18, 2014 at 1:19 PM, Andrew Or and...@databricks.com wrote:

 Thanks Tobias, I have filed a JIRA for it.


Great, thanks for opening the issue! I think that's a very useful thing to
have.

Tobias


Re: diamond dependency tree

2014-09-18 Thread Tobias Pfeiffer
Hi,

On Thu, Sep 18, 2014 at 8:55 PM, Victor Tso-Guillen v...@paxata.com wrote:

 Is it possible to express a diamond DAG and have the leaf dependency
 evaluate only once?


Well, strictly speaking your graph is not a tree, and also the meaning of
"leaf" is not totally clear, I'd say.


 So say data flows left to right (and the dependencies are oriented right
 to left):

 [image: Inline image 1]
 Is it possible to run d.collect() and have a evaluate its iterator only
 once?


If you say a.cache() (or a.persist()) then it will be evaluated only once
and then the cached data will be used for later accesses.
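
As a sketch of the diamond above (assuming a SparkContext `sc`; the map() here
stands in for an expensive computation):

  val a = sc.textFile("hdfs:///input").map(line => line.length).cache()  // computed only once
  val b = a.filter(_ % 2 == 0)
  val c = a.filter(_ % 2 == 1)
  val d = b.union(c)
  d.collect()  // both b and c read a's data from the cache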

Tobias


Re: rsync problem

2014-09-19 Thread Tobias Pfeiffer
Hi,

On Fri, Sep 19, 2014 at 5:02 PM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 This worked perfectly. But, I wanted to simultaneously rsync all the
 slaves. So, added the other slaves as following:

 rsync -avL --progress path/to/spark-1.0.0  username@destinationhostname
 :path/to/destdirectory username@slave2:path username@slave3:path and so
 on.


The rsync man page says
   rsync [OPTION...] SRC... [USER@]HOST:DEST
so as I understand your command, you have copied a lot of files from
various hosts to username@slave3:path. I don't think rsync can copy to
various locations at once.

Tobias


Re: how long does it take executing ./sbt/sbt assembly

2014-09-23 Thread Tobias Pfeiffer
Hi,

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-building-project-with-sbt-assembly-is-extremely-slow-td13152.html
 -- Maybe related to this?

Tobias


All-time stream re-processing

2014-09-24 Thread Tobias Pfeiffer
Hi,

I have a setup (in mind) where data is written to Kafka and this data is
persisted in HDFS (e.g., using camus) so that I have an all-time archive of
all stream data ever received. Now I want to process that all-time archive
and when I am done with that, continue with the live stream, using Spark
Streaming. (In a perfect world, Kafka would have infinite storage and I
would always use the Kafka receiver, starting from offset 0.)
Does anyone have an idea how to realize such a setup? Would I write a
custom receiver that first reads the HDFS file and then connects to Kafka?
Is there an existing solution for that use case?

Thanks
Tobias


Re: All-time stream re-processing

2014-09-24 Thread Tobias Pfeiffer
Hi,

On Wed, Sep 24, 2014 at 7:23 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 So you have a single Kafka topic which has very high retention period (
 that decides the storage capacity of a given Kafka topic) and you want to
 process all historical data first using Camus and then start the streaming
 process ?


I don't necessarily want to process the historical data using Camus, but
I want to keep it forever (longer than Kafka's retention period) and
process the stored data and the stream. (I don't really care about how the
data got into HDFS, be it Camus or something else, but I assume that Kafka
can't store it forever.)

Imagine that I receive all tweets posted to Twitter, they go into my
Kafka instance and are archived to HDFS. Now a user logs in and I want to
display to that user a) all posts that have ever mentioned him/her and b)
continue to update that list from the current stream. (In that order.) This
happens for a number of users, so it's a process that needs to be
repeatable with different Spark operations.

The challenge is that Camus and Spark are two different consumers for a Kafka
  topic, and both maintain their own consumed offsets in different ways. Camus
  stores offsets in HDFS, and the Spark consumer in ZK. From what I understand,
  you need something which identifies up to which point Camus has pulled (for
  a given partition of a topic), and you want to start the Spark receiver from
  there?


I think I need such a thing. Also, I think Camus stores those offsets, so
in theory it should be possible to consume all HDFS files, read the offset,
then start Kafka processing from that offset. That sounds very lambda
architecture-ish to me, so I was wondering if someone has realized a
similar setup.

Thanks
Tobias


Re: rsync problem

2014-09-24 Thread Tobias Pfeiffer
Hi,

I assume you unintentionally did not reply to the list, so I'm adding it
back to CC.

How do you submit your job to the cluster?

Tobias


On Thu, Sep 25, 2014 at 2:21 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 How do I find out whether a node in the cluster is a master or slave??
 Till now I was thinking that slaves file under the conf folder makes the
 difference. Also, the MASTER_MASTER_IP in the spark-env.sh file.

 what else differentiates a slave from the master??

 On Wed, Sep 24, 2014 at 10:46 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

  The job execution is taking place perfectly. Previously, all my print
  statements used to be stored in the spark/work/*/stdout file. But now, after
  doing the rsync, I find that none of the print statements are getting
  reflected in the stdout file under the work folder. When I go to the code,
  I find the statements in the code, but they are not reflected in the
  stdout file as before.

  Can you please tell me where I went wrong? All I want is to see my
  modification in the code getting reflected in the output.

 On Wed, Sep 24, 2014 at 10:22 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Hi,

 I have a very important and fundamental doubt: I have rsynced the entire
 Spark folder from the master to all slaves in the cluster. When I execute a
 job, it works perfectly. But when I rsync the entire Spark folder of the
 master to all the slaves, am I not sending the master configuration to all
 the slaves and making the slaves behave like a master?

 First of all, is it correct to rsync the entire Spark folder at all?
 And if I change only one file, how do I rsync it to all nodes?

 On Fri, Sep 19, 2014 at 8:44 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Thank you Soumya Simanta and Tobias. I've deleted the contents of the
 work folder on all the nodes.
 Now it's working perfectly as it was before.

 Thank you
 Karthik

 On Fri, Sep 19, 2014 at 4:46 PM, Soumya Simanta 
 soumya.sima...@gmail.com wrote:

 One possible reason is maybe that the checkpointing directory
 $SPARK_HOME/work is rsynced as well.
 Try emptying the contents of the work folder on each node and try
 again.



 On Fri, Sep 19, 2014 at 4:53 AM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 I followed this command:

   rsync -avL --progress path/to/spark-1.0.0 username@destinationhostname:path/to/destdirectory

 Anyway, for now, I did it individually for each node.

 I copied to each node individually using the above command, so I guess the
 copy should not contain any mixture of files. Also, as of now, I am not
 facing any MethodNotFound exceptions. But there is no job execution taking
 place.

 After sometime, one by one, each goes down and the cluster shuts
 down.

 On Fri, Sep 19, 2014 at 2:15 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Fri, Sep 19, 2014 at 5:17 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

  you have copied a lot of files from various hosts to
  username@slave3:path

 only from one node to all the other nodes...


 I don't think rsync can do that in one command as you described. My
 guess is that you now have a wild mixture of jar files all across your
 cluster, which will lead to fancy exceptions like MethodNotFound etc.;
 that may be why your cluster is not working correctly.

 Tobias


Re: Using GraphX with Spark Streaming?

2014-10-05 Thread Tobias Pfeiffer
Arko,

On Sat, Oct 4, 2014 at 1:40 AM, Arko Provo Mukherjee 
arkoprovomukher...@gmail.com wrote:

 Apologies if this is a stupid question but I am trying to understand
 why this can or cannot be done. As far as I understand that streaming
 algorithms need to be different from batch algorithms as the streaming
 algorithms are generally incremental. Hence the question whether the
 RDD transformations can be extended to streaming or not.


I don't think that streaming algorithms are generally incremental in
Spark Streaming. Rather, data is collected, and every N seconds
(or minutes, ...) the data collected during that interval is batch-processed
just like in normal batch operations. If anything, using data previously
obtained from the stream (in previous intervals) is a bit more complicated
than plain batch processing. If the graph you want to create only uses data
from one interval/batch, that should be dead simple. You might want to have
a look at
https://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams
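
For what it's worth, a minimal sketch of the per-batch case, assuming a
DStream[(Long, Long)] of (source, destination) vertex ids called edgePairs
(the names are made up):

  import org.apache.spark.graphx.{Edge, Graph}

  // build a separate GraphX graph from the edges received in each batch interval
  edgePairs.foreachRDD { rdd =>
    val edges = rdd.map { case (src, dst) => Edge(src, dst, 1) }
    val graph = Graph.fromEdges(edges, defaultValue = 0)
    println("vertices in this batch: " + graph.vertices.count())
  }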

Tobias


Re: Record-at-a-time model for Spark Streaming

2014-10-07 Thread Tobias Pfeiffer
Jianneng,

On Wed, Oct 8, 2014 at 8:44 AM, Jianneng Li jiannen...@berkeley.edu wrote:

 I understand that Spark Streaming uses micro-batches to implement
 streaming, while traditional streaming systems use the record-at-a-time
 processing model. The performance benefit of the former is throughput, and
 the latter is latency. I'm wondering what it would take to implement
 record-at-a-time for Spark Streaming? Would it be something that is
 feasible to prototype in one or two months?


I think this is so much against the fundamental design concept of Spark
Streaming that there would be nothing left of Spark Streaming when you are
done with it. Spark is fundamentally based on the idea of an RDD, that is,
a distributed collection of data, and Spark Streaming is basically a wrapper
that stores incoming data as an RDD and then processes it as a batch. One
item at a time does not match this model. Even if you *were* able to prototype
something, I think performance would be abysmal.

Tobias


Re: dynamic sliding window duration

2014-10-07 Thread Tobias Pfeiffer
Hi,

On Wed, Oct 8, 2014 at 4:50 AM, Josh J joshjd...@gmail.com wrote:

 I have a source which fluctuates in the frequency of streaming tuples. I
 would like to process certain batch counts, rather than batch window
 durations. Is it possible to either

 1) define batch window sizes


Cf.
http://apache-spark-user-list.1001560.n3.nabble.com/window-every-n-elements-instead-of-time-based-td2085.html


 2) dynamically adjust the duration of the sliding window?


That's not possible AFAIK, because you can't change anything in the
processing pipeline after the StreamingContext has been started.

Tobias


Combined HDFS/Kafka Processing

2014-10-09 Thread Tobias Pfeiffer
Hi,

I have a setting where data arrives in Kafka and is stored to HDFS from
there (maybe using Camus or Flume). I want to write a Spark Streaming app
where
 - first all files in that HDFS directory are processed,
 - and then the stream from Kafka is processed, starting
   with the first item that was not yet in HDFS.
The order of the data is somehow important, so I should really *first* do
the HDFS processing (which might take a while, by the way) and *then* start
stream processing.
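
To make the intent concrete, here is a rough sketch of the ordering I have in
mind (paths and topic names are made up; the hand-off point, i.e. starting
Kafka exactly after the last archived record, is precisely the open question):

  import org.apache.spark.streaming.kafka.KafkaUtils

  // 1) batch-process everything that has already been archived to HDFS
  val archived = ssc.sparkContext.textFile("hdfs:///archive/mytopic/*")
  println("archived lines: " + archived.count())

  // 2) only then attach to Kafka and continue with the live stream
  val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("mytopic" -> 1))
  stream.map(_._2).print()
  ssc.start()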

Does anyone have any suggestions on how to implement this? Should I write a
custom receiver, a custom input stream, or can I just use built-in mechanisms?

I would be happy to learn about any ideas.

Thanks
Tobias


Re: Processing order in Spark

2014-10-13 Thread Tobias Pfeiffer
Sean,

thanks, I didn't know about repartitionAndSortWithinPartitions, that seems
very helpful!
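
For the archives, a minimal sketch of how I understand it is used, assuming a
pair RDD pairs: RDD[(String, Int)] and a Spark version that includes this
method:

  import org.apache.spark.HashPartitioner

  // repartition by key and sort keys within each resulting partition
  val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(8))
  sorted.foreachPartition { iter =>
    // within one partition, the keys now arrive in sorted order
    iter.foreach { case (k, v) => println(k + " -> " + v) }
  }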

Tobias


spark-submit results in NoClassDefFoundError

2014-10-29 Thread Tobias Pfeiffer
Hi,

I am trying to get my Spark application to run on YARN. By now I have
managed to build a fat jar as described at
http://markmail.org/message/c6no2nyaqjdujnkq (which is the only really
usable manual on how to get such a jar file). My code runs fine using sbt
test and sbt run, but when running

~/spark-1.1.0-bin-hadoop2.4/bin/spark-submit \
  --class my.spark.MyClass --master local[3] \
  target/scala-2.10/myclass-assembly-1.0.jar

I get:

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Exception in thread "main" java.lang.NoClassDefFoundError:
com/typesafe/scalalogging/slf4j/Logger
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
at java.lang.Class.getMethod0(Class.java:2856)
at java.lang.Class.getMethod(Class.java:1668)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:325)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
com.typesafe.scalalogging.slf4j.Logger
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 7 more
ABRT problem creation: 'success'

It seems to run into an error before it does anything with my jar?

I am using
  "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2"
instead of
  "com.typesafe" %% "scalalogging-slf4j" % "1.1.0"
in my SBT file, could that be a reason?

Thanks
Tobias


Re: spark-submit results in NoClassDefFoundError

2014-10-29 Thread Tobias Pfeiffer
Hi again,

On Thu, Oct 30, 2014 at 11:50 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Exception in thread main java.lang.NoClassDefFoundError:
 com/typesafe/scalalogging/slf4j/Logger


It turned out scalalogging was not included in the fat jar due to 
https://github.com/sbt/sbt-assembly/issues/116.


 I am using
   "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2"
 instead of
   "com.typesafe" %% "scalalogging-slf4j" % "1.1.0"
 in my SBT file, could that be a reason?


So yes, that was the reason, in a way... however, I decided to include
scala in the fat jar instead of modifying all my logging code...

Tobias


Re: NonSerializable Exception in foreachRDD

2014-10-30 Thread Tobias Pfeiffer
Harold,

just mentioning it in case you run into it: If you are in a separate
thread, there are apparently stricter limits to what you can and cannot
serialize:

import scala.concurrent._
import ExecutionContext.Implicits.global

val someVal = 42  // e.g., some value from the enclosing scope
future {
  // be very careful with defining RDD operations using someVal here
  val myLocalVal = someVal
  // use myLocalVal instead
}

On Thu, Oct 30, 2014 at 4:55 PM, Harold Nguyen har...@nexgate.com wrote:

 In Spark Streaming, when I do foreachRDD on my DStreams, I get a
 NonSerializable exception when I try to do something like:

 DStream.foreachRDD( rdd => {
   var sc.parallelize(Seq(("test", "blah")))
 })


Is this the code you are actually using? var sc.parallelize(...) doesn't
really look like valid Scala to me.
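
In case it helps, here is a sketch of something that at least compiles
(assuming dstream is a DStream[String]); note that inside foreachRDD you are
on the driver and can reach the SparkContext via rdd.sparkContext, so you
don't need to close over an outer sc:

  dstream.foreachRDD { rdd =>
    // this block runs on the driver, so creating a new RDD here is fine
    val extra = rdd.sparkContext.parallelize(Seq(("test", "blah")))
    println(extra.count())
  }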

Tobias


Re: hadoop_conf_dir when running spark on yarn

2014-11-03 Thread Tobias Pfeiffer
Hi,

On Mon, Nov 3, 2014 at 1:29 PM, Amey Chaugule ambr...@gmail.com wrote:

 I thought that only applied when you're trying to run a job using
 spark-submit or in the shell...


And how are you starting your Yarn job, if not via spark-submit?

Tobias


Re: different behaviour of the same code

2014-11-03 Thread Tobias Pfeiffer
Hi,

On Fri, Oct 31, 2014 at 4:31 PM, lieyan lie...@yahoo.com wrote:

 The code is here: LogReg.scala
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n17803/LogReg.scala

 When I click the Run button in IDEA, I get the following error message:
 errlog.txt
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n17803/errlog.txt

 But when I export the jar file and use spark-submit --class
 net.yanl.spark.LogReg log_reg.jar 15, the program works fine.


I have not used Spark's built-in cluster manager and I don't know how
application jar distribution is done there. However, it seems to me that
when you use spark-submit, it takes care of distributing your jar file
properly to all the cluster nodes, which is why it works fine. When you run
the program from your IDE, that distribution apparently does not happen, so
some classes are missing on the cluster nodes and you run into
ClassNotFoundExceptions. If you change the master to local[3] instead of
spark://master.local:7077 and run it from IDEA, does it work?
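
As a rough sketch of the two options (app name and jar path are placeholders):

  import org.apache.spark.SparkConf

  // Option 1: run everything locally from the IDE
  val confLocal = new SparkConf().setAppName("LogReg").setMaster("local[3]")

  // Option 2: keep the cluster master, but tell Spark where to find your jar
  // so it can be shipped to the executors
  val confCluster = new SparkConf()
    .setAppName("LogReg")
    .setMaster("spark://master.local:7077")
    .setJars(Seq("log_reg.jar"))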

Tobias


netty on classpath when using spark-submit

2014-11-03 Thread Tobias Pfeiffer
Hi,

I tried hard to get a version of netty into my jar file created with sbt
assembly that works with all my libraries. Now I managed that and was
really happy, but it seems like spark-submit puts an older version of netty
on the classpath when submitting to a cluster, such that my code ends up
with a NoSuchMethodError:

Code:

  import java.io.File
  import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpMethod, HttpVersion}

  val a = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
    "http://localhost")
  val f = new File(a.getClass.getProtectionDomain().
    getCodeSource().getLocation().getPath())
  println(f.getAbsolutePath)
  println("headers: " + a.headers())

When executed with sbt run:
  ~/.ivy2/cache/io.netty/netty/bundles/netty-3.9.4.Final.jar
  headers: org.jboss.netty.handler.codec.http.DefaultHttpHeaders@64934069

When executed with spark-submit:
  ~/spark-1.1.0-bin-hadoop2.4/lib/spark-assembly-1.1.0-hadoop2.4.0.jar
  Exception in thread "main" java.lang.NoSuchMethodError:
org.jboss.netty.handler.codec.http.DefaultHttpRequest.headers()Lorg/jboss/netty/handler/codec/http/HttpHeaders;
...

How can I get the old netty version off my classpath?

Thanks
Tobias


Re: netty on classpath when using spark-submit

2014-11-04 Thread Tobias Pfeiffer
Markus,

thanks for your help!

On Tue, Nov 4, 2014 at 8:33 PM, M. Dale medal...@yahoo.com.invalid wrote:

  Tobias,
From http://spark.apache.org/docs/latest/configuration.html it seems
 that there is an experimental property:

 spark.files.userClassPathFirst


Thank you very much, I didn't know about this. Unfortunately, it doesn't
change anything. With this setting set to both true and false (as indicated
by the Spark web interface), and no matter whether local[N], yarn-client or
yarn-cluster mode is used with spark-submit, the classpath looks the same
and the netty class is loaded from the Spark jar. Can I use this setting
with spark-submit at all?
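
For reference, a sketch of how the property can be set programmatically
(app name is a placeholder); the same key can also go into
conf/spark-defaults.conf:

  import org.apache.spark.SparkConf

  // experimental in Spark 1.1: prefer classes from the user jar over Spark's own
  val conf = new SparkConf()
    .setAppName("MyApp")
    .set("spark.files.userClassPathFirst", "true")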

Thanks
Tobias


Re: netty on classpath when using spark-submit

2014-11-09 Thread Tobias Pfeiffer
Hi,

On Wed, Nov 5, 2014 at 10:23 AM, Tobias Pfeiffer wrote:

 On Tue, Nov 4, 2014 at 8:33 PM, M. Dale wrote:

From http://spark.apache.org/docs/latest/configuration.html it seems
 that there is an experimental property:

 spark.files.userClassPathFirst


 Thank you very much, I didn't know about this.  Unfortunately, it doesn't
 change anything.  With this setting both true and false (as indicated by
 the Spark web interface) and no matter whether local[N] or yarn-client
 or yarn-cluster mode are used with spark-submit, the classpath looks the
 same and the netty class is loaded from the Spark jar. Can I use this
 setting with spark-submit at all?


Has anyone used this setting successfully or can advice me on how to use it
correctly?

Thanks
Tobias


Re: convert List[String] to dstream

2014-11-10 Thread Tobias Pfeiffer
Josh,

On Tue, Nov 11, 2014 at 7:43 AM, Josh J joshjd...@gmail.com wrote:

 I have some data generated by some utilities that return the results as
 a List[String]. I would like to join this with a DStream of Strings. How
 can I do this? I tried the following but get Scala compiler errors:

 val list_scalaconverted =
 ssc.sparkContext.parallelize(listvalues.toArray())


Your `listvalues` seems to be a java.util.List, not a
scala.collection.immutable.List, right? In that case, toArray() will return
an Array[Object], not an Array[String], which leads to the error you see.
Have a look at
http://www.scala-lang.org/api/current/index.html#scala.collection.JavaConversions$
and convert your Java list to a Scala list.
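
A minimal sketch of the conversion, assuming listvalues is a
java.util.List[String] and ssc is your StreamingContext:

  import scala.collection.JavaConversions._

  // the implicit conversion turns the java.util.List into a Scala Buffer;
  // toList then gives an immutable Scala List[String]
  val scalaList: List[String] = listvalues.toList
  val rdd = ssc.sparkContext.parallelize(scalaList)

The join with the DStream could then happen e.g. inside a transform() on the
stream, where you have access to both RDDs.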

Tobias


Re: Mapping SchemaRDD/Row to JSON

2014-11-10 Thread Tobias Pfeiffer
Akshat,

On Tue, Nov 11, 2014 at 4:12 AM, Akshat Aranya aara...@gmail.com wrote:

 Does there exist a way to serialize Row objects to JSON?


I can't think of any other way than the one you proposed.  A Row is more or
less an Array[Object], so you need to read JSON key and data type from the
schema.
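
Just to illustrate, a naive sketch (no escaping, every value rendered as a
string; schemaRDD and the field names are assumed):

  // fieldNames could be taken from the SchemaRDD's schema; here they are just assumed
  val fieldNames = Seq("id", "name")

  val jsonStrings = schemaRDD.map { row =>
    fieldNames.zip(row.toSeq)
      .map { case (k, v) => "\"" + k + "\": \"" + v + "\"" }
      .mkString("{", ", ", "}")
  }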

Tobias


Re: Strange behavior of spark-shell while accessing hdfs

2014-11-11 Thread Tobias Pfeiffer
Hi,

On Tue, Nov 11, 2014 at 2:04 PM, hmxxyy hmx...@gmail.com wrote:

 If I run bin/spark-shell without connecting a master, it can access a hdfs
 file on a remote cluster with kerberos authentication.

[...]

However, if I start the master and slave on the same host and use
 bin/spark-shell --master spark://*.*.*.*:7077
 to run the same commands

[... ]
 org.apache.hadoop.security.AccessControlException: Client cannot
 authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
 *.*.*.*.com/98.138.236.95; destination host is: *.*.*.*:8020;


When you give no master, it is local[*], so Spark will (implicitly?)
authenticate to HDFS from your local machine using local environment
variables, key files etc., I guess.

When you give a spark://* master, Spark will run on a different machine,
where you have not yet authenticated to HDFS, I think. I don't know how to
solve this, though, maybe some Kerberos token must be passed on to the
Spark cluster?

Tobias


Re: Best practice for multi-user web controller in front of Spark

2014-11-11 Thread Tobias Pfeiffer
Hi,

also there is Spindle https://github.com/adobe-research/spindle which was
introduced on this list some time ago. I haven't looked into it deeply, but
you might gain some valuable insights from their architecture, they are
also using Spark to fulfill requests coming from the web.

Tobias


Re: filtering out non English tweets using TwitterUtils

2014-11-11 Thread Tobias Pfeiffer
Hi,

On Wed, Nov 12, 2014 at 5:42 AM, SK skrishna...@gmail.com wrote:

 But getLang() is one of the methods of twitter4j.Status since version 3.0.6
 according to the doc at:
http://twitter4j.org/javadoc/twitter4j/Status.html#getLang--

 What version of twitter4j does Spark Streaming use?


3.0.3
https://github.com/apache/spark/blob/master/external/twitter/pom.xml#L53

Tobias


Re: Spark streaming cannot receive any message from Kafka

2014-11-12 Thread Tobias Pfeiffer
Bill,

However, now that I am using Spark 1.1.0, the Spark streaming job
 cannot receive any messages from Kafka. I have not made any change to the
 code.


Do you see any suspicious messages in the log output?

Tobias


StreamingContext does not stop

2014-11-13 Thread Tobias Pfeiffer
Hi,

I am processing a bunch of HDFS data using the StreamingContext (Spark
1.1.0) which means that all files that exist in the directory at start()
time are processed in the first batch. Now when I try to stop this stream
processing using `streamingContext.stop(false, false)` (that is, even with
stopGracefully = false), it has no effect. The stop() call blocks and data
processing continues (probably it would stop after the batch, but that
would be too long since all my data is in that batch).

I am not exactly sure if this is generally true or only for the first
batch. Also I observed that stopping the stream processing during the first
batch does occasionally lead to a very long time until the stop takes place
(even if there is no data present at all).

Has anyone experienced something similar? In my processing code, do I have
to do something particular (like checking for the state of the
StreamingContext) to allow the interruption? It is quite important for me
that stopping the stream processing takes place rather quickly.

Thanks
Tobias


Re: StreamingContext does not stop

2014-11-13 Thread Tobias Pfeiffer
Hi,

I guess I found part of the issue: I said
  dstream.transform(rdd => { rdd.foreachPartition(...); rdd })
instead of
  dstream.transform(rdd => { rdd.mapPartitions(...) }),
that's why stop() would not stop the processing.

Now with the new version a non-graceful shutdown works in the sense that
Spark does not wait for my processing to complete; job generator, job
scheduler, job executor etc. all seem to be shut down fine, just the
threads that do the actual processing are not. Even after
streamingContext.stop() is done, I see logging output from my processing
task.

Is there any way to signal to my processing tasks that they should stop the
processing?

Thanks
Tobias


Re: Communication between Driver and Executors

2014-11-16 Thread Tobias Pfeiffer
Hi,

On Fri, Nov 14, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 I wonder if SparkConf is dynamically updated on all worker nodes or only
 during initialization. It can be used to piggyback information.
 Otherwise I guess you are stuck with Broadcast.
 Primarily I have had these issues when moving legacy MR operators to Spark,
 where MR piggybacks on the Hadoop conf pretty heavily; in a Spark-native
 application it is rarely required. Do you have a use case like that?


My usecase is
http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-does-not-stop-td18826.html
– that is, notifying my Spark executors that the StreamingContext has been
shut down. (Even with non-graceful shutdown, Spark doesn't seem to end the
actual execution, just all the Spark-internal timers etc.) I need to do
this properly or processing will go on for a very long time.

I have been trying to misuse a broadcast variable as follows:
- create a class with a boolean var, set to true
- query this boolean on the executors as a prerequisite to processing the
next item
- when I want to shut down, set the boolean to false and unpersist the
broadcast variable (which will trigger re-delivery).
This is very dirty, but it works with a local[*] master. Unfortunately,
when deployed on YARN, the new value will never arrive at my executors.
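
In code, the pattern looks roughly like this (a sketch with made-up names;
ssc is the StreamingContext and dstream a DStream[String], and as said, the
re-delivery of the mutated object apparently does not happen on YARN):

  class StopFlag extends Serializable { @volatile var keepRunning = true }

  val flag = new StopFlag
  val flagBc = ssc.sparkContext.broadcast(flag)

  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { iter =>
      // each task checks the (hopefully re-fetched) flag before doing real work
      if (flagBc.value.keepRunning) iter.foreach(println)
    }
  }

  // on the driver, when shutdown is requested:
  //   flag.keepRunning = false
  //   flagBc.unpersist()  // intended to trigger re-delivery on next access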

Any idea what could go wrong on YARN with this approach – or what is a
good way to do this?

Thanks
Tobias


Re: Communication between Driver and Executors

2014-11-16 Thread Tobias Pfeiffer
Hi again,

On Mon, Nov 17, 2014 at 8:16 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 I have been trying to mis-use broadcast as in
 - create a class with a boolean var, set to true
 - query this boolean on the executors as a prerequisite to process the
 next item
 - when I want to shutdown, I set the boolean to false and unpersist the
 broadcast variable (which will trigger re-delivery).
 This is very dirty, but it works with a local[*] master. Unfortunately,
 when deployed on YARN, the new value will never arrive at my executors.


In fact, it seems as if changing a mutable object (like a mutable list) and
unpersisting it in order to trigger a redeploy only works locally. When running on
YARN, even after an unpersist, the value will always be identical to what I
shipped first. Now I wonder what unpersist actually does in that case. Must
I call unpersist from an executor or from the driver?

Thanks
Tobias


Re: Communication between Driver and Executors

2014-11-17 Thread Tobias Pfeiffer
Hi,

so I didn't manage to get the Broadcast variable with a new value
distributed to my executors in YARN mode. In local mode it worked fine, but
when running on YARN either nothing happened (when unpersist() was called
on the driver) or I got a TimeoutException (when called on the executor).
I finally dropped the use of broadcast variables and added an HTTP polling
mechanism from the executors to the driver. I find that a bit suboptimal,
in particular since there is this whole Akka infrastructure already running
and I should be able to just send messages around. However, Spark does not
seem to encourage this. (In general I find that private is a bit overused
in the Spark codebase...)

Thanks
Tobias

