merge elements in a Spark RDD under custom condition

2014-12-01 Thread Pengcheng YIN
Hi Pro,
I want to merge elements in a Spark RDD when two elements satisfy a certain
condition.

Suppose there is an RDD[Seq[Int]], where some of the Seq[Int] in this RDD contain
overlapping elements. The task is to merge all overlapping Seq[Int] in this
RDD and store the result in a new RDD.

For example, suppose RDD[Seq[Int]] = [[1,2,3], [2,4,5], [1,2], [7,8,9]], the 
result should be [[1,2,3,4,5], [7,8,9]].

Since the RDD[Seq[Int]] is very large, I cannot do it in the driver program. Is it
possible to get it done using distributed groupBy/map/reduce, etc.?
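
One distributed approach I have been considering is sketched below (untested, just to
illustrate the idea): treat each Seq[Int] as one connected piece and let GraphX
connected components merge the overlapping ones.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

val seqs: RDD[Seq[Int]] = sc.parallelize(Seq(Seq(1, 2, 3), Seq(2, 4, 5), Seq(1, 2), Seq(7, 8, 9)))

// Link every element of a Seq to that Seq's first element, so each Seq becomes one connected piece.
val edges: RDD[Edge[Int]] = seqs.flatMap(s => s.map(x => Edge(s.head.toLong, x.toLong, 1)))
val graph = Graph.fromEdges(edges, 0)

// connectedComponents() labels every vertex with the smallest vertex id in its component;
// grouping by that label yields the merged, de-duplicated sets.
val merged: RDD[Seq[Int]] = graph.connectedComponents().vertices
  .map { case (id, label) => (label, id.toInt) }
  .groupByKey()
  .map { case (_, ids) => ids.toSeq.distinct.sorted }

merged.collect()  // e.g. Array(Seq(1, 2, 3, 4, 5), Seq(7, 8, 9))

Is this a reasonable way to do it, or is there a simpler combination of groupBy/map/reduce?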

Thanks in advance,

Pengcheng

Re: java.io.InvalidClassException: org.apache.spark.api.java.JavaUtils$SerializableMapWrapper; no valid constructor

2014-12-01 Thread lokeshkumar
The workaround was to wrap the map returned by the Spark libraries in a HashMap
and then broadcast that.
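For reference, the workaround looks roughly like this (a generic sketch with a made-up
helper name; the map here stands in for whatever SerializableMapWrapper a Java API call
such as collectAsMap() or countByKey() returned):

import java.util.{HashMap => JHashMap, Map => JMap}
import org.apache.spark.SparkContext

// Copy the wrapper into a plain java.util.HashMap before broadcasting it.
def broadcastAsPlainHashMap[K, V](sc: SparkContext, m: JMap[K, V]) =
  sc.broadcast(new JHashMap[K, V](m))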
Could anyone please let me know if there is any issue open? 






akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down

2014-12-01 Thread Alexey Romanchuk
Hello spark users!

I found lots of strange messages in the driver log. Here is one of them:

2014-12-01 11:54:23,849 [sparkDriver-akka.actor.default-dispatcher-25]
ERROR
akka.remote.EndpointWriter[akka://sparkDriver/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40data1.hadoop%3A17372-5/endpointWriter]
- AssociationError [akka.tcp://sparkDriver@10.54.87.173:55034] -
[akka.tcp://sparkExecutor@data1.hadoop:17372]: Error [Shut down address:
akka.tcp://sparkExecutor@data1.hadoop:17372] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://sparkExecutor@data1.hadoop:17372
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]

I get this message twice for every worker: first for driverPropsFetcher
and then for sparkExecutor. It looks like Spark shuts down the remote Akka system
incorrectly, or there is a race condition in this process and the driver sends
some data to a worker whose actor system is already shutting down.

Apart from this message everything works fine, but it is logged at ERROR level,
so it ends up in my ERROR-only log.

Do you have any idea whether this is a configuration issue, a bug in Spark or Akka, or
something else?

Thanks!


Re: java.io.InvalidClassException: org.apache.spark.api.java.JavaUtils$SerializableMapWrapper; no valid constructor

2014-12-01 Thread Josh Rosen
SerializableMapWrapper was added in
https://issues.apache.org/jira/browse/SPARK-3926; do you mind opening a new
JIRA and linking it to that one?

On Mon, Dec 1, 2014 at 12:17 AM, lokeshkumar lok...@dataken.net wrote:

 The workaround was to wrap the map returned by spark libraries into HashMap
 and then broadcast them.
 Could anyone please let me know if there is any issue open?







Re: Spark SQL 1.0.0 - RDD from snappy compress avro file

2014-12-01 Thread cjdc
Hi Vikas and Simone,

thanks for the replies.
Yeah I understand this would be easier with 1.2 but this is completely out
of my control. I really have to work with 1.0.0.

About Simone's approach, during the imports I get:
scala> import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat }
<console>:17: error: object mapreduce is not a member of package org.apache.avro
       import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat }
                              ^

scala> import org.apache.avro.mapred.AvroKey
<console>:17: error: object mapred is not a member of package org.apache.avro
       import org.apache.avro.mapred.AvroKey
                              ^

scala> import com.twitter.chill.avro.AvroSerializer
<console>:18: error: object avro is not a member of package com.twitter.chill
       import com.twitter.chill.avro.AvroSerializer
                                ^









Re: Creating a SchemaRDD from an existing API

2014-12-01 Thread Niranda Perera
Hi Michael,

Regarding this new data sources API, what types of data sources would it support?
Do they necessarily have to be RDBMSs?
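
To make my question concrete, this is my rough mental model of a minimal custom
relation under the new API (a sketch only, untested; class names and signatures are
based on my reading of the 1.2 sources package and the linked test cases, so please
correct me if they are off):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}

// Hypothetical provider, picked up via the DDL clause USING my.pkg (which resolves to my.pkg.DefaultSource).
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new MyRelation(parameters("path"))(sqlContext)
}

// The relation only has to expose a schema and a way to build an RDD[Row];
// nothing here looks RDBMS-specific, hence my question above.
class MyRelation(path: String)(@transient val sqlContext: SQLContext) extends TableScan {
  override def schema: StructType =
    StructType(StructField("id", IntegerType, nullable = false) ::
               StructField("name", StringType, nullable = true) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.textFile(path).map { line =>
      val Array(id, name) = line.split(",", 2)
      Row(id.toInt, name)
    }
}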

Cheers

On Sat, Nov 29, 2014 at 12:57 AM, Michael Armbrust mich...@databricks.com
wrote:

 You probably don't need to create a new kind of SchemaRDD.  Instead I'd
 suggest taking a look at the data sources API that we are adding in Spark
 1.2.  There is not a ton of documentation, but the test cases show how to
 implement the various interfaces
 https://github.com/apache/spark/tree/master/sql/core/src/test/scala/org/apache/spark/sql/sources,
 and there is an example library for reading Avro data
 https://github.com/databricks/spark-avro.

 On Thu, Nov 27, 2014 at 10:31 PM, Niranda Perera nira...@wso2.com wrote:

 Hi,

 I am evaluating Spark for an analytic component where we do batch
 processing of data using SQL.

 So, I am particularly interested in Spark SQL and in creating a SchemaRDD
 from an existing API [1].

 This API exposes elements in a database as data sources. Using the methods
 provided by such a data source, we can access and edit data.

 So, I want to create a custom SchemaRDD using the methods and provisions of
 this API. I tried going through the Spark documentation and the Javadocs, but
 unfortunately I was unable to reach a conclusion as to whether this is
 actually possible.

 I would like to ask the Spark devs:
 1. As of the current Spark release, can we make a custom SchemaRDD?
 2. What is the extension point for a custom SchemaRDD? Are there
 particular interfaces?
 3. Could you please point me to the specific docs regarding this matter?

 Your help in this regard is highly appreciated.

 Cheers

 [1]

 https://github.com/wso2-dev/carbon-analytics/tree/master/components/xanalytics

 --
 *Niranda Perera*
 Software Engineer, WSO2 Inc.
 Mobile: +94-71-554-8430
 Twitter: @n1r44 https://twitter.com/N1R44





-- 
*Niranda Perera*
Software Engineer, WSO2 Inc.
Mobile: +94-71-554-8430
Twitter: @n1r44 https://twitter.com/N1R44


RE: Unable to compile spark 1.1.0 on windows 8.1

2014-12-01 Thread Ishwardeep Singh
Hi Judy,

Thank you for your response.

When I try to compile using Maven (mvn -Dhadoop.version=1.2.1 -DskipTests
clean package) I get the error "Error: Could not find or load main class".
I have Maven 3.0.4.

And when I run the command "sbt package" I get the same exception as earlier.

I have done the following steps:

1. Download spark-1.1.0.tgz from the Spark site and extract the archive to the folder
d:\myworkplace\software\spark-1.1.0
2. Then I downloaded sbt-0.13.7.zip and extract it to folder
d:\myworkplace\software\sbt
3. Update the PATH environment variable to include
d:\myworkplace\software\sbt\bin in the PATH.
4. Navigate to spark folder d:\myworkplace\software\spark-1.1.0
5. Run the command sbt assembly
6. As a side effect of this command a number of libraries are downloaded and
I get an initial error that path
C:\Users\ishwardeep.singh\.sbt\0.13\staging\ec3aa8f39111944cc5f2\sbt-pom-reader
does not exist. 
7. I manually create this subfolder ec3aa8f39111944cc5f2\sbt-pom-reader
and retry, only to get the error described in my initial post.

Is this the correct procedure to compile spark 1.1.0? Please let me know.

Hoping to hear from you soon.

Regards,
ishwardeep






Kryo exception for CassandraSQLRow

2014-12-01 Thread shahab
I am using the Cassandra-Spark connector to pull data from Cassandra, process
it and write it back to Cassandra.

Now I am getting the following exception, and apparently it is a Kryo
serialisation issue. Does anyone know what the reason is and how it can be solved?

I also tried to register org.apache.spark.sql.cassandra.CassandraSQLRow
via kryo.register, but even this did not solve the problem and the exception
remains.
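
For reference, this is roughly how the registration is wired on my side (a sketch of
my setup; MyKryoRegistrator and my.package are placeholder names):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Registrator referenced from the SparkConf below; registering the class here
// did not stop the "Unable to find class" error on the executors.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[org.apache.spark.sql.cassandra.CassandraSQLRow])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "my.package.MyKryoRegistrator")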

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 7,
ip-X-Y-Z): com.esotericsoftware.kryo.KryoException: Unable to find class:
org.apache.spark.sql.cassandra.CassandraSQLRow
Serialization trace:
_2 (org.apache.spark.util.MutablePair)

com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)

com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)

com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)

com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)

org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)

org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1171)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)

org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1218)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)



I am using Spark 1.1.0 with cassandra-spark connector 1.1.0, here is the
build:

    "org.apache.spark" % "spark-mllib_2.10" % "1.1.0"
        exclude("com.google.guava", "guava"),

    "com.google.guava" % "guava" % "16.0" % "provided",

    "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
        exclude("com.google.guava", "guava") withSources() withJavadoc(),

    "org.apache.cassandra" % "cassandra-all" % "2.1.1"
        exclude("com.google.guava", "guava"),

    "org.apache.cassandra" % "cassandra-thrift" % "2.1.1"
        exclude("com.google.guava", "guava"),

    "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.2"
        exclude("com.google.guava", "guava"),

    "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
        exclude("com.google.guava", "guava") exclude("org.apache.hadoop", "hadoop-core"),

    "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"
        exclude("com.google.guava", "guava"),

    "org.apache.spark" %% "spark-catalyst" % "1.1.0" % "provided"
        exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),

    "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided"
        exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),

    "org.apache.spark" %% "spark-hive" % "1.1.0" % "provided"
        exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),

    "org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",

best,
/Shahab


Spark 1.1.0: weird spark-shell behavior

2014-12-01 Thread Reinis Vicups

Hello,

I have two weird effects when working with spark-shell:


1. This code executed in spark-shell causes the exception below. At the
same time it works perfectly when submitted with spark-submit:


import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.mahout.math.VectorWritable
import com.google.common.io.ByteStreams
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val hConf = HBaseConfiguration.create()
hConf.set("hbase.defaults.for.version.skip", "true")
hConf.set("hbase.defaults.for.version", "0.98.6-cdh5.2.0")
hConf.set(HConstants.ZOOKEEPER_QUORUM, "myserv")
hConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hConf.set(TableInputFormat.INPUT_TABLE, "MyNS:MyTable")
val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

rdd.count()

--- Exception ---

14/12/01 10:45:24 ERROR ExecutorUncaughtExceptionHandler: Uncaught 
exception in thread Thread[Executor task launch worker-0,5,main]

 java.lang.ExceptionInInitializerError
at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:197)
at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:159)
at 
org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:101)
at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:113)

at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hbase-default.xml file seems to 
be for and old version of HBase (null), this version is 0.98.6-cdh5.2.0
at 
org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
at 
org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
at 
org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
at 
org.apache.hadoop.hbase.client.HConnectionManager.<clinit>(HConnectionManager.java:222)

... 14 more

We have already checked most of the trivial stuff with class paths and 
the existence of tables and column groups, enabled HBase-specific settings to 
avoid the version checking and so on. It appears that the supplied HBase 
configuration is completely ignored by the context. We tried to solve this 
issue by instantiating our own Spark context and encountered the second 
weird effect:


2. When attempting to instantiate our own SparkContext we get the exception 
below:


// imports block
...

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

--- Exception ---

2014-12-01 10:42:24,966 WARN  o.e.j.u.c.AbstractLifeCycle - FAILED 
SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Die Adresse 
wird bereits verwendet [Address already in use]

java.net.BindException: Die Adresse wird bereits verwendet [Address already in use]
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)

at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at 
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)

at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at 
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
at 
org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
at 
org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1449)

at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  

Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread m.sarosh
Hi,

I am integrating Kafka and Spark using Spark Streaming. I have created a topic 
on the Kafka side:

bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test


I am publishing messages to Kafka and trying to read them using Spark Streaming 
Java code and display them on screen.
The daemons are all up: Spark master, worker; ZooKeeper; Kafka.
I am writing Java code for this, using KafkaUtils.createStream.
The code is below:

package com.spark;

import scala.Tuple2;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar <zookeeper_ip> <group_name> <topic1,topic2,...>");
            System.exit(1);
        }

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
        {
            public String call(Tuple2<String, String> message)
            {
                System.out.println("NewMessage: " + message._2()); // for debugging
                return message._2();
            }
        });

        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}


I am running the job, and in another terminal I am running the Kafka producer to 
publish messages:
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hi kafka
second message
another message

But the output log at the Spark Streaming console doesn't show the messages; 
it shows zero blocks received:


---
Time: 1417107363000 ms
---

14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for 
time 1417107363000 ms (execution: 0.000 s)
14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 
1417107363000 ms
14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD 
BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 
ms
14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks


Why aren't the data blocks getting received? I have tried the Kafka 
producer and consumer on the console (bin/kafka-console-producer and 
bin/kafka-console-consumer) and they work perfectly, but why not the code above? 
Please help me.


Regards,
Aiman Sarosh





Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread Akhil Das
It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL
(spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the
master URL listed there in the top left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, m.sar...@accenture.com wrote:


RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread m.sarosh
Hi,

The Spark master is working, and I have given the same URL in the code:
[inline screenshot of the Spark master web UI]

The warning is gone, and the new log is:
---
Time: 141742785 ms
---

INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 
from job set of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 
from job set of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 25
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 24
INFO  [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
---
Time: 1417427853000 ms
---

INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 27
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 26
INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should my approach be now?
I need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL 
(spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the 
master URL listed there in the top left corner of the page.

Thanks
Best Regards


RE: Kryo exception for CassandraSQLRow

2014-12-01 Thread Ashic Mahtab
Don't know if this'll solve it, but if you're on Spark 1.1, the Cassandra 
Connector version 1.1.0 final fixed the guava backwards-compatibility issue. Maybe 
taking out the guava exclusions might help?

Date: Mon, 1 Dec 2014 10:48:25 +0100
Subject: Kryo exception for CassandraSQLRow
From: shahab.mok...@gmail.com
To: user@spark.apache.org


Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread Akhil Das
I see you have no worker machines to execute the job:
[inline screenshot of the Spark master web UI]

You haven't configured your Spark cluster properly.

A quick fix to get it running would be to run it in local mode. For that, change
this line

JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));

to this

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));


Thanks
Best Regards

On Mon, Dec 1, 2014 at 4:18 PM, m.sar...@accenture.com wrote:

 

Re: Setting network variables in spark-shell

2014-12-01 Thread Shixiong Zhu
Don't set `spark.akka.frameSize` to 1. The max value of
`spark.akka.frameSize` is 2047. The unit is MB.
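
One quick way to double-check what actually took effect, from inside the shell (just a
sanity check):

// Returns Some(value in MB) if the setting reached the driver's SparkConf, None otherwise.
sc.getConf.getOption("spark.akka.frameSize")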

Best Regards,
Shixiong Zhu

2014-12-01 0:51 GMT+08:00 Yanbo yanboha...@gmail.com:


 Try to use spark-shell --conf spark.akka.frameSize=1

 On 1 Dec 2014, at 12:25 AM, Brian Dolan buddha_...@yahoo.com.INVALID wrote:

 Howdy Folks,

 What is the correct syntax in 1.0.0 to set networking variables in spark
 shell?  Specifically, I'd like to set the spark.akka.frameSize

 I'm attempting this:

 spark-shell -Dspark.akka.frameSize=1 --executor-memory 4g


 Only to get this within the session:

 System.getProperty("spark.executor.memory")
 res0: String = 4g
 System.getProperty("spark.akka.frameSize")
 res1: String = null


 I don't believe I am violating protocol, but I have also posted this to
 SO:
 http://stackoverflow.com/questions/27215288/how-do-i-set-spark-akka-framesize-in-spark-shell

 ~~
 May All Your Sequences Converge






Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
Hi all,

I need some advice on whether Spark is the right tool for my zoo. My requirements 
share commonalities with „big data“, workflow coordination and „reactive“ 
event-driven data processing (as in, for example, Haskell Arrows), which doesn’t make 
it any easier to decide on a tool set.

NB: I have asked a similar question on the Storm mailing list, but have been 
deferred to Spark. I previously thought Storm was closer to my needs – but 
maybe neither is.

To explain my needs it’s probably best to give an example scenario:

 *   A user uploads small files (typically 1-200 files, file size typically 
2-10MB per file)
 *   Files should be converted in parallel and on available nodes. The 
conversion is actually done via native tools, so there is not so much big data 
processing required, but dynamic parallelization (so for example to split the 
conversion step into as many conversion tasks as files are available). The 
conversion typically takes between several minutes and a few hours.
 *   The converted files are gathered and stored in a single database 
(containing geometries for rendering)
 *   Once the db is ready, a web map server is (re-)configured and the user can 
make small updates to the data set via a web UI.
 *   … Some other data processing steps which I leave away for brevity …
 *   There will be initially only a few concurrent users, but the system shall 
be able to scale if needed

My current thoughts:

 *   I should avoid uploading files into the distributed storage during 
conversion, but should probably have each conversion filter download the 
file it is actually converting from a shared place. Otherwise it’s bad for 
scalability reasons (too many redundant copies of the same temporary files if there 
are many concurrent users and many cluster nodes).
 *   Apache Oozie seems an option to chain together my pipes into a workflow. 
But is it a good fit with Spark? What options do I have with Spark to chain a 
workflow from pipes?
 *   Apache Crunch seems to make it easy to dynamically parallelize tasks 
(Oozie itself can’t do this). But I may not need Crunch after all if I have 
Spark, and it also doesn’t seem to fit the last problem below.
 *   The part that causes me the most headache is the user-interactive db 
update: I am considering Kafka as a message bus to broker between the web UI and 
a custom db handler (NB: the db is a SQLite file). But what about update 
responsiveness; won’t Spark introduce some lag (as opposed to Storm)?
 *   The db handler probably has to be implemented as a long-running continuing 
task, so when a user sends some changes the handler writes them to the db 
file. However, I want this to be decoupled from the job, so these file updates 
should be done locally, only on the machine that started the job, for the whole 
lifetime of this user interaction. Does Spark allow creating such long-running 
tasks dynamically, so that when another (web) user starts a new task a new 
long-running task is created and run on the same node, which eventually ends 
and triggers the next task? Also, is it possible to identify a running task, so 
that a long-running task can be bound to a session (db handler working on local 
db updates, until the task is done), and eventually restarted / recreated on failure?

~Ben


Re: Is Spark the right tool for me?

2014-12-01 Thread andy petrella
Not quite sure which geo processing you're doing: is it raster or vector? More
info would be appreciated so I can help you further.

Meanwhile I can try to give some hints. For instance, did you consider
GeoMesa (http://www.geomesa.org/2014/08/05/spark/)?
Since you need a WMS (or the like), did you consider GeoTrellis
(http://geotrellis.io/)? (Go to the batch processing part.)

When you say SQLite, do you mean that you're using SpatiaLite? Or is your db
not a geo one, and it's plain SQLite? In case you need an R-tree (or
related) index, your headaches will come from congestion within your
database transactions... unless you go to a dedicated database like Vertica
(just mentioning it).

kr,
andy



On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin 
benjamin.sta...@heidelberg-mobil.com wrote:




ensuring RDD indices remain immutable

2014-12-01 Thread rok
I have an RDD that serves as a feature look-up table downstream in my
analysis. I create it using zipWithIndex(), and because the elements of the
RDD could end up in a different order if it is regenerated at any point, I
cache it to try to ensure that the (feature -> index) mapping remains fixed.

However, I'm having trouble verifying that this is actually robust -- can
someone comment on whether such a mapping should be stable, or is there
another preferred method? zipWithUniqueId() isn't optimal since the max ID
generated that way is always greater than the number of features, so I'm
trying to avoid it.
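
For concreteness, this is roughly what I'm doing (a sketch; the feature source is
just illustrative, and the sortBy is my attempt to make the index assignment
deterministic across regenerations, assuming the features are strings):

import org.apache.spark.rdd.RDD

// hypothetical feature source
val features: RDD[String] = sc.textFile("features.txt").distinct()

// Sorting gives a deterministic order before indices are assigned; caching avoids
// recomputation (and hence reassignment) as long as the cached partitions survive.
val featureIndex: RDD[(String, Long)] = features.sortBy(identity).zipWithIndex().cache()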









Re: Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
Thank you for mentioning GeoTrellis. I hadn’t heard of it before. We have 
many custom tools and steps; I’ll check whether our tools fit in. The end result 
is actually a 3D map for native OpenGL-based rendering on iOS / Android [1].

I’m using GeoPackage, which is basically SQLite with an R-tree and a small library 
around it (more lightweight than SpatiaLite). I want to avoid accessing the 
SQLite db from any other machine or task; that’s where I thought I could use a 
long-running task which is the only process responsible for updating a locally 
stored SQLite db file. As you also said, SQLite (or almost any other file-based 
db) won’t work well over the network. This isn’t limited to the R-tree but is an 
expected limitation because of file locking issues, as also documented by SQLite.

I also thought of doing the same thing when rendering the (web) maps: in 
combination with the db handler which does the actual changes, run a map server 
instance on each node and configure it to add the database location as a map 
source once the task starts.

Cheers
Ben

[1] http://www.deep-map.com

From: andy petrella andy.petre...@gmail.com
Date: Monday, 1 December 2014 15:07
To: Benjamin Stadin benjamin.sta...@heidelberg-mobil.com,
 user@spark.apache.org
Subject: Re: Is Spark the right tool for me?


Re: Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
… Sorry, I forgot to mention why I’m basically bound to SQLite. The workflow 
involves more data processing steps than I mentioned. There are several tools in the 
chain which either rely on SQLite as an exchange format, or perform processing such 
as data cleaning that is done orders of magnitude faster / with fewer resources 
than with a heavyweight db for these specialized (and temporary) tasks.

From: andy petrella andy.petre...@gmail.com
Date: Monday, 1 December 2014 15:07
To: Benjamin Stadin benjamin.sta...@heidelberg-mobil.com,
 user@spark.apache.org
Subject: Re: Is Spark the right tool for me?



How take top N of top M from RDD as RDD

2014-12-01 Thread Xuefeng Wu
Hi, I have a problem, it is easy in Scala code, but I can not take the top
N from RDD as RDD.


There are many StudentScore records; take the top 10 ages, and then take the top 10
from each age, so the result is 100 records.

The Scala code is here, but how can I do the same with RDDs?  *RDD.take returns
an Array, not another RDD.*

example Scala code:

import scala.util.Random

case class StudentScore(age: Int, num: Int, score: Int, name: Int)

val scores = for {
  i <- 1 to 1
} yield {
  StudentScore(Random.nextInt(100), Random.nextInt(100),
    Random.nextInt(), Random.nextInt())
}

def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int):
    Seq[(Int, Seq[StudentScore])] = {
  val groupedScore = scores.groupBy(byKey)
    .map { case (_, _scores) =>
      (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)
    }.toSeq
  groupedScore.sortBy(_._1).take(10)
}

val topScores = for {
  (_, ageScores) <- takeTop(scores, _.age)
  (_, numScores) <- takeTop(ageScores, _.num)
} yield {
  numScores
}

topScores.size


-- 

~Yours, Xuefeng Wu/吴雪峰  敬上


Re: Is Spark the right tool for me?

2014-12-01 Thread andy petrella
Indeed. However, I guess the important load and stress is in the processing
of the 3D data (DEM or alike) into geometries/shades/whatever.
Hence you can use Spark (GeoTrellis can be tricky for 3D, poke @lossyrob
for more info) to perform these operations, then keep an RDD of only the
resulting geometries.
Those geometries probably won't be that heavy, hence it might be possible to
coalesce(1, true) to have the whole thing on one node (or, if your driver is
more beefy, do a collect/foreach) to create the index.
You could also create a GeoJSON of the geometries and create the r-tree on
it (not sure about this one).
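To make that concrete, a very rough sketch of the shape being suggested; processFile, writeSqlite and the geometry type are placeholders for your own tooling, not real APIs:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext

def buildGeometryDb[G: ClassTag](sc: SparkContext,
                                 fileUrls: Seq[String],
                                 processFile: String => Seq[G],
                                 writeSqlite: Iterator[G] => Unit): Unit = {
  sc.parallelize(fileUrls, fileUrls.size)   // one partition per input file
    .flatMap(processFile)                   // heavy per-file conversion, done in parallel
    .coalesce(1, shuffle = true)            // gather the (much smaller) geometries on one node
    .foreachPartition(writeSqlite)          // build the single SQLite/GeoPackage file there
}

The coalesce(1, shuffle = true) pulls the already reduced geometry set onto a single partition, so the SQLite write and r-tree build only ever happen on one machine.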



On Mon Dec 01 2014 at 3:38:00 PM Stadin, Benjamin 
benjamin.sta...@heidelberg-mobil.com wrote:

 Thank you for mentioning GeoTrellis. I haven’t heard of this before. We
 have many custom tools and steps, I’ll check our tools fit in. The end
 result after is actually a 3D map for native OpenGL based rendering on iOS
 / Android [1].

 I’m using GeoPackage which is basically SQLite with R-Tree and a small
 library around it (more lightweight than SpatialLite). I want to avoid
 accessing the SQLite db from any other machine or task, that’s where I
 thought I can use a long running task which is the only process responsible
 to update a local-only stored SQLite db file. As you also said SQLite  (or
 mostly any other file based db) won’t work well over network. This isn’t
 only limited to R-Tree but expected limitation because of file locking
 issues as documented also by SQLite.

 I also thought to do the same thing when rendering the (web) maps. In
 combination with the db handler which does the actual changes, I thought to
 run a map server instance on each node, configure it to add the database
 location as map source once the task starts.

 Cheers
 Ben

 [1] http://www.deep-map.com

 Von: andy petrella andy.petre...@gmail.com
 Datum: Montag, 1. Dezember 2014 15:07
 An: Benjamin Stadin benjamin.sta...@heidelberg-mobil.com, 
 user@spark.apache.org user@spark.apache.org
 Betreff: Re: Is Spark the right tool for me?

 Not quite sure which geo processing you're doing are they raster, vector? More
 info will be appreciated for me to help you further.

 Meanwhile I can try to give some hints, for instance, did you considered
 GeoMesa http://www.geomesa.org/2014/08/05/spark/?
 Since you need a WMS (or alike), did you considered GeoTrellis
 http://geotrellis.io/ (go to the batch processing)?

 When you say SQLite, you mean that you're using Spatialite? Or your db is
 not a geo one, and it's simple SQLite. In case you need an r-tree (or
 related) index, you're headaches will come from congestion within your
 database transaction... unless you go to a dedicated database like Vertica
 (just mentioning)

 kr,
 andy



 On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin 
 benjamin.sta...@heidelberg-mobil.com wrote:

 Hi all,

 I need some advise whether Spark is the right tool for my zoo. My
 requirements share commonalities with „big data“, workflow coordination and
 „reactive“ event driven data processing (as in for example Haskell Arrows),
 which doesn’t make it any easier to decide on a tool set.

 NB: I have asked a similar question on the Storm mailing list, but have
 been deferred to Spark. I previously thought Storm was closer to my needs –
 but maybe neither is.

 To explain my needs it’s probably best to give an example scenario:

- A user uploads small files (typically 1-200 files, file size
typically 2-10MB per file)
- Files should be converted in parallel and on available nodes. The
conversion is actually done via native tools, so there is not so much big
data processing required, but dynamic parallelization (so for example to
split the conversion step into as many conversion tasks as files are
available). The conversion typically takes between several minutes and a
few hours.
- The converted files gathered and are stored in a single database
(containing geometries for rendering)
- Once the db is ready, a web map server is (re-)configured and the
user can make small updates to the data set via a web UI.
- … Some other data processing steps which I leave away for brevity …
- There will be initially only a few concurrent users, but the system
shall be able to scale if needed

 My current thoughts:

- I should avoid to upload files into the distributed storage during
conversion, but probably should rather have each conversion filter 
 download
the file it is actually converting from a shared place. Other wise it’s 
 bad
for scalability reasons (too many redundant copies of same temporary files
if there are many concurrent users and many cluster nodes).
- Apache Oozie seems an option to chain together my pipes into a
workflow. But is it a good fit with Spark? What options do I have with
Spark to chain a workflow from pipes?
- Apache Crunch seems to make it easy to dynamically parallelize
tasks (Oozie itself 

Re: ensuring RDD indices remain immutable

2014-12-01 Thread Sean Owen
I think the robust thing to do is sort the RDD, and then zipWithIndex.
Even if the RDD is recomputed, the ordering and thus assignment of IDs
should be the same.
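A minimal sketch of that approach (featureRDD is a placeholder name and String just an example element type, not from the original post):

import org.apache.spark.rdd.RDD

// deterministic order first, then stable 0-based ids; cache so the mapping
// is not recomputed (and cannot drift) if the lineage is re-evaluated
def indexFeatures(featureRDD: RDD[String]): RDD[(String, Long)] =
  featureRDD.sortBy(identity).zipWithIndex().cache()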

On Mon, Dec 1, 2014 at 2:36 PM, rok rokros...@gmail.com wrote:
 I have an RDD that serves as a feature look-up table downstream in my
 analysis. I create it using the zipWithIndex() and because I suppose that
 the elements of the RDD could end up in a different order if it is
 regenerated at any point, I cache it to try and ensure that the (feature --
 index) mapping remains fixed.

 However, I'm having trouble verifying that this is actually robust -- can
 someone comment whether using such a mapping should be stable or is there
 another preferred method? zipWithUniqueID() isn't optimal since max ID
 generated this way is always greater than the number of features so I'm
 trying to avoid it.






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Time based aggregation in Real time Spark Streaming

2014-12-01 Thread pankaj
Hi,

My incoming message has time stamp as one field and i have to perform
aggregation over 3 minute of time slice.

Message sample

Item ID  Item Type  timeStamp
1        X          1-12-2014:12:01
1        X          1-12-2014:12:02
1        X          1-12-2014:12:03
1        y          1-12-2014:12:04
1        y          1-12-2014:12:05
1        y          1-12-2014:12:06

Aggregation Result
ItemId  ItemType  count  aggregationStartTime  aggrEndTime
1       X         3      1-12-2014:12:01       1-12-2014:12:03
1       y         3      1-12-2014:12:04       1-12-2014:12:06

What is the best way to perform time based aggregation in spark.
Kindly suggest.

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Time-based-aggregation-in-Real-time-Spark-Streaming-tp20102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Problem creating EC2 cluster using spark-ec2

2014-12-01 Thread Dave Challis
I've been trying to create a Spark cluster on EC2 using the
documentation at https://spark.apache.org/docs/latest/ec2-scripts.html
(with Spark 1.1.1).

Running the script successfully creates some EC2 instances, HDFS etc.,
but appears to fail to copy the actual files needed to run Spark
across.

I ran the following commands:

$ cd ~/src/spark-1.1.1/ec2
$ ./spark-ec2 --key-pair=* --identity-file=* --slaves=1
--region=eu-west-1 --zone=eu-west-1a --instance-type=m3.medium
--no-ganglia launch foocluster

I see the following in the script's output:

(instance and HDFS set up happens here)
...
Persistent HDFS installed, won't start by default...
~/spark-ec2 ~/spark-ec2
Setting up spark-standalone
RSYNC'ing /root/spark/conf to slaves...
*.eu-west-1.compute.amazonaws.com
RSYNC'ing /root/spark-ec2 to slaves...
*.eu-west-1.compute.amazonaws.com
./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No
such file or directory
./spark-standalone/setup.sh: line 27:
/root/spark/sbin/start-master.sh: No such file or directory
./spark-standalone/setup.sh: line 33:
/root/spark/sbin/start-slaves.sh: No such file or directory
Setting up tachyon
RSYNC'ing /root/tachyon to slaves...
...
(Tachyon setup happens here without any problem)

I can ssh to the master (using the ./spark-ec2 login), and looking in
/root/, it contains:

$ ls /root
ephemeral-hdfs  hadoop-native  mapreduce  persistent-hdfs  scala
shark  spark  spark-ec2  tachyon

If I look in /root/spark (where the sbin directory should be found),
it only contains a single 'conf' directory:

$ ls /root/spark
conf

Any idea why spark-ec2 might have failed to copy these files across?

Thanks,
Dave

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ensuring RDD indices remain immutable

2014-12-01 Thread rok
true though I was hoping to avoid having to sort... maybe there's no way
around it. Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094p20104.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL 1.0.0 - RDD from snappy compress avro file

2014-12-01 Thread cjdc
btw the same error from above also happens on 1.1.0 (just tested)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-0-0-RDD-from-snappy-compress-avro-file-tp19998p20106.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How take top N of top M from RDD as RDD

2014-12-01 Thread Ritesh Kumar Singh
For converting an Array or any List to a RDD, we can try using :

sc.parallelize(groupedScore)//or whatever the name of the list
variable is

On Mon, Dec 1, 2014 at 8:14 PM, Xuefeng Wu ben...@gmail.com wrote:

 Hi, I have a problem, it is easy in Scala code, but I can not take the top
 N from RDD as RDD.


 There are 1 Student Score, ask take top 10 age, and then take top 10
 from each age, the result is 100 records.

 The Scala code is here, but how can I do it in RDD,  *for RDD.take return
 is Array, but other RDD.*

 example Scala code:

 import scala.util.Random

 case class StudentScore(age: Int, num: Int, score: Int, name: Int)

 val scores = for {
   i <- 1 to 1
 } yield {
   StudentScore(Random.nextInt(100), Random.nextInt(100), Random.nextInt(),
 Random.nextInt())
 }


 def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int): Seq[(Int,
 Seq[StudentScore])] = {
   val groupedScore = scores.groupBy(byKey)
    .map{case (_, _scores) =>
 (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)}.toSeq
   groupedScore.sortBy(_._1).take(10)
 }

 val topScores = for {
   (_, ageScores) <- takeTop(scores, _.age)
   (_, numScores) <- takeTop(ageScores, _.num)
 } yield {
   numScores
 }

 topScores.size


 --

 ~Yours, Xuefeng Wu/吴雪峰  敬上




Re: Spark Job submit

2014-12-01 Thread Matt Narrell
Or setting the HADOOP_CONF_DIR property.  Either way, you must have the YARN 
configuration available to the submitting application to allow for the use of 
“yarn-client” or “yarn-cluster”

The attached stack trace below doesn’t provide any information as to why the 
job failed.

mn

 On Nov 27, 2014, at 12:14 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Try to add your cluster's core-site.xml, yarn-site.xml, and hdfs-site.xml to 
 the CLASSPATH (and on SPARK_CLASSPATH) and submit the job.
 
 Thanks
 Best Regards
 
 On Thu, Nov 27, 2014 at 12:24 PM, Naveen Kumar Pokala 
 npok...@spcapitaliq.com mailto:npok...@spcapitaliq.com wrote:
 Code is in my windows machine and cluster is in some other network in UNIX. 
 In this case how it will identify the cluster. In case of spark cluster we 
 can clearly specify the URL like spark://ip:port. But in case of hadoop how 
 to specify that.
 
  
 
 What I have done is copied the hadoop configuration files from network to 
 local and created dummy hadoop directory(in windows machine).
 
  
 
 Submitted from spark submit by adding above dummy files location with 
 HADOOP_CONF_DIR variable.  Attaching the error.
 
  
 
  
 
 image001.png
 
  
 
 Please suggest me how to proceed from the code and how to execute from spark 
 submit from windows machine.
 
  
 
 Please provide me sample code if you have any.
 
  
 
 -Naveen
 
  
 
 From: Akhil Das [mailto:ak...@sigmoidanalytics.com 
 mailto:ak...@sigmoidanalytics.com] 
 Sent: Wednesday, November 26, 2014 10:03 PM
 To: Naveen Kumar Pokala
 Cc: user@spark.apache.org mailto:user@spark.apache.org
 Subject: Re: Spark Job submit
 
  
 
 How about?
 
  
 
 - Create a SparkContext 
 
 - setMaster as yarn-cluster
 
 - Create a JavaSparkContext with the above SparkContext
 
  
 
 And that will submit it to the yarn cluster.
 
 
 
 Thanks
 
 Best Regards
 
  
 
 On Wed, Nov 26, 2014 at 4:20 PM, Naveen Kumar Pokala npok...@spcapitaliq.com 
 mailto:npok...@spcapitaliq.com wrote:
 
 Hi.
 
  
 
 Is there a way to submit spark job on Hadoop-YARN  cluster from java code.
 
  
 
 -Naveen
 
  
 
 



Re: Time based aggregation in Real time Spark Streaming

2014-12-01 Thread Bahubali Jain
Hi,
You can associate all the messages of a 3min interval with a unique key and
then group by and finally add up.
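A rough sketch of that idea (the Event fields and the millisecond timestamps are assumptions about the already-parsed input, not from the original mail); the same bucket key works on a DStream through its map/reduceByKey:

import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.1)
import org.apache.spark.rdd.RDD

case class Event(itemId: Int, itemType: String, tsMillis: Long)

// count events per (item, type, 3-minute window); the window end is start + 3 min
def aggregate3Min(events: RDD[Event]): RDD[((Int, String, Long), Long)] = {
  val windowMs = 3 * 60 * 1000L
  events
    .map(e => ((e.itemId, e.itemType, (e.tsMillis / windowMs) * windowMs), 1L))
    .reduceByKey(_ + _)
}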

Thanks
On Dec 1, 2014 9:02 PM, pankaj pankaje...@gmail.com wrote:

 Hi,

 My incoming message has time stamp as one field and i have to perform
 aggregation over 3 minute of time slice.

 Message sample

 Item ID Item Type timeStamp
 1  X   1-12-2014:12:01
 1  X   1-12-2014:12:02
 1  X   1-12-2014:12:03
 1  y   1-12-2014:12:04
 1  y   1-12-2014:12:05
 1  y   1-12-2014:12:06

 Aggregation Result
 ItemIdItemType  count   aggregationStartTimeaggrEndTime
 1  X 3  1-12-2014:12:01
   1-12-2014:12:03
 1  y  3   1-12-2014:12:04
  1-12-2014:12:06

 What is the best way to perform time based aggregation in spark.
 Kindly suggest.

 Thanks

 --
 View this message in context: Time based aggregation in Real time Spark
 Streaming
 http://apache-spark-user-list.1001560.n3.nabble.com/Time-based-aggregation-in-Real-time-Spark-Streaming-tp20102.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Mllib native netlib-java/OpenBLAS

2014-12-01 Thread agg212
Thanks for your reply, but I'm still running into issues
installing/configuring the native libraries for MLlib.  Here are the steps
I've taken, please let me know if anything is incorrect.

- Download Spark source
- unzip and compile using `mvn -Pnetlib-lgpl -DskipTests clean package `
- Run `sbt/sbt publish-local`

The last step fails with the following error (full stack trace is attached
here:  error.txt
http://apache-spark-user-list.1001560.n3.nabble.com/file/n20110/error.txt 
):
[error] (sql/compile:compile) java.lang.AssertionError: assertion failed:
List(object package$DebugNode, object package$DebugNode)

Do I still have to install OPENBLAS/anything else if I build Spark from the
source using the -Pnetlib-lgpl flag?  Also, do I change the Spark version
(from 1.1.0 to 1.2.0-SNAPSHOT) in the .sbt file for my app?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-tp19662p20110.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Time based aggregation in Real time Spark Streaming

2014-12-01 Thread pankaj
Hi,

Suppose I keep a batch size of 3 minutes. In one batch there can be incoming
records with any time stamp, so it is difficult to keep track of when the
3 minute interval started and ended. I am doing the output operation on the
worker nodes in foreachPartition, not in the driver (foreachRDD), so I cannot
use a shared variable to store the start/end time, because shared variables
like accumulators are write-only within tasks.

Is there any solution for this?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Time-based-aggregation-in-Real-time-Spark-Streaming-tp20102p20111.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



packaging from source gives protobuf compatibility issues.

2014-12-01 Thread akhandeshi
scala> textFile.count()
java.lang.VerifyError: class
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CompleteReques
tProto overrides final method
getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;

I tried ./make-distribution.sh -Dhadoop.version=2.5.0 and
/usr/local/apache-maven-3.2.3/bin/mvn -Dhadoop.version=2.5.0 -DskipTests
clean package  both are giving same errors.  I am connecting to HDFS hosted
on hadoop version 2.5.0.

I will appreciate any help anyone can provide!
Thanks,

Ami
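For what it’s worth, this VerifyError usually points at a protobuf 2.4.x vs 2.5.x mismatch; when building Spark 1.x against Hadoop 2.5.x the hadoop-2.4 profile (which bumps the protobuf version) normally has to be enabled in addition to -Dhadoop.version, e.g. something along the lines of:

./make-distribution.sh -Phadoop-2.4 -Dhadoop.version=2.5.0
or
mvn -Phadoop-2.4 -Dhadoop.version=2.5.0 -DskipTests clean package

(treat the exact profile name as something to double-check against the building-with-maven docs for your branch).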



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/packaging-from-source-gives-protobuf-compatibility-issues-tp20112.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How take top N of top M from RDD as RDD

2014-12-01 Thread Debasish Das
rdd.top collects the result on the master...

If you want top-k per key, run map / mapPartitions and use a bounded
priority queue, and then reduceByKey the queues.

I experimented with top-k from Algebird and with a bounded priority queue wrapped
over a java.util.PriorityQueue (the Spark default)... the bounded priority queue is faster

Code example is here:

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3066
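For reference, a rough sketch of that per-key top-k idea using plain Scala collections (k, the score-descending ordering, and keying by age are assumptions; a real bounded priority queue as in the JIRA above would be more efficient than the sort-and-take used here for brevity):

import org.apache.spark.SparkContext._   // PairRDDFunctions implicits (Spark 1.1)
import org.apache.spark.rdd.RDD

// keep only the k highest-scoring records per key, without collecting to the driver
def topKPerKey(byAge: RDD[(Int, StudentScore)], k: Int): RDD[(Int, List[StudentScore])] =
  byAge.aggregateByKey(List.empty[StudentScore])(
    (acc, s)     => (s :: acc).sortBy(-_.score).take(k),    // bound each partial list at k
    (accA, accB) => (accA ++ accB).sortBy(-_.score).take(k) // merge partitions, still bounded at k
  )

// usage sketch, given scoresRDD: RDD[StudentScore]
// val top10PerAge = topKPerKey(scoresRDD.map(s => (s.age, s)), 10)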
On Dec 1, 2014 6:46 AM, Xuefeng Wu ben...@gmail.com wrote:

 Hi, I have a problem, it is easy in Scala code, but I can not take the top
 N from RDD as RDD.


 There are 1 Student Score, ask take top 10 age, and then take top 10
 from each age, the result is 100 records.

 The Scala code is here, but how can I do it in RDD,  *for RDD.take return
 is Array, but other RDD.*

 example Scala code:

 import scala.util.Random

 case class StudentScore(age: Int, num: Int, score: Int, name: Int)

 val scores = for {
   i <- 1 to 1
 } yield {
   StudentScore(Random.nextInt(100), Random.nextInt(100), Random.nextInt(),
 Random.nextInt())
 }


 def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int): Seq[(Int,
 Seq[StudentScore])] = {
   val groupedScore = scores.groupBy(byKey)
    .map{case (_, _scores) =>
 (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)}.toSeq
   groupedScore.sortBy(_._1).take(10)
 }

 val topScores = for {
   (_, ageScores) <- takeTop(scores, _.age)
   (_, numScores) <- takeTop(ageScores, _.num)
 } yield {
   numScores
 }

 topScores.size


 --

 ~Yours, Xuefeng Wu/吴雪峰  敬上




RE: Unable to compile spark 1.1.0 on windows 8.1

2014-12-01 Thread Judy Nash
Have you checked out the wiki here? 
http://spark.apache.org/docs/latest/building-with-maven.html

A couple things I did differently from you:
1) I got the bits directly from github (https://github.com/apache/spark/). Use 
branch 1.1 for spark 1.1
2) execute maven command on cmd (powershell misses libraries sometimes) 
3) Increase maven memory per suggested by building with maven wiki

Hope this helps. 

-Original Message-
From: Ishwardeep Singh [mailto:ishwardeep.si...@impetus.co.in] 
Sent: Monday, December 1, 2014 1:50 AM
To: u...@spark.incubator.apache.org
Subject: RE: Unable to compile spark 1.1.0 on windows 8.1

Hi Judy,

Thank you for your response.

When I try to compile using maven mvn -Dhadoop.version=1.2.1 -DskipTests clean 
package I get an error Error: Could not find or load main class . 
I have maven 3.0.4.

And when I run command sbt package I get the same exception as earlier.

I have done the following steps:

1. Download spark-1.1.0.tgz from the spark site and unzip the compressed zip to 
a folder d:\myworkplace\software\spark-1.1.0
2. Then I downloaded sbt-0.13.7.zip and extract it to folder 
d:\myworkplace\software\sbt
3. Update the PATH environment variable to include 
d:\myworkplace\software\sbt\bin in the PATH.
4. Navigate to spark folder d:\myworkplace\software\spark-1.1.0
5. Run the command sbt assembly
6. As a side effect of this command a number of libraries are downloaded and I 
get an initial error that path 
C:\Users\ishwardeep.singh\.sbt\0.13\staging\ec3aa8f39111944cc5f2\sbt-pom-reader
does not exist. 
7. I manually create this subfolder ec3aa8f39111944cc5f2\sbt-pom-reader
and retry to get the next error as described in my initial error.

Is this the correct procedure to compile spark 1.1.0? Please let me know.

Hoping to hear from you soon.

Regards,
ishwardeep



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-compile-spark-1-1-0-on-windows-8-1-tp19996p20075.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to Integrate openNLP with Spark

2014-12-01 Thread Nikhil
Hi,

I am using openNLP NER (Token Name Finder) for parsing unstructured
data. In order to speed up my process (to quickly train models and analyze
documents against those models), I want to use Spark, and I saw on the web
that it is possible to connect openNLP with Spark using UIMAFit, but I am not
sure how to do so. Though Philip Ogren gave a very nice presentation at the
Spark Summit, I am still confused.

Can someone please provide me end to end example on this. I am new in Spark
and UIMAFit, recently started working on it. 

Thanks

Nikhil Jain



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Integrate-openNLP-with-Spark-tp20117.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Creating a SchemaRDD from an existing API

2014-12-01 Thread Michael Armbrust
No, it should support any data source that has a schema and can produce
rows.
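To give a feel for the shape of the 1.2 data sources API (inferred from the test cases and the spark-avro example linked below; treat the exact package paths and signatures as approximate, this is a sketch rather than a checked implementation):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources._

// registered via a provider, e.g. CREATE TEMPORARY TABLE t USING my.pkg OPTIONS (...)
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    MyRelation(parameters("endpoint"))(sqlContext)
}

case class MyRelation(endpoint: String)(@transient val sqlContext: SQLContext)
    extends TableScan {
  // describe the columns your existing API exposes
  override def schema: StructType =
    StructType(StructField("id", IntegerType, nullable = false) ::
               StructField("value", StringType, nullable = true) :: Nil)

  // produce the rows, e.g. by paging through the external API (dummy row here)
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row(1, "example")))
}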

On Mon, Dec 1, 2014 at 1:34 AM, Niranda Perera nira...@wso2.com wrote:

 Hi Michael,

 About this new data source API, what type of data sources would it
 support? Does it have to be RDBMS necessarily?

 Cheers

 On Sat, Nov 29, 2014 at 12:57 AM, Michael Armbrust mich...@databricks.com
  wrote:

 You probably don't need to create a new kind of SchemaRDD.  Instead I'd
 suggest taking a look at the data sources API that we are adding in Spark
 1.2.  There is not a ton of documentation, but the test cases show how
 to implement the various interfaces
 https://github.com/apache/spark/tree/master/sql/core/src/test/scala/org/apache/spark/sql/sources,
 and there is an example library for reading Avro data
 https://github.com/databricks/spark-avro.

 On Thu, Nov 27, 2014 at 10:31 PM, Niranda Perera nira...@wso2.com
 wrote:

 Hi,

 I am evaluating Spark for an analytic component where we do batch
 processing of data using SQL.

 So, I am particularly interested in Spark SQL and in creating a SchemaRDD
 from an existing API [1].

 This API exposes elements in a database as datasources. Using the methods
 allowed by this data source, we can access and edit data.

 So, I want to create a custom SchemaRDD using the methods and provisions
 of
 this API. I tried going through Spark documentation and the Java Docs,
 but
 unfortunately, I was unable to come to a final conclusion if this was
 actually possible.

 I would like to ask the Spark Devs,
 1. As of the current Spark release, can we make a custom SchemaRDD?
 2. What is the extension point to a custom SchemaRDD? or are there
 particular interfaces?
 3. Could you please point me the specific docs regarding this matter?

 Your help in this regard is highly appreciated.

 Cheers

 [1]

 https://github.com/wso2-dev/carbon-analytics/tree/master/components/xanalytics

 --
 *Niranda Perera*
 Software Engineer, WSO2 Inc.
 Mobile: +94-71-554-8430
 Twitter: @n1r44 https://twitter.com/N1R44





 --
 *Niranda Perera*
 Software Engineer, WSO2 Inc.
 Mobile: +94-71-554-8430
 Twitter: @n1r44 https://twitter.com/N1R44



Minimum cluster size for empirical testing

2014-12-01 Thread Valdes, Pablo
Hi everyone,

I’m interested in empirically measuring how much faster Spark works in comparison to 
Hadoop for certain problems and input corpora I currently work with (I’ve read 
Matei Zaharia’s “Resilient Distributed Datasets: A Fault-Tolerant Abstraction 
for In-Memory Cluster Computing” paper and I want to perform a similar test). I 
personally think measuring the difference in speed on a single 1-node cluster 
isn’t enough, so I was wondering what you would recommend for this task, in 
terms of the number of nodes/specs, etc.
I was thinking it could be possible to launch a couple of CDH5 VMs across a few 
computers, or do you think it would be easier to do it with Amazon EC2?

I’m particularly interested in knowing what is the overall experience in this 
case and what are your recommendations (what other common problems to test and 
what kind of benchmarks)

Have a great start of the week.
Cheers



Pablo Valdes Software Engineer | comScore, Inc. (NASDAQ:SCOR)

pval...@comscore.commailto:pval...@comscore.com



Av. Del Cóndor N° 520, oficina 202, Ciudad Empresarial, Comuna de Huechuraba, | 
Santiago | CL

...

comScore is a global leader in digital media analytics. We make audiences and 
advertising more valuable. To learn more, visit 
www.comscore.comhttp://www.comscore.com




Re: How to use FlumeInputDStream in spark cluster?

2014-12-01 Thread Ping Tang
Thank you very much for your reply.

I have a cluster of 8 nodes: m1, m2, m3 .. m8. m1 is configured as the Spark master 
node, and the rest of the nodes are all worker nodes. I also configured m3 as the 
History Server, but the history server fails to start. I ran FlumeEventCount on 
m1 using the right hostname and a port that is not used by any application. 
Here is the script I used to run FlumeEventCount:


#!/bin/bash


spark-submit --verbose --class 
org.apache.spark.examples.streaming.FlumeEventCount --deploy-mode client 
--master yarn-client --jars 
lib/spark-streaming-flume_2.10-1.1.0-cdh5.2.2-20141112.193826-1.jar 
/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/examples/lib/spark-examples-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar
 m1.ptang.aerohive.com 1234


Same issue observed after added  spark.ui.port=4321 in  
/etc/spark/conf/spark-defaults.conf. Followings are the exceptions from the job 
run:


14/12/01 11:51:45 INFO JobScheduler: Finished job streaming job 1417463504000 
ms.0 from job set of time 1417463504000 ms

14/12/01 11:51:45 INFO JobScheduler: Total delay: 1.465 s for time 
1417463504000 ms (execution: 1.415 s)

14/12/01 11:51:45 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
Error starting receiver 0 - org.jboss.netty.channel.ChannelException: Failed to 
bind to: m1.ptang.aerohive.com/192.168.10.22:1234

at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)

at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)

at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)

at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)

at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)

at 
org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)

at 
org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)

at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)

at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)

at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)

at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)

at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)

at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)

at org.apache.spark.scheduler.Task.run(Task.scala:54)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.BindException: Cannot assign requested address

at sun.nio.ch.Net.bind0(Native Method)

at sun.nio.ch.Net.bind(Net.java:444)

at sun.nio.ch.Net.bind(Net.java:436)

at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)

at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)

at 
org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)

at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)

at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)

at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)

... 3 more


14/12/01 11:51:45 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 72, 
m8.ptang.aerohive.com): org.jboss.netty.channel.ChannelException: Failed to 
bind to: m1.ptang.aerohive.com/192.168.10.22:1234

org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)

org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)

org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)

org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)

org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)


org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)


org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)


org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)


org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)


org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)


org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)


org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)


org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)


Remove added jar from spark context

2014-12-01 Thread ankits
Hi,

Is there a way to REMOVE a jar (added via addJar) from a Spark context? We have
a long running context used by the spark jobserver, but after trying to
update versions of classes already on the class path via addJars, the
context still runs the old versions. It would be helpful if I could remove
the old jar from the context when adding the new one, to prevent running
stale code.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Remove-added-jar-from-spark-context-tp20121.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD

2014-12-01 Thread Bui, Tri
Thanks Yanbo!  That works!

The only issue is that it won’t print the predicted value from lp.features, 
from code line below.

model.predictOnValues(testData.map(lp = (lp.label, lp.features))).print()

It prints the test input data correctly, but it keeps on printing “0.0” as the 
predicted values, which is the lp.features.

Thanks
Tri

From: Yanbo Liang [mailto:yanboha...@gmail.com]
Sent: Thursday, November 27, 2014 12:22 AM
To: Bui, Tri
Cc: user@spark.apache.org
Subject: Re: Inaccurate Estimate of weights model from 
StreamingLinearRegressionWithSGD

Hi Tri,

Maybe my latest responds for your problem is lost, whatever, the following code 
snippet can run correctly.

val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))

model.algorithm.setIntercept(true)

Because that all setXXX() function in StreamingLinearRegressionWithSGD will 
return this.type which is an instance of itself,
so we need set other configuration in a separate line w/o return value.

2014-11-27 1:04 GMT+08:00 Bui, Tri 
tri@verizonwireless.com.invalidmailto:tri@verizonwireless.com.invalid:
Thanks Yanbo!

Modified code below:

val conf = new 
SparkConf().setMaster(local[2]).setAppName(StreamingLinearRegression)
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)).setNumIterations(args(4).toInt).setStepSize(.0001).algorithm.setIntercept(true)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp = (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()

But I am getting compile error:
[error] 
/data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:54:
 value trainOn is not a member
of org.apache.spark.mllib.regression.LinearRegressionWithSGD
[error] model.trainOn(trainingData)
[error]   ^
[error] 
/data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:55:
 value predictOnValues is not a
member of org.apache.spark.mllib.regression.LinearRegressionWithSGD
[error] model.predictOnValues(testData.map(lp = (lp.label, 
lp.features))).print()
[error]   ^
[error] two errors found
[error] (compile:compile) Compilation failed

Thanks
Tri

From: Yanbo Liang [mailto:yanboha...@gmail.commailto:yanboha...@gmail.com]
Sent: Tuesday, November 25, 2014 8:57 PM
To: Bui, Tri
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Inaccurate Estimate of weights model from 
StreamingLinearRegressionWithSGD

Hi Tri,

setIntercept() is not a member function of StreamingLinearRegressionWithSGD, 
it's a member function of LinearRegressionWithSGD(GeneralizedLinearAlgorithm) 
which is a member variable(named algorithm) of StreamingLinearRegressionWithSGD.

So you need to change your code to:
val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))
.algorithm.setIntercept(true)

Thanks
Yanbo


2014-11-25 23:51 GMT+08:00 Bui, Tri 
tri@verizonwireless.com.invalidmailto:tri@verizonwireless.com.invalid:
Thanks Liang!

It was my bad, I fat finger one of the data point, correct it and the result 
match with yours.

I am still not able to get the intercept.  I am getting   [error] 
/data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:47:
 value setIntercept is not a member of
org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

I try code below:
val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))
model.setIntercept(addIntercept = true).trainOn(trainingData)

and:

val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))
.setIntercept(true)

But still get compilation error.

Thanks
Tri




From: Yanbo Liang [mailto:yanboha...@gmail.commailto:yanboha...@gmail.com]
Sent: Tuesday, November 25, 2014 4:08 AM
To: Bui, Tri
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Inaccurate Estimate of weights model from 
StreamingLinearRegressionWithSGD

The case run correctly in my environment.

14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Model 
updated at time 141690890 ms
14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Current 
model: weights, [0.8588]

Can you provide more detail information if it is convenience?

Turn on the intercept value can be set as following:
val model = new StreamingLinearRegressionWithSGD()
  .algorithm.setIntercept(true)

2014-11-25 3:31 GMT+08:00 Bui, Tri 
tri@verizonwireless.com.invalidmailto:tri@verizonwireless.com.invalid:
Hi,

I am getting incorrect weights model from StreamingLinearRegressionwith SGD.

One feature Input data is:

(1,[1])
(2,[2])
…
.
(20,[20])

The 

StreamingLinearRegressionWithSGD

2014-12-01 Thread Joanne Contact
Hi Gurus,

I did not look at the code yet. I wonder if StreamingLinearRegressionWithSGD
http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.html

is equivalent to
LinearRegressionWithSGD
http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/LinearRegressionWithSGD.html
with starting weights of the current batch as the ending weights of the last
batch?

Since RidgeRegressionModel
http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/RidgeRegressionModel.html
does
not seem to have a streaming version, just wonder if this way will suffice.


Thanks!

J
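Not an authoritative answer, but the warm start being described can also be written out by hand on a DStream, which is one route to a ridge-style streaming regression as well (RidgeRegressionWithSGD appears to have a similar train(...) overload taking initial weights). A rough sketch, with numFeatures/numIterations/stepSize as placeholders:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.streaming.dstream.DStream

def trainOn(stream: DStream[LabeledPoint], numFeatures: Int,
            numIterations: Int, stepSize: Double): Unit = {
  var weights = Vectors.zeros(numFeatures)
  stream.foreachRDD { rdd =>
    if (rdd.count() > 0) {
      // seed each batch's optimization with the previous batch's weights
      val model = LinearRegressionWithSGD.train(rdd, numIterations, stepSize, 1.0, weights)
      weights = model.weights
    }
  }
}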


Spark SQL table Join, one task is taking long

2014-12-01 Thread Venkat Subramanian
Environment: Spark 1.1, 4 Node Spark and Hadoop Dev cluster - 6 cores, 32 GB
Ram each. Default serialization, Standalone, no security

Data was sqooped from relational DB to HDFS and Data is partitioned across
HDFS uniformly. I am reading a  fact table about 8 GB in size and one small
dim table from HDFS and then doing a join on them based on a criteria. .
Running the Driver on Spark shell on Spark master.

ContactDetail and DAgents are read as RDD and registered as table already.
Each of these tables have 60 to 90 fields and I am using Product class.

val CDJoinQry= sqlContext.sql(SELECT  * FROM ContactDetail, DAgents  WHERE
ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902)

CDJoinQry.map(ta = ta(4)).count   // result is a small number

This works fine and returns the result. The Hadoop mapPartition reads and the
creation of the RDDs are all fine, but in the count stage I see that one of the
tasks (out of 200) does a huge amount of shuffle write (some 1 GB or more)
and takes about 1.1 seconds to complete out of the 1.2 seconds of total
execution time. This task usually sits around the 3/4 mark (say 160/200) of
the total tasks. While that task is running, one CPU on one
worker node goes to 100% for the duration of the task. The rest of the tasks
take a few ms and do only about 5 MB of shuffle write each. I have run it
repeatedly and this happens regardless of which worker node this particular
task is running on. I turned on Spark debug logging on all nodes, but
it was difficult to figure out from the logs where the delay comes from. There are
no errors or retries in the logs.

Not sure if I can post logs here for someone to look at, if so I can (about
10 Mb). Also, not sure if this normal in such a table join that one task
would take most amount of time. Let me know if you have any suggestions.

Regards,

Venkat
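Not sure it is the root cause, but with a small dim table and a single very frequent join key, that one long task is often just the reducer that receives every matching row. One workaround sketch (the column positions, f1 at index 0 and f6 at index 5, and the Int key type are assumptions): broadcast the filtered dim keys and avoid shuffling the large table at all.

// collect the (filtered, tiny) dim table keys on the driver and broadcast them
val dAgentKeys = sqlContext.sql("SELECT f1 FROM DAgents WHERE f1 = 902")
  .map(_.getInt(0)).collect().toSet
val bKeys = sc.broadcast(dAgentKeys)

// filter the big fact table without shuffling it, using the broadcast set
val matched = sqlContext.sql("SELECT * FROM ContactDetail")
  .filter(row => bKeys.value.contains(row.getInt(5)))  // f6 assumed at column index 5
matched.count()

This reproduces the count, not the full joined rows; for the full join you would carry the broadcast dim rows into the output as well.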




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: hdfs streaming context

2014-12-01 Thread Bui, Tri
Try 

(hdfs:///localhost:8020/user/data/*) 

With 3 /.

Thx
tri

-Original Message-
From: Benjamin Cuthbert [mailto:cuthbert@gmail.com] 
Sent: Monday, December 01, 2014 4:41 PM
To: user@spark.apache.org
Subject: hdfs streaming context

All,

Is it possible to stream on HDFS directory and listen for multiple files?

I have tried the following

val sparkConf = new SparkConf().setAppName(HdfsWordCount)
val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = 
ssc.textFileStream(hdfs://localhost:8020/user/data/*)
lines.filter(line = line.contains(GE))
lines.print()
ssc.start()

But I get

14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
1417469742000 ms
java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
exist.
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
at 
org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
at 
org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: hdfs streaming context

2014-12-01 Thread Andy Twigg
Have you tried just passing a path to ssc.textFileStream() ? It
monitors the path for new files by looking at mtime/atime ; all
new/touched files in the time window appear as an rdd in the dstream.

On 1 December 2014 at 14:41, Benjamin Cuthbert cuthbert@gmail.com wrote:
 All,

 Is it possible to stream on HDFS directory and listen for multiple files?

 I have tried the following

 val sparkConf = new SparkConf().setAppName(HdfsWordCount)
 val ssc = new StreamingContext(sparkConf, Seconds(2))
 val lines = ssc.textFileStream(hdfs://localhost:8020/user/data/*)
 lines.filter(line = line.contains(GE))
 lines.print()
 ssc.start()

 But I get

 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
 1417469742000 ms
 java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
 exist.
 at 
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
 at 
 org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
 at 
 org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: hdfs streaming context

2014-12-01 Thread Sean Owen
Yes, in fact, that's the only way it works. You need
hdfs://localhost:8020/user/data, I believe.

(No it's not correct to write hdfs:///...)

On Mon, Dec 1, 2014 at 10:41 PM, Benjamin Cuthbert
cuthbert@gmail.com wrote:
 All,

 Is it possible to stream on HDFS directory and listen for multiple files?

 I have tried the following

 val sparkConf = new SparkConf().setAppName(HdfsWordCount)
 val ssc = new StreamingContext(sparkConf, Seconds(2))
 val lines = ssc.textFileStream(hdfs://localhost:8020/user/data/*)
 lines.filter(line = line.contains(GE))
 lines.print()
 ssc.start()

 But I get

 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
 1417469742000 ms
 java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
 exist.
 at 
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
 at 
 org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
 at 
 org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: hdfs streaming context

2014-12-01 Thread Benjamin Cuthbert
Thanks Sean,

That worked just removing the /* and leaving it as /user/data

Seems to be streaming in.


 On 1 Dec 2014, at 22:50, Sean Owen so...@cloudera.com wrote:
 
 Yes, in fact, that's the only way it works. You need
 hdfs://localhost:8020/user/data, I believe.
 
 (No it's not correct to write hdfs:///...)
 
 On Mon, Dec 1, 2014 at 10:41 PM, Benjamin Cuthbert
 cuthbert@gmail.com wrote:
 All,
 
 Is it possible to stream on HDFS directory and listen for multiple files?
 
 I have tried the following
 
 val sparkConf = new SparkConf().setAppName(HdfsWordCount)
 val ssc = new StreamingContext(sparkConf, Seconds(2))
 val lines = ssc.textFileStream(hdfs://localhost:8020/user/data/*)
 lines.filter(line = line.contains(GE))
 lines.print()
 ssc.start()
 
 But I get
 
 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
 1417469742000 ms
 java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does 
 not exist.
at 
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
at 
 org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
at 
 org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: hdfs streaming context

2014-12-01 Thread Bui, Tri
For the streaming example I am working on, Its accepted (hdfs:///user/data) 
without the localhost info.  

Let me dig through my hdfs config.





-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Monday, December 01, 2014 4:50 PM
To: Benjamin Cuthbert
Cc: user@spark.apache.org
Subject: Re: hdfs streaming context

Yes, in fact, that's the only way it works. You need 
hdfs://localhost:8020/user/data, I believe.

(No it's not correct to write hdfs:///...)

On Mon, Dec 1, 2014 at 10:41 PM, Benjamin Cuthbert cuthbert@gmail.com 
wrote:
 All,

 Is it possible to stream on HDFS directory and listen for multiple files?

 I have tried the following

 val sparkConf = new SparkConf().setAppName(HdfsWordCount)
 val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = 
 ssc.textFileStream(hdfs://localhost:8020/user/data/*)
 lines.filter(line = line.contains(GE))
 lines.print()
 ssc.start()

 But I get

 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
 1417469742000 ms
 java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
 exist.
 at 
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
 at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
 at 
 org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
 at 
 org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputD
 Stream.scala:75)
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
 additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



Re: hdfs streaming context

2014-12-01 Thread Sean Owen
Yes but you can't follow three slashes with host:port. No host
probably defaults to whatever is found in your HDFS config.

On Mon, Dec 1, 2014 at 11:02 PM, Bui, Tri tri@verizonwireless.com wrote:
 For the streaming example I am working on, Its accepted (hdfs:///user/data) 
 without the localhost info.

 Let me dig through my hdfs config.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: hdfs streaming context

2014-12-01 Thread Bui, Tri
Yep. No localhost.

Usually, I use hdfs:///user/data to indicate I want HDFS, or file:///user/data 
to indicate a local file directory.



-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Monday, December 01, 2014 5:06 PM
To: Bui, Tri
Cc: Benjamin Cuthbert; user@spark.apache.org
Subject: Re: hdfs streaming context

Yes but you can't follow three slashes with host:port. No host probably 
defaults to whatever is found in your HDFS config.

On Mon, Dec 1, 2014 at 11:02 PM, Bui, Tri tri@verizonwireless.com wrote:
 For the streaming example I am working on, Its accepted (hdfs:///user/data) 
 without the localhost info.

 Let me dig through my hdfs config.


Re: How take top N of top M from RDD as RDD

2014-12-01 Thread Xuefeng Wu
hi Debasish,

I found test code in map translate, 
would it collect all products too?

+ val sortedProducts = products.toArray.sorted(ord.reverse)


Yours, Xuefeng Wu 吴雪峰 敬上

 On 2014年12月2日, at 上午1:33, Debasish Das debasish.da...@gmail.com wrote:
 
 rdd.top collects it on master...
 
 If you want topk for a key run map / mappartition and use a bounded priority 
 queue and reducebykey the queues.
 
 I experimented with topk from algebird and bounded priority queue wrapped 
 over jpriority queue ( spark default)...bpq is faster
 
 Code example is here:
 
 https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3066
 
 On Dec 1, 2014 6:46 AM, Xuefeng Wu ben...@gmail.com wrote:
 Hi, I have a problem, it is easy in Scala code, but I can not take the top N 
 from RDD as RDD.
 
 
 There are 1 Student Score, ask take top 10 age, and then take top 10 
 from each age, the result is 100 records.
  
 The Scala code is here, but how can I do it in RDD,  for RDD.take return is 
 Array, but other RDD.
 
 example Scala code:
 import scala.util.Random
 
 case class StudentScore(age: Int, num: Int, score: Int, name: Int)
 
 val scores = for {
   i <- 1 to 1
 } yield {
   StudentScore(Random.nextInt(100), Random.nextInt(100), Random.nextInt(),
 Random.nextInt())
 }
 
 
 def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int): 
 Seq[(Int, Seq[StudentScore])] = {
   val groupedScore = scores.groupBy(byKey)
    .map{case (_, _scores) =>
 (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)}.toSeq
   groupedScore.sortBy(_._1).take(10)
 }
 
 val topScores = for {
   (_, ageScores) <- takeTop(scores, _.age)
   (_, numScores) <- takeTop(ageScores, _.num)
 } yield {
   numScores
 }
 
 topScores.size
 
 -- 
 
 ~Yours, Xuefeng Wu/吴雪峰  敬上


Passing Java Options to Spark AM launching

2014-12-01 Thread Mohammad Islam
Hi,

How do I pass Java options (such as -XX:MaxMetaspaceSize=100M) when 
launching the AM or the task containers?
This is related to running Spark on YARN (Hadoop 2.3.0). In the MapReduce case, 
setting a property such as 
mapreduce.map.java.opts would do the job.
Any help would be highly appreciated.

Regards,
Mohammad



 

Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg

 file = tranform file into a bunch of records


What does this function do exactly? Does it load the file locally?
Spark supports RDDs exceeding global RAM (cf the terasort example), but if
your example just loads each file locally, then this may cause problems.
Instead, you should load each file into an rdd with context.textFile(),
flatmap that and union these rdds.

also see
http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
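A minimal sketch of that shape (parseRecords is a placeholder for the actual per-line transform):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// one lazy RDD per file, flatMapped into records, then unioned;
// nothing forces a whole file's records to be materialized on a single machine at once
def loadAll(sc: SparkContext, fileNames: Seq[String],
            parseRecords: String => Seq[String]): RDD[String] =
  sc.union(fileNames.map(f => sc.textFile(f).flatMap(line => parseRecords(line))))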


On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD, and
 in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =>
   transform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =>
  for every 10K records write records to stream and flush
 }

 Keith



Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Actually, I'm working with a binary format.  The api allows reading out a
single record at a time, but I'm not sure how to get those records into
spark (without reading everything into memory from a single file at once).



On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

 file = tranform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but if
 your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


 On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD, and
 in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =
   tranform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =
  for every 10K records write records to stream and flush
 }

 Keith





Re: Passing Java Options to Spark AM launching

2014-12-01 Thread Tobias Pfeiffer
Hi,

have a look at the documentation for spark.driver.extraJavaOptions (which
seems to have disappeared since I looked it up last week)
and spark.executor.extraJavaOptions at 
http://spark.apache.org/docs/latest/configuration.html#runtime-environment.

Tobias
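
For illustration, both settings are typically passed to spark-submit as --conf entries 
(a sketch; the JVM flag is the one from the original question and the remaining 
arguments are elided):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:MaxMetaspaceSize=100M" \
  --conf "spark.executor.extraJavaOptions=-XX:MaxMetaspaceSize=100M" \
  ...

In yarn-cluster mode the driver runs inside the AM, so the driver setting applies to 
the AM JVM as well; for the yarn-client AM, see the pull request Zhan Zhang points to 
further down in this thread.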


Re: Passing Java Options to Spark AM launching

2014-12-01 Thread Mohammad Islam
Thanks Tobias for the answer. Does it work for the driver as well?
Regards,
Mohammad

 On Monday, December 1, 2014 5:30 PM, Tobias Pfeiffer t...@preferred.jp 
wrote:
   

 Hi,
have a look at the documentation for spark.driver.extraJavaOptions (which seems 
to have disappeared since I looked it up last week) and 
spark.executor.extraJavaOptions at 
http://spark.apache.org/docs/latest/configuration.html#runtime-environment.
Tobias



   

Re: Passing Java Options to Spark AM launching

2014-12-01 Thread Zhan Zhang
Please check whether 
https://github.com/apache/spark/pull/3409#issuecomment-64045677 solves the 
problem for launching the AM.

Thanks.

Zhan Zhang
On Dec 1, 2014, at 4:49 PM, Mohammad Islam misla...@yahoo.com.INVALID wrote:

 Hi,
 How to pass the Java options (such as -XX:MaxMetaspaceSize=100M) when 
 lunching AM or task containers?
 
 This is related to running Spark on Yarn (Hadoop 2.3.0). In Map-reduce case, 
 setting the property such as 
 mapreduce.map.java.opts would do the work.
 
 Any help would be highly appreciated.
 
 Regards,
 Mohammad
 
 
 
 
  




Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg
Could you modify your function so that it streams through the files record
by record and outputs them to hdfs, then read them all in as RDDs and take
the union? That would only use bounded memory.

On 1 December 2014 at 17:19, Keith Simmons ke...@pulse.io wrote:

 Actually, I'm working with a binary format.  The api allows reading out a
 single record at a time, but I'm not sure how to get those records into
 spark (without reading everything into memory from a single file at once).



 On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

 file = tranform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but
 if your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


 On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD,
 and in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =
   tranform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =
  for every 10K records write records to stream and flush
 }

 Keith






numpy arrays and spark sql

2014-12-01 Thread Joseph Winston
This works as expected in the 1.1 branch: 

from pyspark.sql import *

rdd = sc.parallelize([range(0, 10), range(10, 20), range(20, 30)])

# define the schema
schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10"
fields = [StructField(field_name, IntegerType(), True) for field_name in 
schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaRDD = sqlContext.applySchema(rdd, schema)

# Register the table
schemaRDD.registerTempTable("slice")

# SQL can be run over SchemaRDDs that have been registered as a table.
results = sqlContext.sql("SELECT value1 FROM slice")

# The results of SQL queries are RDDs and support all the normal RDD operations.
print results.collect()

However changing the rdd to use a numpy array fails:

import numpy as np
rdd = sc.parallelize(np.arange(20).reshape(2, 10))

# define the schema
schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10"
fields = [StructField(field_name, np.ndarray, True) for field_name in 
schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaRDD = sqlContext.applySchema(rdd, schema)

The error is:
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 1119, in applySchema
    _verify_type(row, schema)
  File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 735, in _verify_type
    % (dataType, type(obj)))
TypeError: StructType(List(StructField(value1,<type 'numpy.ndarray'>,true),StructField(value2,<type 'numpy.ndarray'>,true),StructField(value3,<type 'numpy.ndarray'>,true),StructField(value4,<type 'numpy.ndarray'>,true),StructField(value5,<type 'numpy.ndarray'>,true),StructField(value6,<type 'numpy.ndarray'>,true),StructField(value7,<type 'numpy.ndarray'>,true),StructField(value8,<type 'numpy.ndarray'>,true),StructField(value9,<type 'numpy.ndarray'>,true),StructField(value10,<type 'numpy.ndarray'>,true))) can 
not accept object in type <type 'numpy.ndarray'>

I’ve tried np.int_ and np.int32 and they fail too. What type should I use to 
make numpy arrays work?
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Yep, that's definitely possible.  It's one of the workarounds I was
considering.  I was just curious if there was a simpler (and perhaps more
efficient) approach.

Keith

On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg andy.tw...@gmail.com wrote:

 Could you modify your function so that it streams through the files record
 by record and outputs them to hdfs, then read them all in as RDDs and take
 the union? That would only use bounded memory.

 On 1 December 2014 at 17:19, Keith Simmons ke...@pulse.io wrote:

 Actually, I'm working with a binary format.  The api allows reading out a
 single record at a time, but I'm not sure how to get those records into
 spark (without reading everything into memory from a single file at once).



 On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

 file = tranform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but
 if your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


 On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD,
 and in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =
   tranform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =
  for every 10K records write records to stream and flush
 }

 Keith







Re: ALS failure with size > Integer.MAX_VALUE

2014-12-01 Thread Bharath Ravi Kumar
Yes, the issue appears to be due to the 2GB block size limitation. I am
hence looking for (user, product) block sizing suggestions to work around
the block size limitation.
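
For reference, a rough sketch of turning up the number of blocks through the MLlib 1.x 
API so that no single shuffled block approaches the 2GB limit; the input path, lambda, 
iteration count and block count are placeholders, not recommendations:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// assumes an existing SparkContext `sc` and ratings stored as "user,product,rating" lines
val ratings = sc.textFile("hdfs:///path/to/ratings").map { line =>
  val Array(u, p, r) = line.split(',')
  Rating(u.toInt, p.toInt, r.toDouble)
}

val rank = 10
val iterations = 10      // placeholder
val lambda = 0.01        // placeholder
val blocks = 2400        // more blocks => less shuffle data per block
val model = ALS.train(ratings, rank, iterations, lambda, blocks)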

On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote:

 (It won't be that, since you see that the error occur when reading a
 block from disk. I think this is an instance of the 2GB block size
 limitation.)

 On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
 ilya.gane...@capitalone.com wrote:
  Hi Bharath – I’m unsure if this is your problem but the
  MatrixFactorizationModel in MLLIB which is the underlying component for
 ALS
  expects your User/Product fields to be integers. Specifically, the input
 to
  ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am
 wondering if
  perhaps one of your identifiers exceeds MAX_INT, could you write a quick
  check for that?
 
  I have been running a very similar use case to yours (with more
 constrained
  hardware resources) and I haven’t seen this exact problem but I’m sure
 we’ve
  seen similar issues. Please let me know if you have other questions.
 
  From: Bharath Ravi Kumar reachb...@gmail.com
  Date: Thursday, November 27, 2014 at 1:30 PM
  To: user@spark.apache.org user@spark.apache.org
  Subject: ALS failure with size > Integer.MAX_VALUE
 
  We're training a recommender with ALS in mllib 1.1 against a dataset of
 150M
  users and 4.5K items, with the total number of training records being 1.2
  Billion (~30GB data). The input data is spread across 1200 partitions on
  HDFS. For the training, rank=10, and we've configured {number of user
 data
  blocks = number of item data blocks}. The number of user/item blocks was
  varied  between 50 to 1200. Irrespective of the block size (e.g. at 1200
  blocks each), there are atleast a couple of tasks that end up shuffle
  reading > 9.7G each in the aggregate stage (ALS.scala:337) and failing
 with
  the following exception:
 
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
  at
 org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
  at
 
 org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
  at
 
 org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
 



Re: Calling spark from a java web application.

2014-12-01 Thread ryaminal
If you are able to use YARN in your hadoop cluster, then the following
technique is pretty straightforward:
http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/

We use this in our system and it's super easy to execute from our Tomcat
application.
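
As a cruder alternative to the YARN client API used in that blog post, a web application 
can also simply shell out to spark-submit; everything below (paths, class name, master) 
is a placeholder:

import scala.sys.process._

val exitCode = Seq(
  "/opt/spark/bin/spark-submit",
  "--master", "yarn-cluster",
  "--class", "com.example.MyJob",     // hypothetical application class
  "/opt/jobs/my-job.jar"              // hypothetical application jar
).!                                   // runs the command and returns its exit code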



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Calling-spark-from-a-java-web-application-tp20007p20145.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg
You may be able to construct RDDs directly from an iterator - not sure
- you may have to subclass your own.
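
Along those lines, one way to keep memory bounded without an intermediate copy is to 
have the flatMap return a lazy iterator over the binary records instead of a 
materialized collection; RecordReader and Record below are stand-ins for whatever the 
binary format's API looks like:

context.parallelize(fileNames).flatMap { file =>
  val reader = RecordReader.open(file)   // hypothetical one-record-at-a-time reader
  new Iterator[Record] {                 // Record is likewise a placeholder type
    def hasNext: Boolean =
      if (reader.hasNext) true else { reader.close(); false }
    def next(): Record = reader.next()
  }
}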

On 1 December 2014 at 18:40, Keith Simmons ke...@pulse.io wrote:
 Yep, that's definitely possible.  It's one of the workarounds I was
 considering.  I was just curious if there was a simpler (and perhaps more
 efficient) approach.

 Keith

 On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg andy.tw...@gmail.com wrote:

 Could you modify your function so that it streams through the files record
 by record and outputs them to hdfs, then read them all in as RDDs and take
 the union? That would only use bounded memory.

 On 1 December 2014 at 17:19, Keith Simmons ke...@pulse.io wrote:

 Actually, I'm working with a binary format.  The api allows reading out a
 single record at a time, but I'm not sure how to get those records into
 spark (without reading everything into memory from a single file at once).



 On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

 file = tranform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but
 if your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


 On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD,
 and in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =
   tranform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =
  for every 10K records write records to stream and flush
 }

 Keith






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: numpy arrays and spark sql

2014-12-01 Thread Davies Liu
applySchema() only accepts an RDD of Row/list/tuple; it does not work with
numpy.array.

After applySchema(), the Python RDD will be pickled and unpickled in the
JVM, so you will not gain any benefit from using numpy.array.

It will work if you convert each ndarray into a list:

schemaRDD = sqlContext.applySchema(rdd.map(list), schema)

On Mon, Dec 1, 2014 at 6:33 PM, Joseph Winston josephwins...@me.com wrote:
 This works as expected in the 1.1 branch:

 from pyspark.sql import *

 rdd = sc.parallelize([range(0, 10), range(10,20), range(20, 30)]

 # define the schema
 schemaString = value1 value2 value3 value4 value5 value6 value7 value8 
 value9 value10
 fields = [StructField(field_name, IntegerType(), True) for field_name in 
 schemaString.split()]
 schema = StructType(fields)

 # Apply the schema to the RDD.
 schemaRDD = sqlContext.applySchema(rdd, schema)

 # Register the table
 schemaRDD.registerTempTable(slice)

 # SQL can be run over SchemaRDDs that have been registered as a table.
 results = sqlContext.sql(SELECT value1 FROM slice)

 # The results of SQL queries are RDDs and support all the normal RDD 
 operations.
 print results.collect()

 However changing the rdd to use a numpy array fails:

 import np as np
 rdd = sc.parallelize(np.arange(20).reshape(2, 10))

 # define the schema
 schemaString = value1 value2 value3 value4 value5 value6 value7 value8 
 value9 value10
 fields = [StructField(field_name, np.ndarray, True) for field_name in 
 schemaString.split()]
 schema = StructType(fields)

 # Apply the schema to the RDD.
 schemaRDD = sqlContext.applySchema(rdd, schema)

 The error is:
 Traceback (most recent call last):
   File stdin, line 2, in module
   File /Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py, line 1119, in 
 applySchema
 _verify_type(row, schema)
   File /Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py, line 735, in 
 _verify_type
 % (dataType, type(obj)))
 TypeError: StructType(List(StructField(value1,type 
 'numpy.ndarray',true),StructField(value2,type 
 'numpy.ndarray',true),StructField(value3,type 
 'numpy.ndarray',true),StructField(value4,type 
 'numpy.ndarray',true),StructField(value5,type 
 'numpy.ndarray',true),StructField(value6,type 
 'numpy.ndarray',true),StructField(value7,type 
 'numpy.ndarray',true),StructField(value8,type 
 'numpy.ndarray',true),StructField(value9,type 
 'numpy.ndarray',true),StructField(value10,type 'numpy.ndarray',true))) can 
 not accept abject in type type 'numpy.ndarray'

 I’ve tried np.int_ and np.int32 and they fail too.  What type should I use to 
 make a numpy arrays work?
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.io.IOException: Filesystem closed

2014-12-01 Thread rapelly kartheek
Hi,

I face the following exception when I submit a Spark application. The log
file shows:

14/12/02 11:52:58 ERROR LiveListenerBus: Listener EventLoggingListener
threw an exception
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
at
org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1668)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1629)
at org.apache.hadoop.hdfs.DFSOutputStream.sync(DFSOutputStream.java:1614)
at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:120)
at
org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
at
org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.util.FileLogger.flush(FileLogger.scala:158)
at
org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:87)
at
org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:112)
at
org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
at
org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
at
org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
at
org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
at
org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:52)
at
org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)

Someone please help me resolve this!!

Thanks


Re: java.io.IOException: Filesystem closed

2014-12-01 Thread Akhil Das
What is the application that you are submitting? Looks like you might have
invoked fs inside the app and then closed it within it.

Thanks
Best Regards
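
For what it's worth, the usual way this happens is sketched below: FileSystem.get() 
hands back a JVM-wide cached instance, so closing it also closes the handle Spark's 
EventLoggingListener is flushing through. This only illustrates the pattern, not the 
poster's actual code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)   // shared, cached instance
// ... application reads/writes via fs ...
// fs.close()                         // closing the cached instance breaks every other user of it

// If a handle that can be closed safely is really needed, take a private one:
val privateFs = FileSystem.newInstance(hadoopConf)
try {
  // ... use privateFs ...
} finally {
  privateFs.close()
}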

On Tue, Dec 2, 2014 at 11:59 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Hi,

 I face the following exception when submit a spark application. The log
 file shows:

 14/12/02 11:52:58 ERROR LiveListenerBus: Listener EventLoggingListener
 threw an exception
 java.io.IOException: Filesystem closed
 at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
 at
 org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1668)
 at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1629)
 at org.apache.hadoop.hdfs.DFSOutputStream.sync(DFSOutputStream.java:1614)
 at
 org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:120)
 at
 org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
 at
 org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
 at scala.Option.foreach(Option.scala:236)
 at org.apache.spark.util.FileLogger.flush(FileLogger.scala:158)
 at
 org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:87)
 at
 org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:112)
 at
 org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
 at
 org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
 at
 org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
 at
 org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
 at
 org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:52)
 at
 org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
 at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
 at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
 at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
 at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
 at
 org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)

 Someone please help me resolve this!!

 Thanks