Re: How to create Spark DataFrame using custom Hadoop InputFormat?

2015-07-31 Thread Umesh Kacha
Hi, thanks. Void works: I use the same custom format in Hive and it works with
Void as the key. Please share an example, if you have one, of creating a DataFrame
using a custom Hadoop InputFormat.
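For reference, a rough sketch of the pattern that usually works here (not verified
against your format; MyFormatSchema's fields and the getSomeField()/setSomeField()
names are placeholders, and it uses org.apache.spark.api.java.function.Function):
sqlContext.createDataFrame(rdd, beanClass) expects the RDD elements to be instances
of the bean class, so the Writable values have to be copied into plain JavaBeans first.

    JavaPairRDD<Void, MyRecordWritable> pairs =
        jsc.hadoopFile("hdfs://tmp/data/myformat.xyz",
            MyInputFormat.class, Void.class, MyRecordWritable.class);

    // Copy each Writable into a JavaBean; createDataFrame cannot use the Writable directly.
    JavaRDD<MyFormatSchema> beans = pairs.values().map(
        new Function<MyRecordWritable, MyFormatSchema>() {
            @Override
            public MyFormatSchema call(MyRecordWritable record) {
                MyFormatSchema bean = new MyFormatSchema();
                bean.setSomeField(record.getSomeField());   // hypothetical field
                return bean;
            }
        });

    DataFrame myFormatAsDataframe = sqlContext.createDataFrame(beans, MyFormatSchema.class);
    myFormatAsDataframe.show();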
On Aug 1, 2015 2:07 AM, Ted Yu yuzhih...@gmail.com wrote:

 I don't think using Void class is the right choice - it is not even a
 Writable.

 BTW in the future, capture text output instead of image.

 Thanks

 On Fri, Jul 31, 2015 at 12:35 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Ted, thanks. My key is always Void because my custom format file is
 non-splittable, so the key is Void and the value is MyRecordWritable, which
 extends Hadoop's Writable. I am sharing my log as a screenshot; please don't
 mind, as I can't paste code outside.

 Regards,
 Umesh

 On Sat, Aug 1, 2015 at 12:59 AM, Ted Yu yuzhih...@gmail.com wrote:

 Looking closer at the code you posted, the error likely was caused by
 the 3rd parameter: Void.class

 It is supposed to be the class of key.

 FYI

 On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I am having my own Hadoop custom InputFormat which I need to use in
 creating DataFrame. I tried to do the following

 JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
 jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
 JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe =
 sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
 myFormatAsDataframe.show();

 The above code does not work and throws an exception saying
 java.lang.IllegalArgumentException: object is not an instance of declaring class

 My custom Hadoop InputFormat works very well with Hive, MapReduce etc.
 How do I make it work with Spark? Please guide me, I am new to Spark. Thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-Spark-DataFrame-using-custom-Hadoop-InputFormat-tp24101.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







RandomForest in Pyspark (version 1.4.1)

2015-07-31 Thread SK

Hi,

I tried to develop a RandomForest model for my data in PySpark as follows:

   rf_model = RandomForest.trainClassifier(train_idf, 2, {},
numTrees=15, seed=144)
   print "RF: Num trees = %d, Num nodes = %d\n" % (rf_model.numTrees(),
rf_model.totalNumNodes())

   pred_label = test_idf.map(lambda p:
(float(rf_model.predict(p.features)), p.label))
   print pred_label.take(5)  ## exception

I am getting the following error at the highlighted statement.

 Exception: It appears that you are attempting to reference
SparkContext from a broadcast variable, action, or transformation.
SparkContext can only be used on the driver, not in code that is run on
workers. For more information, see SPARK-5063.

 I have used the same set of statements for the linear models
(LogisticRegression and SVM) in PySpark and was able to get the predictions
and print them.  I am not sure why I am getting the above exception. I am
not using the SparkContext directly in any of the above statements. I would
appreciate your help.
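A minimal sketch of the usual workaround for SPARK-5063 with the MLlib tree models in
PySpark (assuming train_idf/test_idf are RDDs of LabeledPoint; the tiny synthetic data
below is only there to make the snippet self-contained): call predict() on an RDD of
feature vectors from the driver and zip the predictions back with the labels, instead
of calling rf_model.predict inside a lambda.

    from pyspark import SparkContext
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.tree import RandomForest

    sc = SparkContext(appName="rf-predict-workaround")

    # stand-in for train_idf / test_idf
    data = [LabeledPoint(0.0, [0.0, 1.0]), LabeledPoint(1.0, [1.0, 0.0]),
            LabeledPoint(0.0, [0.1, 0.9]), LabeledPoint(1.0, [0.9, 0.2])]
    train_idf = sc.parallelize(data)
    test_idf = sc.parallelize(data)

    rf_model = RandomForest.trainClassifier(train_idf, 2, {}, numTrees=15, seed=144)

    # The Python model wraps a JVM object and needs the SparkContext, which only exists
    # on the driver, so predict on a whole RDD of features rather than per record in a map.
    predictions = rf_model.predict(test_idf.map(lambda p: p.features))
    pred_label = predictions.zip(test_idf.map(lambda p: p.label))
    print pred_label.take(5)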

thanks







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RandomForest-in-Pyspark-version-1-4-1-tp24103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Big Integer number in Spark

2015-07-31 Thread ssingal05
Do you notice how you are making a List of Int's?

input: org.apache.spark.rdd.RDD[*Int*] = ParallelCollectionRDD[0] at
parallelize at console:21 

And these are also being mapped to more Int's

result: org.apache.spark.rdd.RDD[*Int*] = MapPartitionsRDD[1] at map at
console:23 

Generally, a (signed) Int can range from -2^31 to 2^31 - 1, but that mapping
produces values larger than 2^31 - 1, so the result overflows and wraps around,
possibly into the negatives.
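A quick spark-shell illustration of the wraparound and of the usual fix of widening to
Long before multiplying (my own example, not the original poster's code; the commented
values are what 32-bit wraparound produces):

    val input = sc.parallelize(Seq(50000, 100000))

    val asInt  = input.map(x => x * x)          // overflows Int
    val asLong = input.map(x => x.toLong * x)   // widened to Long, exact

    asInt.collect()   // Array(-1794967296, 1410065408)  <- wrapped, wrong
    asLong.collect()  // Array(2500000000, 10000000000)  <- correct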



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Big-Integer-number-in-Spark-tp24095p24096.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[POWERED BY] Please add Typesafe to the list of organizations

2015-07-31 Thread Dean Wampler
Typesafe (http://typesafe.com). We provide commercial support for Spark on
Mesos and Mesosphere DCOS. We contribute to Spark's Mesos integration and
Spark Streaming enhancements.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


Re: setting fs.umask in pyspark

2015-07-31 Thread aesilberstein
Found an approach:
sc._jsc.hadoopConfiguration().set(key, value)
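For example (assuming the key meant here is the Hadoop 2 umask property,
fs.permissions.umask-mode, which is my reading of "fs.umask"):

    sc._jsc.hadoopConfiguration().set("fs.permissions.umask-mode", "002")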



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/setting-fs-umask-in-pyspark-tp24070p24098.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issues with JavaRDD.subtract(JavaRDD) method in local vs. cluster mode

2015-07-31 Thread Sebastian Kalix
Thanks for the quick reply. I will be unable to collect more data until
Monday, but I will update the thread accordingly.

I am using Spark 1.4.0. Were there any related issues reported? I wasn't
able to find any, but I may have overlooked something. I have also updated
the original question to include the relevant Java files, maybe the issue
is hidden there somewhere.

Ted Yu yuzhih...@gmail.com schrieb am Fr., 31. Juli 2015 um 18:09 Uhr:

 Can you call collect() and log the output to get more clue what is left ?

 Which Spark release are you using ?

 Cheers

 On Fri, Jul 31, 2015 at 9:01 AM, Warfish sebastian.ka...@gmail.com
 wrote:

 Hi everyone,

 I work with Spark for a little while now and have encountered a strange
 problem that gives me headaches, which has to do with the JavaRDD.subtract
 method. Consider the following piece of code:

 public static void main(String[] args) {
     // context is of type JavaSparkContext; FILE is the filepath to my input file
     JavaRDD<String> rawTestSet  = context.textFile(FILE);
     JavaRDD<String> rawTestSet2 = context.textFile(FILE);

     // Gives 0 every time - correct
     System.out.println("rawTestSetMinusRawTestSet2 = " +
         rawTestSet.subtract(rawTestSet2).count());

     // SearchData is a custom POJO that holds my data
     JavaRDD<SearchData> testSet  = convert(rawTestSet);
     JavaRDD<SearchData> testSet2 = convert(rawTestSet);
     JavaRDD<SearchData> testSet3 = convert(rawTestSet2);

     // These calls give numbers != 0 in cluster mode - incorrect
     System.out.println("testSetMinusTestSet2  = " +
         testSet.subtract(testSet2).count());
     System.out.println("testSetMinusTestSet3  = " +
         testSet.subtract(testSet3).count());
     System.out.println("testSet2MinusTestSet3 = " +
         testSet2.subtract(testSet3).count());
 }

 private static JavaRDD<SearchData> convert(JavaRDD<String> input) {
     return input.filter(new Matches(myRegex))
                 .map(new DoSomething())
                 .map(new Split(mySplitParam))
                 .map(new ToMap())
                 .map(new Clean())
                 .map(new ToSearchData());
 }

 In this code, I read a file (usually from HDFS, but applies to disk as
 well)
 and then convert the Strings into custom objects to hold the data using a
 chain of filter- and map-operations. These objects are simple POJOs with
 overriden hashCode() and equal() functions. I then apply the subtract
 method
 to several JavaRDDs that contain exact equal data.

 Note: I have omitted the POJO code and the filter- and map-functions to
 make
 the code more concise, but I can post it later if the need arises.

 In the main method shown above are several calls of the subtract method,
 all
 of which should give empty RDDs as results because the data in all RDDs
 should be exactly the same. This works for Spark in local mode, however
 when
 executing the code on a cluster the second block of subtract calls does
 not
 result in empty sets, which tells me that it is a more complicated issue.
 The input data on local and cluster mode was exactly the same.

 Can someone shed some light on this issue? I feel like I'm overlooking
 something rather obvious.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-JavaRDD-subtract-JavaRDD-method-in-local-vs-cluster-mode-tp24099.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Issues with JavaRDD.subtract(JavaRDD) method in local vs. cluster mode

2015-07-31 Thread Ted Yu
Can you call collect() and log the output to get more clue what is left ?

Which Spark release are you using ?

Cheers

On Fri, Jul 31, 2015 at 9:01 AM, Warfish sebastian.ka...@gmail.com wrote:

 Hi everyone,

 I work with Spark for a little while now and have encountered a strange
 problem that gives me headaches, which has to do with the JavaRDD.subtract
 method. Consider the following piece of code:

 public static void main(String[] args) {
     // context is of type JavaSparkContext; FILE is the filepath to my input file
     JavaRDD<String> rawTestSet  = context.textFile(FILE);
     JavaRDD<String> rawTestSet2 = context.textFile(FILE);

     // Gives 0 every time - correct
     System.out.println("rawTestSetMinusRawTestSet2 = " +
         rawTestSet.subtract(rawTestSet2).count());

     // SearchData is a custom POJO that holds my data
     JavaRDD<SearchData> testSet  = convert(rawTestSet);
     JavaRDD<SearchData> testSet2 = convert(rawTestSet);
     JavaRDD<SearchData> testSet3 = convert(rawTestSet2);

     // These calls give numbers != 0 in cluster mode - incorrect
     System.out.println("testSetMinusTestSet2  = " +
         testSet.subtract(testSet2).count());
     System.out.println("testSetMinusTestSet3  = " +
         testSet.subtract(testSet3).count());
     System.out.println("testSet2MinusTestSet3 = " +
         testSet2.subtract(testSet3).count());
 }

 private static JavaRDD<SearchData> convert(JavaRDD<String> input) {
     return input.filter(new Matches(myRegex))
                 .map(new DoSomething())
                 .map(new Split(mySplitParam))
                 .map(new ToMap())
                 .map(new Clean())
                 .map(new ToSearchData());
 }

 In this code, I read a file (usually from HDFS, but applies to disk as
 well)
 and then convert the Strings into custom objects to hold the data using a
 chain of filter- and map-operations. These objects are simple POJOs with
 overriden hashCode() and equal() functions. I then apply the subtract
 method
 to several JavaRDDs that contain exact equal data.

 Note: I have omitted the POJO code and the filter- and map-functions to
 make
 the code more concise, but I can post it later if the need arises.

 In the main method shown above are several calls of the subtract method,
 all
 of which should give empty RDDs as results because the data in all RDDs
 should be exactly the same. This works for Spark in local mode, however
 when
 executing the code on a cluster the second block of subtract calls does not
 result in empty sets, which tells me that it is a more complicated issue.
 The input data on local and cluster mode was exactly the same.

 Can someone shed some light on this issue? I feel like I'm overlooking
 something rather obvious.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-JavaRDD-subtract-JavaRDD-method-in-local-vs-cluster-mode-tp24099.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: How to register array class with Kyro in spark-defaults.conf

2015-07-31 Thread Wang, Ningjun (LNG-NPV)
Does anybody have any idea how to solve this problem?

Ningjun

From: Wang, Ningjun (LNG-NPV)
Sent: Thursday, July 30, 2015 11:06 AM
To: user@spark.apache.org
Subject: How to register array class with Kyro in spark-defaults.conf

I register my class with Kryo in spark-defaults.conf as follows:

spark.serializer   
org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired true
spark.kryo.classesToRegister  ltn.analytics.es.EsDoc

But I got the following exception

java.lang.IllegalArgumentException: Class is not registered: 
ltn.analytics.es.EsDoc[]
Note: To register this class use: kryo.register(ltn.analytics.es.EsDoc[].class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
at 
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:162)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


The error message seems to suggest that I should also register the array class
EsDoc[], so I added it to spark-defaults.conf as follows:

spark.kryo.classesToRegister  ltn.analytics.es.EsDoc,ltn.analytics.es.EsDoc[]

Then I got the following error

org.apache.spark.SparkException: Failed to register classes with Kryo
at 
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
at 
org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:153)
at 
org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115)
at 
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at 
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at ltn.analytics.index.Index.addDocuments(Index.scala:82)

Please advise.

Thanks.
Ningjun


Issues with JavaRDD.subtract(JavaRDD) method in local vs. cluster mode

2015-07-31 Thread Warfish
Hi everyone,

I have been working with Spark for a little while now and have encountered a strange
problem that gives me headaches, which has to do with the JavaRDD.subtract
method. Consider the following piece of code:

public static void main(String[] args) {
    // context is of type JavaSparkContext; FILE is the filepath to my input file
    JavaRDD<String> rawTestSet  = context.textFile(FILE);
    JavaRDD<String> rawTestSet2 = context.textFile(FILE);

    // Gives 0 every time - correct
    System.out.println("rawTestSetMinusRawTestSet2 = " +
        rawTestSet.subtract(rawTestSet2).count());

    // SearchData is a custom POJO that holds my data
    JavaRDD<SearchData> testSet  = convert(rawTestSet);
    JavaRDD<SearchData> testSet2 = convert(rawTestSet);
    JavaRDD<SearchData> testSet3 = convert(rawTestSet2);

    // These calls give numbers != 0 in cluster mode - incorrect
    System.out.println("testSetMinusTestSet2  = " +
        testSet.subtract(testSet2).count());
    System.out.println("testSetMinusTestSet3  = " +
        testSet.subtract(testSet3).count());
    System.out.println("testSet2MinusTestSet3 = " +
        testSet2.subtract(testSet3).count());
}

private static JavaRDD<SearchData> convert(JavaRDD<String> input) {
    return input.filter(new Matches(myRegex))
                .map(new DoSomething())
                .map(new Split(mySplitParam))
                .map(new ToMap())
                .map(new Clean())
                .map(new ToSearchData());
}

In this code, I read a file (usually from HDFS, but the same applies to local disk)
and then convert the Strings into custom objects that hold the data, using a
chain of filter and map operations. These objects are simple POJOs with
overridden hashCode() and equals() methods. I then apply the subtract method
to several JavaRDDs that contain exactly the same data.

Note: I have omitted the POJO code and the filter and map functions to keep
the code concise, but I can post them later if the need arises.

The main method shown above makes several calls to the subtract method, all
of which should return empty RDDs because the data in all the RDDs
should be exactly the same. This works for Spark in local mode; however, when
executing the code on a cluster, the second block of subtract calls does not
result in empty sets, which tells me that it is a more complicated issue.
The input data in local and cluster mode was exactly the same.

Can someone shed some light on this issue? I feel like I'm overlooking
something rather obvious.
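For reference, a minimal sketch (my own illustration, not the actual SearchData class) of
a POJO whose equals()/hashCode() satisfy what subtract() relies on: records are partitioned
by hashCode and then matched with equals across executors, so both must depend only on
field values and be deterministic across JVMs (no identity hash, no unordered or mutable
state feeding into them).

    import java.io.Serializable;
    import java.util.Objects;

    public class SearchDataExample implements Serializable {
        private final String key;
        private final long value;

        public SearchDataExample(String key, long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SearchDataExample)) return false;
            SearchDataExample other = (SearchDataExample) o;
            return value == other.value && Objects.equals(key, other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value);
        }
    }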



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-JavaRDD-subtract-JavaRDD-method-in-local-vs-cluster-mode-tp24099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Setting a stage timeout

2015-07-31 Thread William Kinney
Hi,

I had a job that got stuck on yarn due to
https://issues.apache.org/jira/browse/SPARK-6954
It never exited properly.

Is there a way to set a timeout for a stage or all stages?


Re: SparkLauncher not notified about finished job - hangs infinitely.

2015-07-31 Thread Elkhan Dadashov
Nope, the normal output stream of that subprocess is read via spark.getInputStream().

According to Oracle Doc
https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getOutputStream--
:

public abstract InputStream
https://docs.oracle.com/javase/8/docs/api/java/io/InputStream.html
 getInputStream()
Returns the input stream connected to the normal output of the subprocess.
The stream obtains data piped from the standard output of the process
represented by this Process object.

On Fri, Jul 31, 2015 at 10:10 AM, Ted Yu yuzhih...@gmail.com wrote:

 minor typo:

 bq. output (spark.getInputStream())

 Should be spark.getOutputStream()

 Cheers

 On Fri, Jul 31, 2015 at 10:02 AM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Hi Tomasz,

 *Answer to your 1st question*:

 Clear/read the error (spark.getErrorStream()) and output
 (spark.getInputStream()) stream buffers before you call spark.waitFor(), it
 would be better to clear/read them with 2 different threads. Then it should
 work fine.

 As Spark job is launched as subprocess, and according to Oracle
 documentation
 https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html:

 By default, the created subprocess does not have its own terminal or
 console. All its standard I/O (i.e. stdin, stdout, stderr) operations will
 be redirected to the parent process, where they can be accessed via the
 streams obtained using the methods getOutputStream(), getInputStream(), and
 getErrorStream(). The parent process uses these streams to feed input to
 and get output from the subprocess. Because some native platforms only
 provide limited buffer size for standard input and output streams, failure
 to promptly write the input stream or read the output stream of the
 subprocess may cause the subprocess to block, or even deadlock.
 



 On Fri, Jul 31, 2015 at 2:45 AM, Tomasz Guziałek 
 tomasz.guzia...@humaninference.com wrote:

 I am trying to submit a JAR with Spark job into the YARN cluster from
 Java code. I am using SparkLauncher to submit SparkPi example:

 Process spark = new SparkLauncher()
     .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
     .setMainClass("org.apache.spark.examples.SparkPi")
     .setMaster("yarn-cluster")
     .launch();
 System.out.println("Waiting for finish...");
 int exitCode = spark.waitFor();
 System.out.println("Finished! Exit code: " + exitCode);

 There are two problems:

 1. While submitting in yarn-cluster mode, the application is
 successfully submitted to YARN and executes successfully (it is visible in
 the YARN UI, reported as SUCCESS and PI value is printed in the output).
 However, the submitting application is never notified that processing is
 finished - it hangs infinitely after printing Waiting to finish... The
 log of the container can be found here: http://pastebin.com/LscBjHQc
 2. While submitting in yarn-client mode, the application does not
 appear in YARN UI and the submitting application hangs at Waiting to
 finish... When hanging code is killed, the application shows up in YARN UI
 and it is reported as SUCCESS, but the output is empty (PI value is not
 printed out). The log of the container can be found here:
 http://pastebin.com/9KHi81r4

 I tried to execute the submitting application both with Oracle Java 8
 and 7.



 Any hints what might be wrong?



 Best regards,

 Tomasz




 --

 Best regards,
 Elkhan Dadashov





-- 

Best regards,
Elkhan Dadashov


Re: How to control Spark Executors from getting Lost when using YARN client mode?

2015-07-31 Thread Umesh Kacha
Hi, thanks for the response. It looks like the YARN container is getting killed,
but I don't know why I see a shuffle MetadataFetchFailedException, as mentioned in the
following SO link. I have enough memory: 8 nodes with 8 cores and 30 GB of memory each.
Because of this MetadataFetchFailedException, YARN kills the container running the
executor. How can it overrun memory? I tried giving each executor 25 GB and it is still
not sufficient and fails. Please guide me, I don't understand what is going on. I am
using Spark 1.4.0, with spark.shuffle.memory set to 0.0 and spark.storage.memory set to
0.5. I have almost all the optimal properties set, such as the Kryo serializer, a 500
akka frame size and 20 akka threads. I have been stuck on this for two days trying to
recover from the issue.

http://stackoverflow.com/questions/29850784/what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept
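As an aside, in Spark 1.4 the corresponding property names are
spark.shuffle.memoryFraction and spark.storage.memoryFraction, and setting the shuffle
fraction to 0.0 leaves essentially no memory for shuffle aggregation, which forces heavy
spilling. A sketch of the kind of submit settings I would experiment with for 8-core /
30 GB nodes (the values are guesses to illustrate the knobs, not a verified fix):

    ./spark-submit --class com.xyz.MySpark \
      --master yarn-client \
      --num-executors 16 --executor-cores 4 --executor-memory 10G \
      --conf spark.yarn.executor.memoryOverhead=2048 \
      --conf spark.shuffle.memoryFraction=0.4 \
      --conf spark.storage.memoryFraction=0.3 \
      /home/myuser/myspark-1.0.jar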



On Thu, Jul 30, 2015 at 9:56 PM, Ashwin Giridharan ashwin.fo...@gmail.com
wrote:

 What is your cluster configuration ( size and resources) ?

 If you do not have enough resources, then your executor will not run.
 Moreover allocating 8 cores to an executor is too much.

 If you have a cluster with four nodes running NodeManagers, each equipped
 with 4 cores and 8GB of memory,
 then an optimal configuration would be,

 --num-executors 8 --executor-cores 2 --executor-memory 2G

 Thanks,
 Ashwin

 On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have one Spark job which runs fine locally with less data but when I
 schedule it on YARN to execute I keep on getting the following ERROR and
 slowly all executors gets removed from UI and my job fails

 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on
 myhost1.com: remote Rpc client disassociated
 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on
 myhost2.com: remote Rpc client disassociated
 I use the following command to schedule spark job in yarn-client mode

  ./spark-submit --class com.xyz.MySpark --conf
 spark.executor.extraJavaOptions=-XX:MaxPermSize=512M
 --driver-java-options
 -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
 --executor-memory 2G --executor-cores 8 --num-executors 12
 /home/myuser/myspark-1.0.jar

 I dont know what is the problem please guide. I am new to Spark. Thanks in
 advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Thanks  Regards,
 Ashwin Giridharan



Re: How to add multiple sequence files from HDFS to a Spark Context to do Batch processing?

2015-07-31 Thread Marcelo Vanzin
file can be a directory (look at all children) or even a glob
(/path/*.ext, for example).
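For example (paths are made up; assumes a spark-shell with sc available):

    import org.apache.hadoop.io.{LongWritable, Text}

    // a single directory: all sequence files under it are read
    val fromDir  = sc.sequenceFile("hdfs:///data/seq/2015-07-31", classOf[LongWritable], classOf[Text])

    // a glob
    val fromGlob = sc.sequenceFile("hdfs:///data/seq/2015-07-*/*.seq", classOf[LongWritable], classOf[Text])

    // or an explicit comma-separated list of paths
    val fromList = sc.sequenceFile("hdfs:///data/seq/a,hdfs:///data/seq/b", classOf[LongWritable], classOf[Text])

    val data = fromGlob.map { case (k, v) => (k.toString, v.toString) }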

On Fri, Jul 31, 2015 at 11:35 AM, swetha swethakasire...@gmail.com wrote:

 Hi,

 How to add multiple sequence files from HDFS to a Spark Context to do Batch
 processing? I have something like the following in my code. Do I have to
 add
 Comma separated list of Sequence file paths to the Spark Context.

  val data = if (args.length > 0 && args(0) != null)
    sc.sequenceFile(file, classOf[LongWritable], classOf[Text]).
      map { case (x, y) => (x.toString, y.toString) }

 Thanks,
 Swetha



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-multiple-sequence-files-from-HDFS-to-a-Spark-Context-to-do-Batch-processing-tp24102.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo


Re: How to create Spark DataFrame using custom Hadoop InputFormat?

2015-07-31 Thread Ted Yu
Can you pastebin the complete stack trace ?

If you can show skeleton of MyInputFormat and MyRecordWritable, that would
provide additional information as well.

Cheers

On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I am having my own Hadoop custom InputFormat which I need to use in
 creating DataFrame. I tried to do the following

 JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
 jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
 JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe =
 sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
 myFormatAsDataframe.show();

 Above code does not work and throws exception saying
 java.lang.IllegalArgumentException object is not an instance of declaring
 class

 My custom Hadoop InputFormat works very well with Hive,MapReduce etc How do
 I make it work with Spark please guide I am new to Spark. Thank in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-Spark-DataFrame-using-custom-Hadoop-InputFormat-tp24101.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SparkLauncher not notified about finished job - hangs infinitely.

2015-07-31 Thread Elkhan Dadashov
Hi Tomasz,

*Answer to your 1st question*:

Clear/read the error (spark.getErrorStream()) and output
(spark.getInputStream()) stream buffers before you call spark.waitFor(), it
would be better to clear/read them with 2 different threads. Then it should
work fine.

As Spark job is launched as subprocess, and according to Oracle
documentation
https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html:

By default, the created subprocess does not have its own terminal or
console. All its standard I/O (i.e. stdin, stdout, stderr) operations will
be redirected to the parent process, where they can be accessed via the
streams obtained using the methods getOutputStream(), getInputStream(), and
getErrorStream(). The parent process uses these streams to feed input to
and get output from the subprocess. Because some native platforms only
provide limited buffer size for standard input and output streams, failure
to promptly write the input stream or read the output stream of the
subprocess may cause the subprocess to block, or even deadlock.
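A rough sketch of what that looks like in code (my own illustration, assuming Java 8 and
untested against this exact setup): drain stdout and stderr on their own threads before
calling waitFor(), so the child can never block on a full pipe buffer.

    import org.apache.spark.launcher.SparkLauncher;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    public class LauncherExample {
        // Echo one of the child's streams on a daemon thread.
        private static void drain(final InputStream in, final String prefix) {
            Thread t = new Thread(() -> {
                try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
                    String line;
                    while ((line = r.readLine()) != null) {
                        System.out.println(prefix + line);
                    }
                } catch (IOException ignored) {
                    // the stream closes when the child exits
                }
            });
            t.setDaemon(true);
            t.start();
        }

        public static void main(String[] args) throws Exception {
            Process spark = new SparkLauncher()
                .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
                .setMainClass("org.apache.spark.examples.SparkPi")
                .setMaster("yarn-cluster")
                .launch();

            drain(spark.getInputStream(), "[stdout] ");  // normal output of the subprocess
            drain(spark.getErrorStream(), "[stderr] ");

            System.out.println("Waiting for finish...");
            int exitCode = spark.waitFor();
            System.out.println("Finished! Exit code: " + exitCode);
        }
    }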




On Fri, Jul 31, 2015 at 2:45 AM, Tomasz Guziałek 
tomasz.guzia...@humaninference.com wrote:

 I am trying to submit a JAR with Spark job into the YARN cluster from Java
 code. I am using SparkLauncher to submit SparkPi example:

 Process spark = new SparkLauncher()
     .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
     .setMainClass("org.apache.spark.examples.SparkPi")
     .setMaster("yarn-cluster")
     .launch();
 System.out.println("Waiting for finish...");
 int exitCode = spark.waitFor();
 System.out.println("Finished! Exit code: " + exitCode);

 There are two problems:

 1. While submitting in yarn-cluster mode, the application is
 successfully submitted to YARN and executes successfully (it is visible in
 the YARN UI, reported as SUCCESS and PI value is printed in the output).
 However, the submitting application is never notified that processing is
 finished - it hangs infinitely after printing Waiting to finish... The
 log of the container can be found here: http://pastebin.com/LscBjHQc
 2. While submitting in yarn-client mode, the application does not appear
 in YARN UI and the submitting application hangs at Waiting to finish...
 When hanging code is killed, the application shows up in YARN UI and it is
 reported as SUCCESS, but the output is empty (PI value is not printed out).
 The log of the container can be found here: http://pastebin.com/9KHi81r4

 I tried to execute the submitting application both with Oracle Java 8 and
 7.



 Any hints what might be wrong?



 Best regards,

 Tomasz




-- 

Best regards,
Elkhan Dadashov


Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Sean Owen
If you've set the checkpoint dir, it seems like indeed the intent is
to use a default checkpoint interval in DStream:

private[streaming] def initialize(time: Time) {
...
  // Set the checkpoint interval to be slideDuration or 10 seconds,
  // which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) /
      slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }

Do you see that log message? what's the interval? that could at least
explain why it's not doing anything, if it's quite long.

It sort of seems wrong though since
https://spark.apache.org/docs/latest/streaming-programming-guide.html
suggests it was intended to be a multiple of the batch interval. The
slide duration wouldn't always be relevant anyway.

On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
dgoldenberg...@gmail.com wrote:
 I've instrumented checkpointing per the programming guide and I can tell
 that Spark Streaming is creating the checkpoint directories but I'm not
 seeing any content being created in those directories nor am I seeing the
 effects I'd expect from checkpointing.  I'd expect any data that comes into
 Kafka while the consumers are down, to get picked up when the consumers are
 restarted; I'm not seeing that.

 For now my checkpoint directory is set to the local file system with the
 directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
 subdirectory named with a UUID being created under there but no files.

 I'm using a custom JavaStreamingContextFactory which creates a
 JavaStreamingContext with the directory set into it via the
 checkpoint(String) method.

 I'm currently not invoking the checkpoint(Duration) method on the DStream
 since I want to first rely on Spark's default checkpointing interval.  My
 streaming batch duration millis is set to 1 second.

 Anyone have any idea what might be going wrong?

 Also, at which point does Spark delete files from checkpointing?

 Thanks.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Dmitry Goldenberg
It looks like there's an issue with the 'Parameters' pojo I'm using within
my driver program. For some reason that needs to be serializable, which is
odd.

java.io.NotSerializableException: com.kona.consumer.kafka.spark.Parameters


Giving it another whirl, though having to make it serializable seems odd to me.
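For what it's worth, my understanding (an assumption, not something I have traced through
this exact code) is that this is expected: with checkpointing enabled, the DStream graph,
including any closures that captured driver-side objects, is serialized into the
checkpoint, so anything reachable from those closures must be Serializable. Two ways
around it, sketched with hypothetical field names:

    // option 1: make the POJO serializable
    public class Parameters implements java.io.Serializable {
        private String checkpointDir;
        private long checkpointMillis;
        // getters/setters elided
    }

    // option 2: copy only the needed values into final locals before building the
    // closures, so the Parameters object itself is never captured
    final String checkpointDir = params.getCheckpointDir();
    final long checkpointMillis = params.getCheckpointMillis();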

On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen so...@cloudera.com wrote:

 If you've set the checkpoint dir, it seems like indeed the intent is
 to use a default checkpoint interval in DStream:

 private[streaming] def initialize(time: Time) {
 ...
   // Set the checkpoint interval to be slideDuration or 10 seconds,
   // which ever is larger
   if (mustCheckpoint && checkpointDuration == null) {
     checkpointDuration = slideDuration * math.ceil(Seconds(10) /
       slideDuration).toInt
     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
   }

 Do you see that log message? what's the interval? that could at least
 explain why it's not doing anything, if it's quite long.

 It sort of seems wrong though since
 https://spark.apache.org/docs/latest/streaming-programming-guide.html
 suggests it was intended to be a multiple of the batch interval. The
 slide duration wouldn't always be relevant anyway.

 On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
 dgoldenberg...@gmail.com wrote:
  I've instrumented checkpointing per the programming guide and I can tell
  that Spark Streaming is creating the checkpoint directories but I'm not
  seeing any content being created in those directories nor am I seeing the
  effects I'd expect from checkpointing.  I'd expect any data that comes
 into
  Kafka while the consumers are down, to get picked up when the consumers
 are
  restarted; I'm not seeing that.
 
  For now my checkpoint directory is set to the local file system with the
  directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
  subdirectory named with a UUID being created under there but no files.
 
  I'm using a custom JavaStreamingContextFactory which creates a
  JavaStreamingContext with the directory set into it via the
  checkpoint(String) method.
 
  I'm currently not invoking the checkpoint(Duration) method on the DStream
  since I want to first rely on Spark's default checkpointing interval.  My
  streaming batch duration millis is set to 1 second.
 
  Anyone have any idea what might be going wrong?
 
  Also, at which point does Spark delete files from checkpointing?
 
  Thanks.



Re: How to create Spark DataFrame using custom Hadoop InputFormat?

2015-07-31 Thread Umesh Kacha
Hi Ted, thanks very much for the reply. I can't share code on a public forum. I have
created the custom format by extending Hadoop's mapred InputFormat class and, in the
same way, the RecordReader class. If you can help me with how to use it to build a
DataFrame, it would be very helpful.

On Sat, Aug 1, 2015 at 12:12 AM, Ted Yu yuzhih...@gmail.com wrote:

 Can you pastebin the complete stack trace ?

 If you can show skeleton of MyInputFormat and MyRecordWritable, that
 would provide additional information as well.

 Cheers

 On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I am having my own Hadoop custom InputFormat which I need to use in
 creating DataFrame. I tried to do the following

 JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
 jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
 JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe =
 sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
 myFormatAsDataframe.show();

 Above code does not work and throws exception saying
 java.lang.IllegalArgumentException object is not an instance of declaring
 class

 My custom Hadoop InputFormat works very well with Hive,MapReduce etc How
 do
 I make it work with Spark please guide I am new to Spark. Thank in
 advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-Spark-DataFrame-using-custom-Hadoop-InputFormat-tp24101.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Dmitry Goldenberg
I've instrumented checkpointing per the programming guide and I can tell
that Spark Streaming is creating the checkpoint directories but I'm not
seeing any content being created in those directories nor am I seeing the
effects I'd expect from checkpointing.  I'd expect any data that comes into
Kafka while the consumers are down, to get picked up when the consumers are
restarted; I'm not seeing that.

For now my checkpoint directory is set to the local file system with the
directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
subdirectory named with a UUID being created under there but no files.

I'm using a custom JavaStreamingContextFactory which creates a
JavaStreamingContext with the directory set into it via the
checkpoint(String) method.

I'm currently not invoking the checkpoint(Duration) method on the DStream
since I want to first rely on Spark's default checkpointing interval.  My
streaming batch duration millis is set to 1 second.

Anyone have any idea what might be going wrong?

Also, at which point does Spark delete files from checkpointing?

Thanks.


RE: How to register array class with Kyro in spark-defaults.conf

2015-07-31 Thread Wang, Ningjun (LNG-NPV)
Here is the definition of EsDoc

case class EsDoc(id: Long, isExample: Boolean, docSetIds: Array[String], 
randomId: Double, vector: String) extends Serializable

Note that it is not EsDoc that has a problem with registration; it is EsDoc[]
(the array class of EsDoc).

When I tried replacing the EsDoc class with the Map class, I also got the
following error asking me to register the Map[] (array of Map) class

java.lang.IllegalArgumentException: Class is not registered: 
scala.collection.immutable.Map[]
Note: To register this class use: 
kryo.register(scala.collection.immutable.Map[].class);

So the question is: how do I register an array class? Adding the following to
spark-defaults.conf does not work

spark.kryo.classesToRegister  
scala.collection.immutable.Map,scala.collection.immutable.Map[]
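One way that should work (a sketch, not verified against this exact setup) is to register
the classes programmatically on the SparkConf, where classOf[Array[EsDoc]] resolves to the
JVM array class name ("[Lltn.analytics.es.EsDoc;") that Kryo expects; I believe that JVM
name is also what spark.kryo.classesToRegister would need in the conf file, which is why
the plain "EsDoc[]" spelling is rejected.

    import org.apache.spark.{SparkConf, SparkContext}
    import ltn.analytics.es.EsDoc

    val conf = new SparkConf()
      .setAppName("kryo-array-registration")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(Array(
        classOf[EsDoc],
        classOf[Array[EsDoc]],                       // the array class from the error message
        classOf[scala.collection.immutable.Map[_, _]]
      ))

    val sc = new SparkContext(conf)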

Ningjun

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Friday, July 31, 2015 11:49 AM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to register array class with Kyro in spark-defaults.conf

For the second exception, was there anything following SparkException which 
would give us more clue ?

Can you tell us how EsDoc is structured ?

Thanks

On Fri, Jul 31, 2015 at 8:42 AM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.commailto:ningjun.w...@lexisnexis.com wrote:
Does anybody have any idea how to solve this problem?

Ningjun

From: Wang, Ningjun (LNG-NPV)
Sent: Thursday, July 30, 2015 11:06 AM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: How to register array class with Kyro in spark-defaults.conf

I register my class with Kyro in spark-defaults.conf as follow

spark.serializer   
org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired true
spark.kryo.classesToRegister  ltn.analytics.es.EsDoc

But I got the following exception

java.lang.IllegalArgumentException: Class is not registered: 
ltn.analytics.es.EsDoc[]
Note: To register this class use: kryo.register(ltn.analytics.es.EsDoc[].class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
at 
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:162)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


The error message seems to suggest that I should also register the array class 
EsDoc[]. So I add it to spark-defaults.conf as follow

spark.kryo.classesToRegister  ltn.analytics.es.EsDoc,ltn.analytics.es.EsDoc[]

Then I got the following error

org.apache.spark.SparkException: Failed to register classes with Kryo
at 
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
at 
org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:153)
at 
org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115)
at 
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
at 
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:84)
at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at ltn.analytics.index.Index.addDocuments(Index.scala:82)

Please advise.

Thanks.
Ningjun



Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Dmitry Goldenberg
I'll check the log info message..

Meanwhile, the code is basically

public class KafkaSparkStreamingDriver implements Serializable {

..

    SparkConf sparkConf = createSparkConf(appName, kahunaEnv);

    JavaStreamingContext jssc = params.isCheckpointed() ?
        createCheckpointedContext(sparkConf, params) : createContext(sparkConf, params);

    jssc.start();
    jssc.awaitTermination();
    jssc.close();

..

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
      Parameters params) {

    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {

      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };

    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }

...

  private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {

    // Create context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));

    // Set the checkpoint directory, if we're checkpointing
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }

    Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));

    // Set the Kafka parameters.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
    if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
      kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
    }

    // Create direct Kafka stream with the brokers and the topic.
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet);

    // See if there's an override of the default checkpoint duration.
    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
      messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
    }

.




On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen so...@cloudera.com wrote:

 If you've set the checkpoint dir, it seems like indeed the intent is
 to use a default checkpoint interval in DStream:

 private[streaming] def initialize(time: Time) {
 ...
   // Set the checkpoint interval to be slideDuration or 10 seconds,
   // which ever is larger
   if (mustCheckpoint && checkpointDuration == null) {
     checkpointDuration = slideDuration * math.ceil(Seconds(10) /
       slideDuration).toInt
     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
   }

 Do you see that log message? what's the interval? that could at least
 explain why it's not doing anything, if it's quite long.

 It sort of seems wrong though since
 https://spark.apache.org/docs/latest/streaming-programming-guide.html
 suggests it was intended to be a multiple of the batch interval. The
 slide duration wouldn't always be relevant anyway.

 On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
 dgoldenberg...@gmail.com wrote:
  I've instrumented checkpointing per the programming guide and I can tell
  that Spark Streaming is creating the checkpoint directories but I'm not
  seeing any content being created in those directories nor am I seeing the
  effects I'd expect from checkpointing.  I'd expect any data that comes
 into
  Kafka while the consumers are down, to get picked up when the consumers
 are
  restarted; I'm not seeing that.
 
  For now my checkpoint directory is set to the local file system with the
  directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
  subdirectory named with a UUID being created under there but no files.
 
  I'm using a custom JavaStreamingContextFactory which creates a
  JavaStreamingContext with the directory set into it via the
  checkpoint(String) method.
 
  I'm currently not invoking the checkpoint(Duration) method on the DStream
  since I want to first rely on Spark's default checkpointing interval.  My
  streaming batch duration millis is set to 1 second.
 
  Anyone have any idea what might be going wrong?
 
  Also, at which point does Spark delete files from checkpointing?
 
  Thanks.



How to create Spark DataFrame using custom Hadoop InputFormat?

2015-07-31 Thread unk1102
Hi, I have my own custom Hadoop InputFormat which I need to use to create a
DataFrame. I tried the following:

JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
DataFrame myFormatAsDataframe =
sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
myFormatAsDataframe.show();

The above code does not work and throws an exception saying
java.lang.IllegalArgumentException: object is not an instance of declaring class

My custom Hadoop InputFormat works very well with Hive, MapReduce etc. How do
I make it work with Spark? Please guide me, I am new to Spark. Thanks in advance.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-Spark-DataFrame-using-custom-Hadoop-InputFormat-tp24101.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to add multiple sequence files from HDFS to a Spark Context to do Batch processing?

2015-07-31 Thread swetha
Hi,

How do I add multiple sequence files from HDFS to a Spark Context to do batch
processing? I have something like the following in my code. Do I have to add a
comma-separated list of sequence file paths to the Spark Context?

 val data = if (args.length > 0 && args(0) != null)
   sc.sequenceFile(file, classOf[LongWritable], classOf[Text]).
     map { case (x, y) => (x.toString, y.toString) }

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-multiple-sequence-files-from-HDFS-to-a-Spark-Context-to-do-Batch-processing-tp24102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Cody Koeninger
Show us the relevant code

On Fri, Jul 31, 2015 at 12:16 PM, Dmitry Goldenberg 
dgoldenberg...@gmail.com wrote:

 I've instrumented checkpointing per the programming guide and I can tell
 that Spark Streaming is creating the checkpoint directories but I'm not
 seeing any content being created in those directories nor am I seeing the
 effects I'd expect from checkpointing.  I'd expect any data that comes into
 Kafka while the consumers are down, to get picked up when the consumers are
 restarted; I'm not seeing that.

 For now my checkpoint directory is set to the local file system with the
 directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
 subdirectory named with a UUID being created under there but no files.

 I'm using a custom JavaStreamingContextFactory which creates a
 JavaStreamingContext with the directory set into it via the
 checkpoint(String) method.

 I'm currently not invoking the checkpoint(Duration) method on the DStream
 since I want to first rely on Spark's default checkpointing interval.  My
 streaming batch duration millis is set to 1 second.

 Anyone have any idea what might be going wrong?

 Also, at which point does Spark delete files from checkpointing?

 Thanks.



Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-31 Thread Martin Senne
Dear Michael, dear all,

a minimal example is listed below.

After some further analysis I could figure out that the problem is related
to the *leftOuterJoinWithRemovalOfEqualColumn* method, as I use columns of
the left and right DataFrames when doing the select on the joined table.

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame,
rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn =>
      cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) ===
      rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

As the column "y" of the right table has nullable = false, this is then
also transferred to the "y" column of the joined table, since I use rightDF("y").

Thus, I need to use columns of the joined table for the select.

*Question now: The joined table has column names "x", "a", "x", "y".
How do I discard the second "x" column?*

All my approaches failed (assuming here that joinedDF is the joined DataFrame):


   - Using joinedDF.drop("x") discards both "x" columns.
   - Using joinedDF("x") does not work as it is ambiguous.
   - Also using rightDF.as("aliasname") in order to differentiate the
   column "x" (from the left DataFrame) from "x" (from the right DataFrame) did not
   work out, as I found no way to use select($"aliasname.x")
   programmatically. Could someone sketch the code? (One possible sketch follows below.)
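A possible sketch (untested against the code above, and it addresses only the ambiguous
column, not the nullable flag): alias both DataFrames before the join and select through
the aliases instead of through leftDF(...)/rightDF(...).

    import org.apache.spark.sql.functions.col

    val left  = recordDF.as("l")
    val right = mappingDF.as("r")

    val joined = left.join(right, col("l.x") === col("r.x"), "leftouter")

    // keep the left "x", drop the right one, keep the right "y"
    val result = joined.select(col("l.x"), col("l.a"), col("r.y"))
    result.show()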

Any help welcome, thanks


Martin




import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SQLContext}

object OtherEntities {

  case class Record( x:Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

object MinimalShowcase {

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame,
rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn =>
      cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) ===
      rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }


  /**
   * Set, if a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is
either nullable or not
   */
  def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
Boolean) : DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField( c, t, _, m) if c.equals(cn) => StructField( c,
        t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame( df.rdd, newSchema)
  }


  def main (args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Minimal")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
    val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
    val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)

    val joinedDF = recordDF.join(mappingDF, recordDF("x") ===
      mappingDF("x"), "leftouter")
    println("joinedDF:")
    joinedDF.show
    joinedDF.printSchema
    joinedDF.filter(joinedDF("y").isNotNull).show

    //joinedDF:
    //+---+-----+----+----+
    //|  x|    a|   x|   y|
    //+---+-----+----+----+
    //|  1|hello|null|null|
    //|  2|  bob|   2|   5|
    //+---+-----+----+----+
    //
    //root
    // |-- x: integer (nullable = false)
    // |-- a: string (nullable = true)
    // |-- x: integer (nullable = true)
    // |-- y: integer (nullable = true)
    //
    //+---+---+---+---+
    //|  x|  a|  x|  y|
    //+---+---+---+---+
    //|  2|bob|  2|  5|
    //+---+---+---+---+


    val extrajoinedDF =
      leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
    println("extrajoinedDF:")
    extrajoinedDF.show
    extrajoinedDF.printSchema
    extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show

    //extrajoinedDF:
    //+---+-----+----+
    //|  x|    a|   y|
    //+---+-----+----+
    //|  1|hello|null|
    //|  2|  bob|   5|
    //+---+-----+----+
    //
    //root
    // |-- x: integer (nullable = false)
    // |-- a: string (nullable = true)
    // |-- y: integer (nullable = false)
    //
    //+---+-----+----+
    //|  x|    a|   y|
    //+---+-----+----+
    //|  1|hello|null|
    //|  2|  bob|   5|
    //+---+-----+----+



    val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") ===
      mappingWithNullDF("x"), "leftouter")
    println("joined2DF:")

How to increase parallelism of a Spark cluster?

2015-07-31 Thread Sujit Pal
Hello,

I am trying to run a Spark job that hits an external webservice to get back
some information. The cluster is 1 master + 4 workers, each worker has 60GB
RAM and 4 CPUs. The external webservice is a standalone Solr server, and is
accessed using code similar to that shown below.

def getResults(keyValues: Iterator[(String, Array[String])]):
    Iterator[(String, String)] = {
  val solr = new HttpSolrClient()
  initializeSolrParameters(solr)
  keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
}

myRDD.repartition(10)
     .mapPartitions(keyValues => getResults(keyValues))


The mapPartitions does some initialization to the SolrJ client per
partition and then hits it for each record in the partition via the
getResults() call.

I repartitioned in the hope that this will result in 10 clients hitting
Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
clients if I can). However, I counted the number of open connections using
netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
observed that Solr has a constant 4 clients (ie, equal to the number of
workers) over the lifetime of the run.

My observation leads me to believe that each worker processes a single
stream of work sequentially. However, from what I understand about how
Spark works, each worker should be able to process number of tasks
parallelly, and that repartition() is a hint for it to do so.

Is there some SparkConf environment variable I should set to increase
parallelism in these workers, or should I just configure a cluster with
multiple workers per machine? Or is there something I am doing wrong?
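Not a definitive diagnosis, but the knobs I would look at first: the number of tasks
running concurrently in a stage is min(number of partitions, total cores granted to the
executors), with one core per task by default (spark.task.cpus = 1), so the number of
simultaneous Solr clients is bounded by whichever of those two is smaller. A quick
sanity check from the driver, plus matching the partition count to the desired number
of concurrent clients (the value 40 is only an example):

    // roughly the total number of cores the application was granted
    println(s"defaultParallelism = ${sc.defaultParallelism}")

    val enriched = myRDD
      .repartition(40)                                   // one Solr client per running task
      .mapPartitions(keyValues => getResults(keyValues))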

Thank you in advance for any pointers you can provide.

-sujit


Re: How to create Spark DataFrame using custom Hadoop InputFormat?

2015-07-31 Thread Ted Yu
I don't think using Void class is the right choice - it is not even a
Writable.

BTW in the future, capture text output instead of image.

Thanks

On Fri, Jul 31, 2015 at 12:35 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi Ted thanks My key is always Void because my custom format file is non
 splittable so key is Void and values is  MyRecordWritable which extends
 Hadoop Writable. I am sharing my log as snap please dont mind as I cant
 paste code outside.

 Regards,
 Umesh

 On Sat, Aug 1, 2015 at 12:59 AM, Ted Yu yuzhih...@gmail.com wrote:

 Looking closer at the code you posted, the error likely was caused by the
 3rd parameter: Void.class

 It is supposed to be the class of key.

 FYI

 On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I am having my own Hadoop custom InputFormat which I need to use in
 creating DataFrame. I tried to do the following

 JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
 jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
 JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe =
 sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
 myFormatAsDataframe.show();

 Above code does not work and throws exception saying
 java.lang.IllegalArgumentException object is not an instance of declaring
 class

 My custom Hadoop InputFormat works very well with Hive,MapReduce etc How
 do
 I make it work with Spark please guide I am new to Spark. Thank in
 advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-Spark-DataFrame-using-custom-Hadoop-InputFormat-tp24101.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
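
For what it is worth, a rough Scala sketch of one way to go from a custom
InputFormat to a DataFrame: convert each custom Writable into a Row first and
pass an explicit schema instead of a bean class. The getFieldA/getFieldB
accessors and the field types below are made up for illustration.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Read the file with the custom (old-API) InputFormat; Void stays a placeholder key.
val pairRdd = sc.hadoopFile[Void, MyRecordWritable, MyInputFormat]("hdfs://tmp/data/myformat.xyz")

// Copy each Writable into a Row before handing it to Spark SQL
// (getFieldA / getFieldB are hypothetical accessors on MyRecordWritable).
val rowRdd = pairRdd.values.map(r => Row(r.getFieldA, r.getFieldB))

val schema = StructType(Seq(
  StructField("fieldA", StringType, nullable = true),
  StructField("fieldB", IntegerType, nullable = true)))

val myFormatAsDataframe = sqlContext.createDataFrame(rowRdd, schema)
myFormatAsDataframe.show()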






What happens when you create more DStreams then nodes in the cluster?

2015-07-31 Thread Brandon White
Since one input dstream creates one receiver and one receiver uses one
executor / node.

What happens if you create more Dstreams than nodes in the cluster?

Say I have 30 Dstreams on a 15 node cluster.

Will ~10 streams get assigned to ~10 executors / nodes then the other ~20
streams will be queued for resources or will the other streams just fail
and never run?


Re: How to create Spark DataFrame using custom Hadoop InputFormat?

2015-07-31 Thread Ted Yu
Looking closer at the code you posted, the error likely was caused by the
3rd parameter: Void.class

It is supposed to be the class of key.

FYI

On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I am having my own Hadoop custom InputFormat which I need to use in
 creating DataFrame. I tried to do the following

 JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
 jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
 JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe =
 sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
 myFormatAsDataframe.show();

 Above code does not work and throws exception saying
 java.lang.IllegalArgumentException object is not an instance of declaring
 class

 My custom Hadoop InputFormat works very well with Hive,MapReduce etc How do
 I make it work with Spark please guide I am new to Spark. Thank in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-Spark-DataFrame-using-custom-Hadoop-InputFormat-tp24101.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-31 Thread Brandon White
Tathagata,

Could the bottleneck possibility be the number of executor nodes in our
cluster? Since we are creating 500 Dstreams based off 500 textfile
directories, do we need at least 500 executors / nodes to be receivers for
each one of the streams?

On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com wrote:

 @Ashwin: You could append the topic in the data.

 val kafkaStreams = topics.map { topic =>
   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
 }
 val unionedStream = context.union(kafkaStreams)


 @Brandon:
 I dont recommend it, but you could do something crazy like use the
 foreachRDD to farm out the jobs to a threadpool, but the final foreachRDD
 waits for all the jobs to complete.

 manyDStreams.foreach { dstream =>
   dstream.foreachRDD { rdd =>
     // Add runnable that runs the job on RDD to threadpool
     // This does not wait for the job to finish
   }
 }

 anyOfTheManyDStreams.foreachRDD { _ =>
   // wait for all the current batch's jobs in the threadpool to complete.
 }


 This would run all the Spark jobs in the batch in parallel in thread pool,
 but it would also make sure all the jobs finish before the batch is marked
 as completed.

 On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Thank you Tathagata. My main use case for the 500 streams is to append
 new elements into their corresponding Spark SQL tables. Every stream is
 mapped to a table so I'd like to use the streams to appended the new rdds
 to the table. If I union all the streams, appending new elements becomes a
 nightmare. So there is no other way to parallelize something like the
 following? Will this still run sequence or timeout?

 //500 streams
 streams.foreach { stream =>
   stream.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.saveAsTable(streamTuple._1, SaveMode.Append)
   }
 }

 On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com
 wrote:

 I don't think anyone has really run 500 text streams.
 And parSequences do nothing out there; you are only parallelizing the
 setup code, which does not really compute anything. Also it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all the
 streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     //do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?







how to convert a sequence of TimeStamp to a dataframe

2015-07-31 Thread Joanne Contact
Hi Guys,

I have struggled for a while with this seemingly simple thing:

I have a sequence of timestamps and want to create a dataframe with 1 column.

Seq[java.sql.Timestamp]

//import collection.breakOut

var seqTimestamp = scala.collection.Seq(listTs:_*)

seqTimestamp: Seq[java.sql.Timestamp] = List(2015-07-22 16:52:00.0,
2015-07-22 16:53:00.0, ., )

I tried a lot of ways to create a dataframe and below is another failed way:

import sqlContext.implicits._
var rddTs = sc.parallelize(seqTimestamp)
rddTs.toDF("minInterval")

<console>:108: error: value toDF is not a member of
org.apache.spark.rdd.RDD[java.sql.Timestamp] rddTs.toDF("minInterval")

So, could any guru please tell me how to do this?

I am not familiar with Scala or Spark. I wonder if learning Scala will
help with this at all? It just sounds like a lot of trial/error and
googling.

docs like
https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/DataFrame.html
https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/SQLContext.html#createDataFrame(scala.collection.Seq,
scala.reflect.api.TypeTags.TypeTag)
does not help.

Btw, I am using Spark 1.4.

Thanks in advance,

J
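
A rough, untested sketch of one way that usually works for a single-column
DataFrame from a Seq[java.sql.Timestamp]: wrap each value in a Row (or a
Tuple1) so Spark SQL has a struct to work with, and pass the schema explicitly.

import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, TimestampType}

// assuming sc, sqlContext and seqTimestamp: Seq[Timestamp] already exist
val rowRDD = sc.parallelize(seqTimestamp).map(ts => Row(ts))
val schema = StructType(Seq(StructField("minInterval", TimestampType, nullable = true)))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()

// Alternative (needs import sqlContext.implicits._):
// sc.parallelize(seqTimestamp.map(Tuple1(_))).toDF("minInterval")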

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Setting a stage timeout

2015-07-31 Thread Ted Yu
The referenced bug has been fixed in 1.4.0, are you able to upgrade ?

Cheers

On Fri, Jul 31, 2015 at 10:01 AM, William Kinney william.kin...@gmail.com
wrote:

 Hi,

 I had a job that got stuck on yarn due to
 https://issues.apache.org/jira/browse/SPARK-6954
 It never exited properly.

 Is there a way to set a timeout for a stage or all stages?



Re: SparkLauncher not notified about finished job - hangs infinitely.

2015-07-31 Thread Ted Yu
Tomasz:
Please take a look at the Redirector class inside:
./launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java

FYI

On Fri, Jul 31, 2015 at 10:02 AM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 Hi Tomasz,

 *Answer to your 1st question*:

 Clear/read the error (spark.getErrorStream()) and output
 (spark.getInputStream()) stream buffers before you call spark.waitFor(), it
 would be better to clear/read them with 2 different threads. Then it should
 work fine.
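
 A minimal, untested Scala sketch of that idea (the jar path and log prefixes
 are illustrative): drain stdout and stderr on their own threads, then call
 waitFor().

 import java.io.{BufferedReader, InputStream, InputStreamReader}
 import org.apache.spark.launcher.SparkLauncher

 // Drain a stream on its own thread so the child process never blocks on a full buffer.
 def drain(in: InputStream, prefix: String): Thread = {
   val t = new Thread(new Runnable {
     override def run(): Unit = {
       val reader = new BufferedReader(new InputStreamReader(in))
       var line = reader.readLine()
       while (line != null) {
         println(s"$prefix $line")
         line = reader.readLine()
       }
     }
   })
   t.setDaemon(true)
   t.start()
   t
 }

 val spark = new SparkLauncher()
   .setAppResource("/path/to/spark-examples.jar")   // illustrative path
   .setMainClass("org.apache.spark.examples.SparkPi")
   .setMaster("yarn-cluster")
   .launch()

 drain(spark.getInputStream, "[stdout]")
 drain(spark.getErrorStream, "[stderr]")
 val exitCode = spark.waitFor()
 println(s"Finished! Exit code: $exitCode")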

 As Spark job is launched as subprocess, and according to Oracle
 documentation
 https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html:

 By default, the created subprocess does not have its own terminal or
 console. All its standard I/O (i.e. stdin, stdout, stderr) operations will
 be redirected to the parent process, where they can be accessed via the
 streams obtained using the methodsgetOutputStream(), getInputStream(), and
 getErrorStream(). The parent process uses these streams to feed input to
 and get output from the subprocess. Because some native platforms only
 provide limited buffer size for standard input and output streams, failure
 to promptly write the input stream or read the output stream of the
 subprocess may cause the subprocess to block, or even deadlock.
 



 On Fri, Jul 31, 2015 at 2:45 AM, Tomasz Guziałek 
 tomasz.guzia...@humaninference.com wrote:

 I am trying to submit a JAR with Spark job into the YARN cluster from
 Java code. I am using SparkLauncher to submit SparkPi example:

 Process spark = new SparkLauncher()

 .setAppResource(C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar)
 .setMainClass(org.apache.spark.examples.SparkPi)
 .setMaster(yarn-cluster)
 .launch();
 System.out.println(Waiting for finish...);
 int exitCode = spark.waitFor();
 System.out.println(Finished! Exit code: + exitCode);

 There are two problems:

 1. While submitting in yarn-cluster mode, the application is
 successfully submitted to YARN and executes successfully (it is visible in
 the YARN UI, reported as SUCCESS and PI value is printed in the output).
 However, the submitting application is never notified that processing is
 finished - it hangs infinitely after printing Waiting to finish... The
 log of the container can be found here: http://pastebin.com/LscBjHQc
 2. While submitting in yarn-client mode, the application does not
 appear in YARN UI and the submitting application hangs at Waiting to
 finish... When hanging code is killed, the application shows up in YARN UI
 and it is reported as SUCCESS, but the output is empty (PI value is not
 printed out). The log of the container can be found here:
 http://pastebin.com/9KHi81r4

 I tried to execute the submitting application both with Oracle Java 8 and
 7.



 Any hints what might be wrong?



 Best regards,

 Tomasz




 --

 Best regards,
 Elkhan Dadashov



Record Linkage in Spark

2015-07-31 Thread dihash
Hi Folks

I would like to do RL (record linkage) in Spark, as the legacy approach using
blocking queries is no longer working now that the data is getting huge.
Please provide the necessary links and information for doing so.

Thanks 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Record-Linkage-in-Spark-tp24100.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: What happens when you create more DStreams then nodes in the cluster?

2015-07-31 Thread Ashwin Giridharan
@Brandon Each node can host multiple executors. For example, in a 15-node
cluster, if your NodeManager (in YARN) or equivalent (Mesos or Standalone)
runs on each of these nodes and each node has enough resources to host, say,
5 executors, then in total you can have 15*5 executors, and each of these
executors can host a DStream receiver.

But be aware that each DStream receiver uses a dedicated core. The number of
cores allocated to the Spark Streaming application must be more than the
number of receivers; otherwise the system will receive data but not be able
to process it.
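
To make the core arithmetic concrete, a purely illustrative sizing for the
30-receiver / 15-node example above (the property values are made up; the
keys are standard YARN-mode settings):

// 30 receivers pin 30 cores, so grant the application comfortably more than 30 in total.
val conf = new org.apache.spark.SparkConf()
  .setAppName("thirty-streams")             // hypothetical app name
  .set("spark.executor.instances", "15")    // e.g. one executor per node on YARN
  .set("spark.executor.cores", "4")         // 15 * 4 = 60 cores: 30 receiving, 30 left for processing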

Thanks,
Ashwin

On Fri, Jul 31, 2015 at 4:52 PM, Brandon White bwwintheho...@gmail.com
wrote:

 Since one input dstream creates one receiver and one receiver uses one
 executor / node.

 What happens if you create more Dstreams than nodes in the cluster?

 Say I have 30 Dstreams on a 15 node cluster.

 Will ~10 streams get assigned to ~10 executors / nodes then the other ~20
 streams will be queued for resources or will the other streams just fail
 and never run?




-- 
Thanks  Regards,
Ashwin Giridharan


Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-31 Thread Ashwin Giridharan
Thanks a lot @Das @Cody. I moved from receiver based to direct stream and I
can get the topics from the offset!!

On Fri, Jul 31, 2015 at 4:41 PM, Brandon White bwwintheho...@gmail.com
wrote:

 Tathagata,

 Could the bottleneck possibility be the number of executor nodes in our
 cluster? Since we are creating 500 Dstreams based off 500 textfile
 directories, do we need at least 500 executors / nodes to be receivers for
 each one of the streams?

 On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com
 wrote:

 @Ashwin: You could append the topic in the data.

 val kafkaStreams = topics.map { topic =>
   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
 }
 val unionedStream = context.union(kafkaStreams)


 @Brandon:
 I dont recommend it, but you could do something crazy like use the
 foreachRDD to farm out the jobs to a threadpool, but the final foreachRDD
 waits for all the jobs to complete.

 manyDStreams.foreach { dstream =>
   dstream.foreachRDD { rdd =>
     // Add runnable that runs the job on RDD to threadpool
     // This does not wait for the job to finish
   }
 }

 anyOfTheManyDStreams.foreachRDD { _ =>
   // wait for all the current batch's jobs in the threadpool to complete.
 }


 This would run all the Spark jobs in the batch in parallel in thread
 pool, but it would also make sure all the jobs finish before the batch is
 marked as completed.

 On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Thank you Tathagata. My main use case for the 500 streams is to append
 new elements into their corresponding Spark SQL tables. Every stream is
 mapped to a table so I'd like to use the streams to appended the new rdds
 to the table. If I union all the streams, appending new elements becomes a
 nightmare. So there is no other way to parallelize something like the
 following? Will this still run sequence or timeout?

 //500 streams
 streams.foreach { stream =>
   stream.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.saveAsTable(streamTuple._1, SaveMode.Append)
   }
 }

 On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com
 wrote:

 I don't think anyone has really run 500 text streams.
 And parSequences do nothing out there; you are only parallelizing the
 setup code, which does not really compute anything. Also it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all the
 streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com
  wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     //do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?








-- 
Thanks  Regards,
Ashwin Giridharan


Re: Problems with JobScheduler

2015-07-31 Thread Guillermo Ortiz
It doesn't make sense to me, because the other cluster processes all the
data in less than a second.
Anyway, I'm going to set that parameter.
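
For reference, a sketch of the rate limit being discussed (the value is
illustrative): with the direct stream, each batch then reads at most
maxRatePerPartition * batch-interval-in-seconds records from every Kafka
partition.

val conf = new org.apache.spark.SparkConf()
  .setAppName("kafka-direct-metrics")   // hypothetical app name
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // records per second per partition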

2015-07-31 0:36 GMT+02:00 Tathagata Das t...@databricks.com:

 Yes, and that is indeed the problem. It is trying to process all the data
 in Kafka, and therefore taking 60 seconds. You need to set the rate limits
 for that.

 On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger c...@koeninger.org
 wrote:

 If you don't set it, there is no maximum rate, it will get everything
 from the end of the last batch to the maximum available offset

 On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 The difference is that one receives more data than the other two. I can
 pass the topics through parameters, so I could execute the code trying
 one topic at a time and figure out which topic it is, although I guess it's
 the topic which gets the most data.

 Anyway, it's pretty weird to see those delays in just one of the clusters,
 even if the other one is not running.
 I have seen the parameter spark.streaming.kafka.maxRatePerPartition. I
 haven't set any value for this parameter; how does it work if this
 parameter doesn't have a value?

 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org:

 If the jobs are running on different topicpartitions, what's different
 about them?  Is one of them 120x the throughput of the other, for
 instance?  You should be able to eliminate cluster config as a difference
 by running the same topic partition on the different clusters and comparing
 the results.

 On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 I have three topics with one partition each, so each job runs on
 one topic.

 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is
 this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz 
 konstt2...@gmail.com wrote:

 I read about maxRatePerPartition parameter, I haven't set this
 parameter. Could it be the problem?? Although this wouldn't explain why 
 it
 doesn't work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents.
 I tried to stop one cluster and execute just the cluster isn't working 
 but
 it happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same
 code in two cluster. I read from three topics in Kafka with 
 DirectStream so
 I have three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0
 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0
 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
 24.0 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
 24.0 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
 1438259625000 ms*
 *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time
 143825963 ms*
 *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time
 1438259635000 ms*
 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage
 24.0 (TID 73) in 60373 ms on (3/3)
 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose
 tasks have all completed, 

Spark-Submit error

2015-07-31 Thread satish chandra j
Hi,
I have submitted a Spark job with the options jars, class, and master set to
*local*, but I am getting the error below:

*dse spark-submit spark error exception in thread main java.io.ioexception:
Invalid Request Exception(Why you have not logged in)*

*Note: submitting from a DataStax Spark node*

Please let me know if anybody has a solution for this issue.



Regards,
Satish Chandra


Re: Has anybody ever tried running Spark Streaming on 500 text streams?

2015-07-31 Thread Tathagata Das
@Brandon, the file streams do not use receivers, so the bottleneck is not
about executors per se. But there could be a couple of bottlenecks:
1. Every batch interval, the 500 dstreams are going to get directory
listings from 500 directories, SEQUENTIALLY. So preparing the batch's RDDs
and jobs can take time, and your batch interval can't be small; it may have to
be 10s of seconds. That is probably fine for your application, otherwise
you would not be using files in the first place.
2. Processing new files from 500 directories may take significant
computation power. Just make sure you get a large enough cluster.

On Fri, Jul 31, 2015 at 2:40 PM, Ashwin Giridharan ashwin.fo...@gmail.com
wrote:

 Thanks a lot @Das @Cody. I moved from receiver based to direct stream and
 I can get the topics from the offset!!

 On Fri, Jul 31, 2015 at 4:41 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Tathagata,

 Could the bottleneck possibility be the number of executor nodes in our
 cluster? Since we are creating 500 Dstreams based off 500 textfile
 directories, do we need at least 500 executors / nodes to be receivers for
 each one of the streams?

 On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com
 wrote:

 @Ashwin: You could append the topic in the data.

 val kafkaStreams = topics.map { topic =>
   KafkaUtils.createDirectStream(topic...).map { x => (x, topic) }
 }
 val unionedStream = context.union(kafkaStreams)


 @Brandon:
 I dont recommend it, but you could do something crazy like use the
 foreachRDD to farm out the jobs to a threadpool, but the final foreachRDD
 waits for all the jobs to complete.

 manyDStreams.foreach { dstream =>
   dstream.foreachRDD { rdd =>
     // Add runnable that runs the job on RDD to threadpool
     // This does not wait for the job to finish
   }
 }

 anyOfTheManyDStreams.foreachRDD { _ =>
   // wait for all the current batch's jobs in the threadpool to complete.
 }


 This would run all the Spark jobs in the batch in parallel in thread
 pool, but it would also make sure all the jobs finish before the batch is
 marked as completed.

 On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Thank you Tathagata. My main use case for the 500 streams is to append
 new elements into their corresponding Spark SQL tables. Every stream is
 mapped to a table so I'd like to use the streams to appended the new rdds
 to the table. If I union all the streams, appending new elements becomes a
 nightmare. So there is no other way to parallelize something like the
 following? Will this still run sequence or timeout?

 //500 streams
 streams.foreach { stream =>
   stream.foreachRDD { rdd =>
     val df = sqlContext.jsonRDD(rdd)
     df.saveAsTable(streamTuple._1, SaveMode.Append)
   }
 }

 On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com
 wrote:

 I don't think anyone has really run 500 text streams.
 And parSequences do nothing out there; you are only parallelizing the
 setup code, which does not really compute anything. Also it sets up 500
 foreachRDD operations that will get executed in each batch sequentially, so
 it does not make sense. The right way to parallelize this is to union all the
 streams.

 val streams = streamPaths.map { path =>
   ssc.textFileStream(path)
 }
 val unionedStream = streamingContext.union(streams)
 unionedStream.foreachRDD { rdd =>
   // do something
 }

 Then there is only one foreachRDD executed in every batch that will
 process in parallel all the new files in each batch interval.
 TD


 On Tue, Jul 28, 2015 at 3:06 PM, Brandon White 
 bwwintheho...@gmail.com wrote:

 val ssc = new StreamingContext(sc, Minutes(10))

 //500 textFile streams watching S3 directories
 val streams = streamPaths.par.map { path =>
   ssc.textFileStream(path)
 }

 streams.par.foreach { stream =>
   stream.foreachRDD { rdd =>
     //do something
   }
 }

 ssc.start()

 Would something like this scale? What would be the limiting factor to
 performance? What is the best way to parallelize this? Any other ideas on
 design?








 --
 Thanks  Regards,
 Ashwin Giridharan



Re: how to convert a sequence of TimeStamp to a dataframe

2015-07-31 Thread Ted Yu
Please take a look at stringToTimestamp() in
./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Representing timestamp with long should work.

Cheers
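
A rough, untested sketch of that suggestion: keep epoch milliseconds as longs
in the RDD, build the DataFrame, then cast back to a timestamp column
(the column names are illustrative).

import org.apache.spark.sql.functions.col
import sqlContext.implicits._

val df = sc.parallelize(seqTimestamp.map(ts => Tuple1(ts.getTime)))
  .toDF("millis")
  .select((col("millis") / 1000).cast("timestamp").as("minInterval"))
df.show()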

On Fri, Jul 31, 2015 at 2:50 PM, Joanne Contact joannenetw...@gmail.com
wrote:

 Hi Guys,

 I have struggled for a while on this seeming simple thing:

 I have a sequence of timestamps and want to create a dataframe with 1
 column.

 Seq[java.sql.Timestamp]

 //import collection.breakOut

 var seqTimestamp = scala.collection.Seq(listTs:_*)

 seqTimestamp: Seq[java.sql.Timestamp] = List(2015-07-22 16:52:00.0,
 2015-07-22 16:53:00.0, ., )

 I tried a lot of ways to create a dataframe and below is another failed
 way:

 import sqlContext.implicits._
 var rddTs = sc.parallelize(seqTimestamp)
 rddTs.toDF(minInterval)

 console:108: error: value toDF is not a member of
 org.apache.spark.rdd.RDD[java.sql.Timestamp] rddTs.toDF(minInterval)

 So, any guru could please tell me how to do this

 I am not familiar with Scala or Spark. I wonder if learning Scala will
 help this at all? It just sounds a lot of time of trial/error and
 googling.

 docs like

 https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/DataFrame.html

 https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/SQLContext.html#createDataFrame(scala.collection.Seq
 ,
 scala.reflect.api.TypeTags.TypeTag)
 does not help.

 Btw, I am using Spark 1.4.

 Thanks in advance,

 J

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Encryption on RDDs or in-memory/cache on Apache Spark

2015-07-31 Thread Matthew O'Reilly
Hi, 

I am currently working on the latest version of Apache Spark (1.4.1), pre-built 
package for Hadoop 2.6+.

Is there any feature in Spark/Hadoop to encrypt RDDs or in-memory/cache 
(something similar is Altibase's HDB: 
http://altibase.com/in-memory-database-computing-solutions/security/) when 
running applications in Spark? Or is there an external library/framework which 
could be used to encrypt RDDs or in-memory/cache in Spark?

I discovered it is possible to encrypt the data and then encapsulate it into an RDD.
However, I feel this hurts Spark's fast data processing, as it is slower to
encrypt the data and then encapsulate it into an RDD; it is a two-step process.
Encryption and storing the data should be done in parallel.
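
For what it is worth, a rough, untested sketch of the encrypt-then-cache
approach described above (the key, input path and cipher mode are placeholders;
ECB is used only for brevity, not as a recommendation):

import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec
import org.apache.spark.storage.StorageLevel

val key = new SecretKeySpec("0123456789abcdef".getBytes("UTF-8"), "AES")  // demo key only

val encrypted = sc.textFile("hdfs:///data/input")          // illustrative input path
  .mapPartitions { lines =>
    // one Cipher per partition: Cipher is neither thread-safe nor serializable
    val cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
    cipher.init(Cipher.ENCRYPT_MODE, key)
    lines.map(line => cipher.doFinal(line.getBytes("UTF-8")))
  }
  .persist(StorageLevel.MEMORY_AND_DISK_SER)               // cache the encrypted bytes, not the plaintext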

Any help would be appreciated.

Many thanks,
Matthew


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problems with JobScheduler

2015-07-31 Thread Guillermo Ortiz
I detected the error. The final step is to index data into Elasticsearch, and
the Elasticsearch in one of the clusters is overwhelmed and doesn't work
correctly.
I pointed the cluster which doesn't work at another ES and no longer get any
delay.

Sorry, it wasn't related to Spark!




2015-07-31 9:15 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 It doesn't make sense to me. Because in the another cluster process all
 data in less than a second.
 Anyway, I'm going to set that parameter.

 2015-07-31 0:36 GMT+02:00 Tathagata Das t...@databricks.com:

 Yes, and that is indeed the problem. It is trying to process all the data
 in Kafka, and therefore taking 60 seconds. You need to set the rate limits
 for that.

 On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger c...@koeninger.org
 wrote:

 If you don't set it, there is no maximum rate, it will get everything
 from the end of the last batch to the maximum available offset

 On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz konstt2...@gmail.com
 wrote:

 The difference is that one recives more data than the others two. I can
 pass thought parameters the topics, so, I could execute the code trying
 with one topic and figure out with one is the topic, although I guess that
 it's the topics which gets more data.

 Anyway it's pretty weird those delays in just one of the cluster even
 if the another one is not running.
 I have seen the parameter spark.streaming.kafka.maxRatePerPartition,
 I haven't set any value for this parameter, how does it work if this
 parameter doesn't have a value?

 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org:

 If the jobs are running on different topicpartitions, what's different
 about them?  Is one of them 120x the throughput of the other, for
 instance?  You should be able to eliminate cluster config as a difference
 by running the same topic partition on the different clusters and 
 comparing
 the results.

 On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com
  wrote:

 I have three topics with one partition each topic. So each jobs run
 about one topics.

 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org:

 Just so I'm clear, the difference in timing you're talking about is
 this:

 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at
 MetricsSpark.scala:67, took 60.391761 s

 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at
 MetricsSpark.scala:67, took 0.531323 s


 Are those jobs running on the same topicpartition?


 On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz 
 konstt2...@gmail.com wrote:

 I read about maxRatePerPartition parameter, I haven't set this
 parameter. Could it be the problem?? Although this wouldn't explain 
 why it
 doesn't work in one of the clusters.

 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 They just share the kafka, the rest of resources are independents.
 I tried to stop one cluster and execute just the cluster isn't 
 working but
 it happens the same.

 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 I have some problem with the JobScheduler. I have executed same
 code in two cluster. I read from three topics in Kafka with 
 DirectStream so
 I have three tasks.

 I have check YARN and there aren't more jobs launched.

 The cluster where I have troubles I got this logs:

 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage
 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes)
 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage
 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added
 broadcast_24_piece0 in memory on xxx:44909 (size: 1802.0 B, 
 free:
 530.3 MB)
 15/07/30 14:32:58 INFO BlockManagerInfo: Added
 broadcast_24_piece0 in memory on x:43477 (size: 1802.0 B, 
 free:
 530.3 MB)
 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage
 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage
 24.0 (TID 72) in 208 ms on x (1/3)
 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage
 24.0 (TID 74) in 49 ms on x (2/3)
 *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time
 143825958 ms*
 *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time
 1438259585000 ms*
 *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time
 143825959 ms*
 *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time
 1438259595000 ms*
 *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time
 143825960 ms*
 *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time
 1438259605000 ms*
 *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time
 143825961 ms*
 *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time
 1438259615000 ms*
 *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time
 143825962 ms*
 *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time
 1438259625000 ms*
 *15/07/30 

Buffer Overflow exception

2015-07-31 Thread vinod kumar
Hi,

I am getting a buffer overflow exception while using Spark via the Thrift
server. May I know how to overcome this?

Code:

 HqlConnection con = new HqlConnection("localhost", 10001, HiveServer.HiveServer2);
con.Open();
HqlCommand createCommand = new HqlCommand(tablequery, con);  // tablequery is the query used to create the table
createCommand.ExecuteNonQuery();


#. It seems Spark works slower when compared to SQL Server. May I know the
reason for that?
My case is:
I have used a table called TestTable with 4 records in SQL Server,
and I executed a query and it returns the result in 1 sec.
Then I converted the same table to CSV and exported it to Spark, and I
executed the same query as in the code above, but it takes much more time,
almost 2 minutes, to return the results.
May I know the reason for this slow process too?

Thanks,
Vinod


looking for helps in using graphx aggregateMessages

2015-07-31 Thread man june
Dear list,

Hi~ I am new to Spark and GraphX, and I have some experience with Scala. I
want to use GraphX to calculate some basic statistics on linked open data,
which is basically a graph.

Suppose the graph only contains one type of edge, directed from individuals to
concepts, and the edge labels are all "type". I want to find all pairs of
concepts that have at least one individual linking to both of them. The
following is my current solution, but sadly it doesn't work.

Could you please help me work this out? Or are there better solutions? Any
help is appreciated!

  val conf = new SparkConf().setMaster("spark://MacBook-Pro:7077")
    .setAppName("My App").setJars(...)
  val sc = new SparkContext(conf)

  // initialize individuals (in small letters) and concepts (in upper case letters)
  val users: RDD[(org.apache.spark.graphx.VertexId, String)] =
    sc.parallelize(Array((1L, "a"), (2L, "b"), (3L, "e"),
      (11L, "A"), (12L, "B"), (13L, "C"), (14L, "D")))

  // initialize type edges
  val relationships: RDD[Edge[String]] =
    sc.parallelize(Array(
      Edge(1L, 11L, "type"), Edge(1L, 14L, "type"), Edge(1L, 13L, "type"),
      Edge(2L, 11L, "type"), Edge(2L, 12L, "type"),
      Edge(3L, 11L, "type"), Edge(3L, 13L, "type")))

  val graph = Graph(users, relationships)
  val indTypes = graph.collectNeighborIds(EdgeDirection.Out)

  // seems to be stupid functions...
  def mapUDF(triplet: EdgeContext[String, String, HashMap[Long, Int]]) = {
    val msg = indTypes.filter(pred => pred._1 == triplet.srcId).first()._2
      .aggregate(new HashMap[Long, Int])((a, b) => a.+=(b -> 1), (a, b) => a ++ b)
    triplet.sendToDst(msg)
  }

  def reduceUDF(a: HashMap[Long, Int], b: HashMap[Long, Int]): HashMap[Long, Int] =
    a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) }

  var pairs = new HashMap[(Long, Long), Int]
  val results = graph.aggregateMessages[HashMap[Long, Int]](mapUDF, reduceUDF)

  results.foreach(result => {
    result._2.filter(p => p._1 != result._1).foreach(map => {
      val a = result._1
      val b = map._1
      if (!pairs.contains(a, b) && !pairs.contains(b, a))
        pairs += (a, b) -> map._2
    })
  })
  pairs.foreach(println(_))
The exceptions:

TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, 10.42.0.17):
java.lang.NullPointerException
    at atur.node.learner.Test$.mapUDF(SimpleDisjointnessLearner.scala:147)
    at atur.node.learner.Test$$anonfun$4.apply(SimpleDisjointnessLearner.scala:155)

best wishes,
June
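
In case it helps anyone reading the archive: the NullPointerException above
typically comes from referencing the indTypes RDD inside mapUDF, which runs on
the executors; RDDs cannot be used from within functions shipped to other
RDD/graph operations. A minimal, untested sketch of one workaround, assuming
the neighbour-id map is small enough to collect and broadcast (it reuses the
mutable HashMap import from the code above):

  val indTypesMap = indTypes.collectAsMap()       // VertexId -> Array[VertexId], on the driver
  val indTypesBc = sc.broadcast(indTypesMap)

  def mapUDF(triplet: EdgeContext[String, String, HashMap[Long, Int]]) = {
    val neighbours = indTypesBc.value.getOrElse(triplet.srcId, Array.empty[Long])
    val msg = new HashMap[Long, Int]
    neighbours.foreach(id => msg += id -> 1)
    triplet.sendToDst(msg)
  }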