Re: How to create Spark DataFrame using custom Hadoop InputFormat?
Hi, thanks - Void works. I use the same custom format in Hive and it works with Void as the key. Please share an example, if you have one, of creating a DataFrame using a custom Hadoop format.

On Aug 1, 2015 2:07 AM, Ted Yu yuzhih...@gmail.com wrote:

I don't think using the Void class is the right choice - it is not even a Writable. BTW, in the future, capture text output instead of an image. Thanks

On Fri, Jul 31, 2015 at 12:35 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi Ted, thanks. My key is always Void because my custom format file is non-splittable, so the key is Void and the value is MyRecordWritable, which extends Hadoop's Writable. I am sharing my log as a snapshot - please don't mind, as I can't paste code outside. Regards, Umesh

On Sat, Aug 1, 2015 at 12:59 AM, Ted Yu yuzhih...@gmail.com wrote:

Looking closer at the code you posted, the error likely was caused by the 3rd parameter: Void.class. It is supposed to be the class of the key. FYI

On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

Hi, I have my own custom Hadoop InputFormat which I need to use to create a DataFrame. I tried the following:

    JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
        jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
    JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
    DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
    myFormatAsDataframe.show();

The above code does not work and throws an exception saying java.lang.IllegalArgumentException: object is not an instance of declaring class. My custom Hadoop InputFormat works very well with Hive, MapReduce, etc. How do I make it work with Spark? Please guide, I am new to Spark. Thanks in advance.
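For reference, a minimal Scala sketch (not from the thread) of one way to go from a custom Hadoop InputFormat to a DataFrame: read the pairs, copy each Writable's fields into a plain case class, and let the implicits build the DataFrame. The createDataFrame(rdd, SomeClass.class) overload expects a JavaBean with getters and setters, which a Writable usually is not, and that mismatch is one plausible cause of the IllegalArgumentException. MyInputFormat and MyRecordWritable are the names from the thread; MyRecord and the getFieldA/getFieldB accessors are hypothetical placeholders for the real schema.

    // Sketch only: field names and accessors are assumptions, not the thread's real schema.
    case class MyRecord(fieldA: String, fieldB: Long)

    val pairs = sc.hadoopFile[Void, MyRecordWritable, MyInputFormat]("hdfs://tmp/data/myformat.xyz")

    import sqlContext.implicits._
    // Copy the values out of the (possibly reused) Writable into an immutable case class,
    // then convert to a DataFrame via the Product-based implicit conversion.
    val df = pairs.map { case (_, w) => MyRecord(w.getFieldA, w.getFieldB) }.toDF()
    df.show()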
RandomForest in Pyspark (version 1.4.1)
Hi, I tried to develop a RandomForest model for my data in PySpark as follows:

    rf_model = RandomForest.trainClassifier(train_idf, 2, {}, numTrees=15, seed=144)
    print "RF: Num trees = %d, Num nodes = %d\n" % (rf_model.numTrees(), rf_model.totalNumNodes())
    pred_label = test_idf.map(lambda p: (float(rf_model.predict(p.features)), p.label))
    print pred_label.take(5)  ## exception

I am getting the following error at the highlighted statement:

    Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I have used the same set of statements for linear models (LogisticRegression and SVM) in PySpark and was able to get the predictions and print them. I am not sure why I am getting the above exception; I am not using the SparkContext directly in any of the above statements. I would appreciate your help. Thanks
Re: Big Integer number in Spark
Do you notice how you are making a collection of Ints?

    input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

And these are also being mapped to more Ints:

    result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

Generally, (signed) Ints can range from -2^31 to 2^31 - 1, but that mapping makes your new value bigger than 2^31 - 1, so the number wraps around into the negatives.
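A small illustrative sketch (the numbers are chosen only to show the overflow): widening to Long before the multiplication keeps the true value instead of wrapping.

    // Int arithmetic wraps silently past 2^31 - 1; mapping to Long first avoids that
    val input  = sc.parallelize(1 to 5)
    val asInt  = input.map(x => x * 600000000)           // RDD[Int]  - overflows for x >= 4
    val asLong = input.map(x => x.toLong * 600000000L)   // RDD[Long] - 600000000 ... 3000000000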
[POWERED BY] Please add Typesafe to the list of organizations
Typesafe (http://typesafe.com). We provide commercial support for Spark on Mesos and Mesosphere DCOS. We contribute to Spark's Mesos integration and Spark Streaming enhancements.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly) - http://shop.oreilly.com/product/0636920033073.do
Typesafe - http://typesafe.com
@deanwampler - http://twitter.com/deanwampler
http://polyglotprogramming.com
Re: setting fs.umask in pyspark
Found an approach: sc._jsc.hadoopConfiguration().set(key, value)
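The Scala-side equivalent, for reference (the property name below is only an example; set whichever Hadoop key is actually needed):

    // SparkContext exposes the Hadoop Configuration directly in Scala
    sc.hadoopConfiguration.set("fs.permissions.umask-mode", "022")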
Re: Issues with JavaRDD.subtract(JavaRDD) method in local vs. cluster mode
Thanks for the quick reply. I will be unable to collect more data until Monday, but I will update the thread accordingly. I am using Spark 1.4.0. Were there any related issues reported? I wasn't able to find any, but I may have overlooked something. I have also updated the original question to include the relevant Java files; maybe the issue is hidden there somewhere.

Ted Yu yuzhih...@gmail.com wrote on Fri., 31 July 2015 at 18:09:

Can you call collect() and log the output to get more clue about what is left? Which Spark release are you using? Cheers

On Fri, Jul 31, 2015 at 9:01 AM, Warfish sebastian.ka...@gmail.com wrote:

Hi everyone, I have been working with Spark for a little while now and have encountered a strange problem that gives me headaches, which has to do with the JavaRDD.subtract method. Consider the following piece of code:

    public static void main(String[] args) {
        // context is of type JavaSparkContext; FILE is the filepath to my input file
        JavaRDD<String> rawTestSet  = context.textFile(FILE);
        JavaRDD<String> rawTestSet2 = context.textFile(FILE);

        // Gives 0 every time - correct
        System.out.println("rawTestSetMinusRawTestSet2 = " + rawTestSet.subtract(rawTestSet2).count());

        // SearchData is a custom POJO that holds my data
        JavaRDD<SearchData> testSet  = convert(rawTestSet);
        JavaRDD<SearchData> testSet2 = convert(rawTestSet);
        JavaRDD<SearchData> testSet3 = convert(rawTestSet2);

        // These calls give numbers != 0 in cluster mode - incorrect
        System.out.println("testSetMinusTestSet2 = " + testSet.subtract(testSet2).count());
        System.out.println("testSetMinusTestSet3 = " + testSet.subtract(testSet3).count());
        System.out.println("testSet2MinusTestSet3 = " + testSet2.subtract(testSet3).count());
    }

    private static JavaRDD<SearchData> convert(JavaRDD<String> input) {
        return input.filter(new Matches(myRegex))
                    .map(new DoSomething())
                    .map(new Split(mySplitParam))
                    .map(new ToMap())
                    .map(new Clean())
                    .map(new ToSearchData());
    }

In this code, I read a file (usually from HDFS, but it applies to disk as well) and then convert the Strings into custom objects that hold the data, using a chain of filter and map operations. These objects are simple POJOs with overridden hashCode() and equals() methods. I then apply the subtract method to several JavaRDDs that contain exactly equal data. Note: I have omitted the POJO code and the filter and map functions to keep the code concise, but I can post them later if the need arises.

In the main method shown above there are several calls of the subtract method, all of which should give empty RDDs as results, because the data in all RDDs should be exactly the same. This works for Spark in local mode; however, when executing the code on a cluster, the second block of subtract calls does not result in empty sets, which tells me that it is a more complicated issue. The input data in local and cluster mode was exactly the same. Can someone shed some light on this issue? I feel like I'm overlooking something rather obvious.
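One thing worth checking while collecting more data (a sketch of a possible cause, not a confirmed diagnosis for this thread): subtract co-partitions both RDDs by hashCode(), so equal objects must produce the same hash on every JVM and the hash must depend only on the object's contents. A field whose hashCode falls back to object identity (a plain Java array, for example) will put equal objects into different partitions on a cluster even though everything matches in local mode. A minimal Scala illustration of content-based equality, using a case class as a simplified stand-in for the SearchData POJO:

    // Hypothetical, simplified stand-in for SearchData: case classes derive structural
    // equals/hashCode, so identical content hashes identically on every executor.
    case class SearchData(key: String, fields: Map[String, String])

    val a = sc.parallelize(Seq(SearchData("k", Map("f" -> "1"))))
    val b = sc.parallelize(Seq(SearchData("k", Map("f" -> "1"))))
    assert(a.subtract(b).count() == 0)  // holds in local and cluster mode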
Re: Issues with JavaRDD.subtract(JavaRDD) method in local vs. cluster mode
Can you call collect() and log the output to get more clue about what is left? Which Spark release are you using? Cheers

On Fri, Jul 31, 2015 at 9:01 AM, Warfish sebastian.ka...@gmail.com wrote:

Hi everyone, I have been working with Spark for a little while now and have encountered a strange problem that gives me headaches, which has to do with the JavaRDD.subtract method. Consider the following piece of code:

    public static void main(String[] args) {
        // context is of type JavaSparkContext; FILE is the filepath to my input file
        JavaRDD<String> rawTestSet  = context.textFile(FILE);
        JavaRDD<String> rawTestSet2 = context.textFile(FILE);

        // Gives 0 every time - correct
        System.out.println("rawTestSetMinusRawTestSet2 = " + rawTestSet.subtract(rawTestSet2).count());

        // SearchData is a custom POJO that holds my data
        JavaRDD<SearchData> testSet  = convert(rawTestSet);
        JavaRDD<SearchData> testSet2 = convert(rawTestSet);
        JavaRDD<SearchData> testSet3 = convert(rawTestSet2);

        // These calls give numbers != 0 in cluster mode - incorrect
        System.out.println("testSetMinusTestSet2 = " + testSet.subtract(testSet2).count());
        System.out.println("testSetMinusTestSet3 = " + testSet.subtract(testSet3).count());
        System.out.println("testSet2MinusTestSet3 = " + testSet2.subtract(testSet3).count());
    }

    private static JavaRDD<SearchData> convert(JavaRDD<String> input) {
        return input.filter(new Matches(myRegex))
                    .map(new DoSomething())
                    .map(new Split(mySplitParam))
                    .map(new ToMap())
                    .map(new Clean())
                    .map(new ToSearchData());
    }

In this code, I read a file (usually from HDFS, but it applies to disk as well) and then convert the Strings into custom objects that hold the data, using a chain of filter and map operations. These objects are simple POJOs with overridden hashCode() and equals() methods. I then apply the subtract method to several JavaRDDs that contain exactly equal data. Note: I have omitted the POJO code and the filter and map functions to keep the code concise, but I can post them later if the need arises.

In the main method shown above there are several calls of the subtract method, all of which should give empty RDDs as results, because the data in all RDDs should be exactly the same. This works for Spark in local mode; however, when executing the code on a cluster, the second block of subtract calls does not result in empty sets, which tells me that it is a more complicated issue. The input data in local and cluster mode was exactly the same. Can someone shed some light on this issue? I feel like I'm overlooking something rather obvious.
RE: How to register array class with Kryo in spark-defaults.conf
Does anybody have any idea how to solve this problem? Ningjun

From: Wang, Ningjun (LNG-NPV)
Sent: Thursday, July 30, 2015 11:06 AM
To: user@spark.apache.org
Subject: How to register array class with Kryo in spark-defaults.conf

I register my class with Kryo in spark-defaults.conf as follows:

    spark.serializer                 org.apache.spark.serializer.KryoSerializer
    spark.kryo.registrationRequired  true
    spark.kryo.classesToRegister     ltn.analytics.es.EsDoc

But I got the following exception:

    java.lang.IllegalArgumentException: Class is not registered: ltn.analytics.es.EsDoc[]
    Note: To register this class use: kryo.register(ltn.analytics.es.EsDoc[].class);
        at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
        at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:162)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

The error message seems to suggest that I should also register the array class EsDoc[], so I added it to spark-defaults.conf as follows:

    spark.kryo.classesToRegister  ltn.analytics.es.EsDoc,ltn.analytics.es.EsDoc[]

Then I got the following error:

    org.apache.spark.SparkException: Failed to register classes with Kryo
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
        at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:153)
        at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
        at ltn.analytics.index.Index.addDocuments(Index.scala:82)

Please advise. Thanks. Ningjun
Issues with JavaRDD.subtract(JavaRDD) method in local vs. cluster mode
Hi everyone, I have been working with Spark for a little while now and have encountered a strange problem that gives me headaches, which has to do with the JavaRDD.subtract method. Consider the following piece of code:

    public static void main(String[] args) {
        // context is of type JavaSparkContext; FILE is the filepath to my input file
        JavaRDD<String> rawTestSet  = context.textFile(FILE);
        JavaRDD<String> rawTestSet2 = context.textFile(FILE);

        // Gives 0 every time - correct
        System.out.println("rawTestSetMinusRawTestSet2 = " + rawTestSet.subtract(rawTestSet2).count());

        // SearchData is a custom POJO that holds my data
        JavaRDD<SearchData> testSet  = convert(rawTestSet);
        JavaRDD<SearchData> testSet2 = convert(rawTestSet);
        JavaRDD<SearchData> testSet3 = convert(rawTestSet2);

        // These calls give numbers != 0 in cluster mode - incorrect
        System.out.println("testSetMinusTestSet2 = " + testSet.subtract(testSet2).count());
        System.out.println("testSetMinusTestSet3 = " + testSet.subtract(testSet3).count());
        System.out.println("testSet2MinusTestSet3 = " + testSet2.subtract(testSet3).count());
    }

    private static JavaRDD<SearchData> convert(JavaRDD<String> input) {
        return input.filter(new Matches(myRegex))
                    .map(new DoSomething())
                    .map(new Split(mySplitParam))
                    .map(new ToMap())
                    .map(new Clean())
                    .map(new ToSearchData());
    }

In this code, I read a file (usually from HDFS, but it applies to disk as well) and then convert the Strings into custom objects that hold the data, using a chain of filter and map operations. These objects are simple POJOs with overridden hashCode() and equals() methods. I then apply the subtract method to several JavaRDDs that contain exactly equal data. Note: I have omitted the POJO code and the filter and map functions to keep the code concise, but I can post them later if the need arises.

In the main method shown above there are several calls of the subtract method, all of which should give empty RDDs as results, because the data in all RDDs should be exactly the same. This works for Spark in local mode; however, when executing the code on a cluster, the second block of subtract calls does not result in empty sets, which tells me that it is a more complicated issue. The input data in local and cluster mode was exactly the same. Can someone shed some light on this issue? I feel like I'm overlooking something rather obvious.
Setting a stage timeout
Hi, I had a job that got stuck on YARN due to https://issues.apache.org/jira/browse/SPARK-6954 and never exited properly. Is there a way to set a timeout for a stage, or for all stages?
Re: SparkLauncher not notified about finished job - hangs infinitely.
Nope - the output stream of that subprocess should be spark.getInputStream(). According to the Oracle docs (https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getOutputStream--):

    public abstract InputStream getInputStream()
    Returns the input stream connected to the normal output of the subprocess.
    The stream obtains data piped from the standard output of the process represented by this Process object.

On Fri, Jul 31, 2015 at 10:10 AM, Ted Yu yuzhih...@gmail.com wrote:

Minor typo: bq. "output (spark.getInputStream())" should be spark.getOutputStream(). Cheers

On Fri, Jul 31, 2015 at 10:02 AM, Elkhan Dadashov elkhan8...@gmail.com wrote:

Hi Tomasz,

Answer to your 1st question: clear/read the error (spark.getErrorStream()) and output (spark.getInputStream()) stream buffers before you call spark.waitFor(); it would be better to clear/read them with two different threads. Then it should work fine.

The Spark job is launched as a subprocess, and according to the Oracle documentation (https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html):

By default, the created subprocess does not have its own terminal or console. All its standard I/O (i.e. stdin, stdout, stderr) operations will be redirected to the parent process, where they can be accessed via the streams obtained using the methods getOutputStream(), getInputStream(), and getErrorStream(). The parent process uses these streams to feed input to and get output from the subprocess. Because some native platforms only provide limited buffer size for standard input and output streams, failure to promptly write the input stream or read the output stream of the subprocess may cause the subprocess to block, or even deadlock.

On Fri, Jul 31, 2015 at 2:45 AM, Tomasz Guziałek tomasz.guzia...@humaninference.com wrote:

I am trying to submit a JAR with a Spark job to the YARN cluster from Java code. I am using SparkLauncher to submit the SparkPi example:

    Process spark = new SparkLauncher()
        .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setMaster("yarn-cluster")
        .launch();
    System.out.println("Waiting for finish...");
    int exitCode = spark.waitFor();
    System.out.println("Finished! Exit code: " + exitCode);

There are two problems:

1. While submitting in yarn-cluster mode, the application is successfully submitted to YARN and executes successfully (it is visible in the YARN UI, reported as SUCCESS, and the PI value is printed in the output). However, the submitting application is never notified that processing is finished - it hangs infinitely after printing "Waiting for finish..." The log of the container can be found here: http://pastebin.com/LscBjHQc

2. While submitting in yarn-client mode, the application does not appear in the YARN UI and the submitting application hangs at "Waiting for finish..." When the hanging code is killed, the application shows up in the YARN UI and is reported as SUCCESS, but the output is empty (the PI value is not printed out). The log of the container can be found here: http://pastebin.com/9KHi81r4

I tried to execute the submitting application with both Oracle Java 8 and 7. Any hints what might be wrong?

Best regards, Tomasz

--
Best regards,
Elkhan Dadashov
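A minimal Scala sketch of the suggested fix (the SparkLauncher settings are copied from the thread; the drain helper is illustrative): read stdout and stderr of the launched subprocess on separate threads so its output buffers can never fill up and block it, then wait for the exit code.

    import org.apache.spark.launcher.SparkLauncher
    import java.io.InputStream
    import scala.io.Source

    // Drain a subprocess stream on its own daemon thread.
    def drain(in: InputStream, tag: String): Thread = {
      val t = new Thread(new Runnable {
        override def run(): Unit =
          Source.fromInputStream(in).getLines().foreach(line => println(s"[$tag] $line"))
      })
      t.setDaemon(true)
      t.start()
      t
    }

    val spark = new SparkLauncher()
      .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("yarn-cluster")
      .launch()

    drain(spark.getInputStream, "stdout")   // normal output of the subprocess
    drain(spark.getErrorStream, "stderr")   // error output of the subprocess
    val exitCode = spark.waitFor()
    println(s"Finished! Exit code: $exitCode")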
Re: How to control Spark Executors from getting Lost when using YARN client mode?
Hi, thanks for the response. It looks like the YARN container is getting killed, but I don't know why; I see a shuffle MetadataFetchFailedException as mentioned in the following SO link. I have enough memory: 8 nodes with 8 cores and 30 GB of memory each. And because of this exception YARN kills the container running the executor - how can it overrun memory? I tried to give each executor 25 GB and it is still not sufficient and fails. Please guide; I don't understand what is going on. I am using Spark 1.4.0, with spark.shuffle.memory as 0.0 and spark.storage.memory as 0.5. I have almost all the usual optimal properties set: the Kryo serializer, a 500 Akka frame size, 20 Akka threads. I don't know - I am trapped; it's been two days that I have been trying to recover from this issue.

http://stackoverflow.com/questions/29850784/what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept

On Thu, Jul 30, 2015 at 9:56 PM, Ashwin Giridharan ashwin.fo...@gmail.com wrote:

What is your cluster configuration (size and resources)? If you do not have enough resources, then your executor will not run. Moreover, allocating 8 cores to an executor is too much. If you have a cluster with four nodes running NodeManagers, each equipped with 4 cores and 8 GB of memory, then an optimal configuration would be:

    --num-executors 8 --executor-cores 2 --executor-memory 2G

Thanks, Ashwin

On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote:

Hi, I have one Spark job which runs fine locally with less data, but when I schedule it on YARN to execute I keep getting the following ERROR, and slowly all executors get removed from the UI and my job fails:

    15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on myhost1.com: remote Rpc client disassociated
    15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on myhost2.com: remote Rpc client disassociated

I use the following command to schedule the Spark job in yarn-client mode:

    ./spark-submit --class com.xyz.MySpark \
        --conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M \
        --driver-java-options -XX:MaxPermSize=512m \
        --driver-memory 3g --master yarn-client \
        --executor-memory 2G --executor-cores 8 --num-executors 12 \
        /home/myuser/myspark-1.0.jar

I don't know what the problem is, please guide. I am new to Spark. Thanks in advance.
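For reference, a sketch of one setting that is often involved when YARN kills executor containers for exceeding memory limits (an assumption about this particular failure, not a confirmed diagnosis): YARN enforces executor memory plus an off-heap overhead, and the default overhead can be too small for shuffle-heavy jobs. Expressed as SparkConf settings, with example values to tune rather than recommendations:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // extra off-heap room (in MB) YARN accounts for on top of --executor-memory
      .set("spark.yarn.executor.memoryOverhead", "2048")
      // give the shuffle some of the fraction that was set to 0.0 in the thread
      .set("spark.shuffle.memoryFraction", "0.2")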
Re: How to add multiple sequence files from HDFS to a Spark Context to do Batch processing?
"file" can be a directory (look at all children) or even a glob ("/path/*.ext", for example).

On Fri, Jul 31, 2015 at 11:35 AM, swetha swethakasire...@gmail.com wrote:

Hi, how do I add multiple sequence files from HDFS to a Spark Context to do batch processing? I have something like the following in my code. Do I have to add a comma-separated list of sequence file paths to the Spark Context?

    val data = if (args.length > 0 && args(0) != null)
      sc.sequenceFile(file, classOf[LongWritable], classOf[Text]).
        map { case (x, y) => (x.toString, y.toString) }

Thanks, Swetha

--
Marcelo
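A small Scala sketch of that, with hypothetical HDFS paths: the path string is handed to Hadoop's FileInputFormat, so a directory, a glob, or a comma-separated list of paths should all work.

    import org.apache.hadoop.io.{LongWritable, Text}

    // Either a glob ...
    val byGlob = sc.sequenceFile("hdfs:///data/2015-07-*/part-*", classOf[LongWritable], classOf[Text])
    // ... or a comma-separated list of paths
    val byList = sc.sequenceFile("hdfs:///data/day1,hdfs:///data/day2", classOf[LongWritable], classOf[Text])

    val data = byGlob.map { case (x, y) => (x.toString, y.toString) }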
Re: How to create Spark DataFrame using custom Hadoop InputFormat?
Can you pastebin the complete stack trace? If you can show a skeleton of MyInputFormat and MyRecordWritable, that would provide additional information as well. Cheers

On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

Hi, I have my own custom Hadoop InputFormat which I need to use to create a DataFrame. I tried the following:

    JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
        jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
    JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
    DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
    myFormatAsDataframe.show();

The above code does not work and throws an exception saying java.lang.IllegalArgumentException: object is not an instance of declaring class. My custom Hadoop InputFormat works very well with Hive, MapReduce, etc. How do I make it work with Spark? Please guide, I am new to Spark. Thanks in advance.
Re: SparkLauncher not notified about finished job - hangs infinitely.
Hi Tomasz,

Answer to your 1st question: clear/read the error (spark.getErrorStream()) and output (spark.getInputStream()) stream buffers before you call spark.waitFor(); it would be better to clear/read them with two different threads. Then it should work fine.

The Spark job is launched as a subprocess, and according to the Oracle documentation (https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html):

By default, the created subprocess does not have its own terminal or console. All its standard I/O (i.e. stdin, stdout, stderr) operations will be redirected to the parent process, where they can be accessed via the streams obtained using the methods getOutputStream(), getInputStream(), and getErrorStream(). The parent process uses these streams to feed input to and get output from the subprocess. Because some native platforms only provide limited buffer size for standard input and output streams, failure to promptly write the input stream or read the output stream of the subprocess may cause the subprocess to block, or even deadlock.

On Fri, Jul 31, 2015 at 2:45 AM, Tomasz Guziałek tomasz.guzia...@humaninference.com wrote:

I am trying to submit a JAR with a Spark job to the YARN cluster from Java code. I am using SparkLauncher to submit the SparkPi example:

    Process spark = new SparkLauncher()
        .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setMaster("yarn-cluster")
        .launch();
    System.out.println("Waiting for finish...");
    int exitCode = spark.waitFor();
    System.out.println("Finished! Exit code: " + exitCode);

There are two problems:

1. While submitting in yarn-cluster mode, the application is successfully submitted to YARN and executes successfully (it is visible in the YARN UI, reported as SUCCESS, and the PI value is printed in the output). However, the submitting application is never notified that processing is finished - it hangs infinitely after printing "Waiting for finish..." The log of the container can be found here: http://pastebin.com/LscBjHQc

2. While submitting in yarn-client mode, the application does not appear in the YARN UI and the submitting application hangs at "Waiting for finish..." When the hanging code is killed, the application shows up in the YARN UI and is reported as SUCCESS, but the output is empty (the PI value is not printed out). The log of the container can be found here: http://pastebin.com/9KHi81r4

I tried to execute the submitting application with both Oracle Java 8 and 7. Any hints what might be wrong?

Best regards, Tomasz

--
Best regards,
Elkhan Dadashov
Re: Checkpointing doesn't appear to be working for direct streaming from Kafka
If you've set the checkpoint dir, it seems like indeed the intent is to use a default checkpoint interval in DStream:

    private[streaming] def initialize(time: Time) {
      ...
      // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
      if (mustCheckpoint && checkpointDuration == null) {
        checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
        logInfo("Checkpoint interval automatically set to " + checkpointDuration)
      }

Do you see that log message? What's the interval? That could at least explain why it's not doing anything, if it's quite long. It sort of seems wrong, though, since https://spark.apache.org/docs/latest/streaming-programming-guide.html suggests it was intended to be a multiple of the batch interval. The slide duration wouldn't always be relevant anyway.

On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:

I've instrumented checkpointing per the programming guide and I can tell that Spark Streaming is creating the checkpoint directories, but I'm not seeing any content being created in those directories, nor am I seeing the effects I'd expect from checkpointing. I'd expect any data that comes into Kafka while the consumers are down to get picked up when the consumers are restarted; I'm not seeing that.

For now my checkpoint directory is set to the local file system, with the directory URI being in this form: file:///mnt/dir1/dir2. I see a subdirectory named with a UUID being created under there, but no files.

I'm using a custom JavaStreamingContextFactory which creates a JavaStreamingContext with the directory set into it via the checkpoint(String) method. I'm currently not invoking the checkpoint(Duration) method on the DStream, since I want to first rely on Spark's default checkpointing interval. My streaming batch duration is set to 1 second.

Anyone have any idea what might be going wrong? Also, at which point does Spark delete files from checkpointing? Thanks.
Re: Checkpointing doesn't appear to be working for direct streaming from Kafka
It looks like there's an issue with the 'Parameters' POJO I'm using within my driver program. For some reason that needs to be serializable, which is odd:

    java.io.NotSerializableException: com.kona.consumer.kafka.spark.Parameters

Giving it another whirl, though having to make it serializable seems odd to me.

On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen so...@cloudera.com wrote:

If you've set the checkpoint dir, it seems like indeed the intent is to use a default checkpoint interval in DStream:

    private[streaming] def initialize(time: Time) {
      ...
      // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
      if (mustCheckpoint && checkpointDuration == null) {
        checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
        logInfo("Checkpoint interval automatically set to " + checkpointDuration)
      }

Do you see that log message? What's the interval? That could at least explain why it's not doing anything, if it's quite long. It sort of seems wrong, though, since https://spark.apache.org/docs/latest/streaming-programming-guide.html suggests it was intended to be a multiple of the batch interval. The slide duration wouldn't always be relevant anyway.

On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:

I've instrumented checkpointing per the programming guide and I can tell that Spark Streaming is creating the checkpoint directories, but I'm not seeing any content being created in those directories, nor am I seeing the effects I'd expect from checkpointing. I'd expect any data that comes into Kafka while the consumers are down to get picked up when the consumers are restarted; I'm not seeing that. For now my checkpoint directory is set to the local file system, with the directory URI being in this form: file:///mnt/dir1/dir2. I see a subdirectory named with a UUID being created under there, but no files. I'm using a custom JavaStreamingContextFactory which creates a JavaStreamingContext with the directory set into it via the checkpoint(String) method. I'm currently not invoking the checkpoint(Duration) method on the DStream, since I want to first rely on Spark's default checkpointing interval. My streaming batch duration is set to 1 second. Anyone have any idea what might be going wrong? Also, at which point does Spark delete files from checkpointing? Thanks.
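For what it's worth, a plausible explanation (an inference, not something confirmed in the thread): when checkpointing is enabled, the DStream graph, including the functions registered on it and anything those closures capture, such as a configuration object, is serialized into the checkpoint, so a captured Parameters instance has to be Serializable. A small Scala sketch of the two usual ways around it, with hypothetical names:

    import org.apache.spark.streaming.dstream.DStream

    // Option 1: keep the configuration in a small serializable value object
    // (case classes are Serializable already; spelled out here for emphasis).
    case class StreamingParams(checkpointDir: String, batchMillis: Long, topic: String)
      extends Serializable

    // Option 2: copy just the needed fields into local vals before building the closure,
    // so the enclosing (possibly non-serializable) object is never captured.
    def wire(stream: DStream[String], params: StreamingParams): Unit = {
      val topic = params.topic
      stream.foreachRDD { rdd =>
        println(s"batch for topic $topic contained ${rdd.count()} records")
      }
    }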
Re: How to create Spark DataFrame using custom Hadoop InputFormat?
Hi Ted, thanks much for the reply. I can't share code on a public forum. I have created the custom format by extending the Hadoop mapred InputFormat class, and likewise the RecordReader class. If you can help me with how to use the same in a DataFrame, it would be very helpful.

On Sat, Aug 1, 2015 at 12:12 AM, Ted Yu yuzhih...@gmail.com wrote:

Can you pastebin the complete stack trace? If you can show a skeleton of MyInputFormat and MyRecordWritable, that would provide additional information as well. Cheers

On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

Hi, I have my own custom Hadoop InputFormat which I need to use to create a DataFrame. I tried the following:

    JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
        jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
    JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
    DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
    myFormatAsDataframe.show();

The above code does not work and throws an exception saying java.lang.IllegalArgumentException: object is not an instance of declaring class. My custom Hadoop InputFormat works very well with Hive, MapReduce, etc. How do I make it work with Spark? Please guide, I am new to Spark. Thanks in advance.
Checkpointing doesn't appear to be working for direct streaming from Kafka
I've instrumented checkpointing per the programming guide and I can tell that Spark Streaming is creating the checkpoint directories but I'm not seeing any content being created in those directories nor am I seeing the effects I'd expect from checkpointing. I'd expect any data that comes into Kafka while the consumers are down, to get picked up when the consumers are restarted; I'm not seeing that. For now my checkpoint directory is set to the local file system with the directory URI being in this form: file:///mnt/dir1/dir2. I see a subdirectory named with a UUID being created under there but no files. I'm using a custom JavaStreamingContextFactory which creates a JavaStreamingContext with the directory set into it via the checkpoint(String) method. I'm currently not invoking the checkpoint(Duration) method on the DStream since I want to first rely on Spark's default checkpointing interval. My streaming batch duration millis is set to 1 second. Anyone have any idea what might be going wrong? Also, at which point does Spark delete files from checkpointing? Thanks.
RE: How to register array class with Kryo in spark-defaults.conf
Here is the definition of EsDoc:

    case class EsDoc(id: Long, isExample: Boolean, docSetIds: Array[String], randomId: Double, vector: String) extends Serializable

Note that it is not EsDoc that has the registration problem; it is EsDoc[] (the array class of EsDoc). I tried replacing the EsDoc class with the Map class, and I also got the following error asking me to register the Map[] (array of Map) class:

    java.lang.IllegalArgumentException: Class is not registered: scala.collection.immutable.Map[]
    Note: To register this class use: kryo.register(scala.collection.immutable.Map[].class);

So the question is: how do I register an array class? Adding the following in spark-defaults.conf does not work:

    spark.kryo.classesToRegister  scala.collection.immutable.Map,scala.collection.immutable.Map[]

Ningjun

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Friday, July 31, 2015 11:49 AM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to register array class with Kryo in spark-defaults.conf

For the second exception, was there anything following SparkException which would give us more clue? Can you tell us how EsDoc is structured? Thanks

On Fri, Jul 31, 2015 at 8:42 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:

Does anybody have any idea how to solve this problem? Ningjun

From: Wang, Ningjun (LNG-NPV)
Sent: Thursday, July 30, 2015 11:06 AM
To: user@spark.apache.org
Subject: How to register array class with Kryo in spark-defaults.conf

I register my class with Kryo in spark-defaults.conf as follows:

    spark.serializer                 org.apache.spark.serializer.KryoSerializer
    spark.kryo.registrationRequired  true
    spark.kryo.classesToRegister     ltn.analytics.es.EsDoc

But I got the following exception:

    java.lang.IllegalArgumentException: Class is not registered: ltn.analytics.es.EsDoc[]
    Note: To register this class use: kryo.register(ltn.analytics.es.EsDoc[].class);
        at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
        at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:162)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

The error message seems to suggest that I should also register the array class EsDoc[], so I added it to spark-defaults.conf as follows:

    spark.kryo.classesToRegister  ltn.analytics.es.EsDoc,ltn.analytics.es.EsDoc[]

Then I got the following error:

    org.apache.spark.SparkException: Failed to register classes with Kryo
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
        at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:153)
        at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
        at ltn.analytics.index.Index.addDocuments(Index.scala:82)

Please advise. Thanks. Ningjun
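One route worth trying (a sketch, assuming the registration can move out of spark-defaults.conf and into the application code): SparkConf.registerKryoClasses takes Class objects, so the array class can be expressed directly as classOf[Array[EsDoc]] instead of having to spell out a name the config parser understands. If it must stay in the config file, the JVM's internal array-class name (the [Lltn.analytics.es.EsDoc; form that Class.forName expects) is the thing to try, but the programmatic route is easier to get right.

    import org.apache.spark.SparkConf
    import ltn.analytics.es.EsDoc

    val conf = new SparkConf()
      .set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(Array(
        classOf[EsDoc],        // the element class
        classOf[Array[EsDoc]]  // the array class the exception complains about
      ))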
Re: Checkpointing doesn't appear to be working for direct streaming from Kafka
I'll check the log info message. Meanwhile, the code is basically:

    public class KafkaSparkStreamingDriver implements Serializable {
      ..
      SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
      JavaStreamingContext jssc = params.isCheckpointed()
          ? createCheckpointedContext(sparkConf, params)
          : createContext(sparkConf, params);
      jssc.start();
      jssc.awaitTermination();
      jssc.close();
      ..
      private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
          @Override
          public JavaStreamingContext create() {
            return createContext(sparkConf, params);
          }
        };
        return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
      }
      ...
      private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
        // Create context with the specified batch interval, in milliseconds.
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
            Durations.milliseconds(params.getBatchDurationMillis()));
        // Set the checkpoint directory, if we're checkpointing
        if (params.isCheckpointed()) {
          jssc.checkpoint(params.getCheckpointDir());
        }

        Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));

        // Set the Kafka parameters.
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
        if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
          kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
        }

        // Create direct Kafka stream with the brokers and the topic.
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicsSet);

        // See if there's an override of the default checkpoint duration.
        if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
          messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
        }
        .

On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen so...@cloudera.com wrote:

If you've set the checkpoint dir, it seems like indeed the intent is to use a default checkpoint interval in DStream:

    private[streaming] def initialize(time: Time) {
      ...
      // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
      if (mustCheckpoint && checkpointDuration == null) {
        checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
        logInfo("Checkpoint interval automatically set to " + checkpointDuration)
      }

Do you see that log message? What's the interval? That could at least explain why it's not doing anything, if it's quite long. It sort of seems wrong, though, since https://spark.apache.org/docs/latest/streaming-programming-guide.html suggests it was intended to be a multiple of the batch interval. The slide duration wouldn't always be relevant anyway.

On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote:

I've instrumented checkpointing per the programming guide and I can tell that Spark Streaming is creating the checkpoint directories, but I'm not seeing any content being created in those directories, nor am I seeing the effects I'd expect from checkpointing. I'd expect any data that comes into Kafka while the consumers are down to get picked up when the consumers are restarted; I'm not seeing that. For now my checkpoint directory is set to the local file system, with the directory URI being in this form: file:///mnt/dir1/dir2. I see a subdirectory named with a UUID being created under there, but no files. I'm using a custom JavaStreamingContextFactory which creates a JavaStreamingContext with the directory set into it via the checkpoint(String) method. I'm currently not invoking the checkpoint(Duration) method on the DStream, since I want to first rely on Spark's default checkpointing interval. My streaming batch duration is set to 1 second. Anyone have any idea what might be going wrong? Also, at which point does Spark delete files from checkpointing? Thanks.
How to create Spark DataFrame using custom Hadoop InputFormat?
Hi, I have my own custom Hadoop InputFormat which I need to use to create a DataFrame. I tried the following:

    JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
        jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
    JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
    DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
    myFormatAsDataframe.show();

The above code does not work and throws an exception saying java.lang.IllegalArgumentException: object is not an instance of declaring class. My custom Hadoop InputFormat works very well with Hive, MapReduce, etc. How do I make it work with Spark? Please guide, I am new to Spark. Thanks in advance.
How to add multiple sequence files from HDFS to a Spark Context to do Batch processing?
Hi, how do I add multiple sequence files from HDFS to a Spark Context to do batch processing? I have something like the following in my code. Do I have to add a comma-separated list of sequence file paths to the Spark Context?

    val data = if (args.length > 0 && args(0) != null)
      sc.sequenceFile(file, classOf[LongWritable], classOf[Text]).
        map { case (x, y) => (x.toString, y.toString) }

Thanks, Swetha
Re: Checkpointing doesn't appear to be working for direct streaming from Kafka
Show us the relevant code On Fri, Jul 31, 2015 at 12:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: I've instrumented checkpointing per the programming guide and I can tell that Spark Streaming is creating the checkpoint directories but I'm not seeing any content being created in those directories nor am I seeing the effects I'd expect from checkpointing. I'd expect any data that comes into Kafka while the consumers are down, to get picked up when the consumers are restarted; I'm not seeing that. For now my checkpoint directory is set to the local file system with the directory URI being in this form: file:///mnt/dir1/dir2. I see a subdirectory named with a UUID being created under there but no files. I'm using a custom JavaStreamingContextFactory which creates a JavaStreamingContext with the directory set into it via the checkpoint(String) method. I'm currently not invoking the checkpoint(Duration) method on the DStream since I want to first rely on Spark's default checkpointing interval. My streaming batch duration millis is set to 1 second. Anyone have any idea what might be going wrong? Also, at which point does Spark delete files from checkpointing? Thanks.
Re: Spark SQL DataFrame: Nullable column and filtering
Dear Michael, dear all,

a minimal example is listed below. After some further analysis I could figure out that the problem is related to the leftOuterJoinWithRemovalOfEqualColumn method, as I use columns of both the left and right DataFrames when doing the select on the joined table.

    /**
     * Customized left outer join on common column.
     */
    def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
      val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
      val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
      leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
        .select(leftColumns ++ rightColumns: _*)
    }

As the column "y" of the right table has nullable=false, this is also transferred to the y column of the joined table, since I use rightDF("y"). Thus, I need to use columns of the joined table for the select.

Question now: the joined table has column names x, a, x, y. How do I discard the second x column? All my approaches failed (assuming here that joinedDF is the joined DataFrame):

- Using joinedDF.drop("x") discards both x columns.
- Using joinedDF("x") does not work, as it is ambiguous.
- Also using rightDF.as("aliasname"), in order to differentiate the column x (from the left DataFrame) from x (from the right DataFrame), did not work out, as I found no way to use select($"aliasname.x") really programmatically.

Could someone sketch the code? Any help welcome, thanks, Martin

    import org.apache.spark.sql.types.{StructField, StructType}
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.sql.{DataFrame, SQLContext}

    object OtherEntities {
      case class Record(x: Int, a: String)
      case class Mapping(x: Int, y: Int)

      val records = Seq(Record(1, "hello"), Record(2, "bob"))
      val mappings = Seq(Mapping(2, 5))
    }

    object MinimalShowcase {
      /**
       * Customized left outer join on common column.
       */
      def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
        val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
        val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
        leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
          .select(leftColumns ++ rightColumns: _*)
      }

      /**
       * Set, if a column is nullable.
       * @param df source DataFrame
       * @param cn is the column name to change
       * @param nullable is the flag to set, such that the column is either nullable or not
       */
      def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
        val schema = df.schema
        val newSchema = StructType(schema.map {
          case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
          case y: StructField => y
        })
        df.sqlContext.createDataFrame(df.rdd, newSchema)
      }

      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("Minimal")
          .setMaster("local[*]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        // used to implicitly convert an RDD to a DataFrame
        import sqlContext.implicits._

        val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
        val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
        val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)

        val joinedDF = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "leftouter")
        println("joinedDF:")
        joinedDF.show
        joinedDF.printSchema
        joinedDF.filter(joinedDF("y").isNotNull).show

        // joinedDF:
        // +---+-----+----+----+
        // |  x|    a|   x|   y|
        // +---+-----+----+----+
        // |  1|hello|null|null|
        // |  2|  bob|   2|   5|
        // +---+-----+----+----+
        //
        // root
        //  |-- x: integer (nullable = false)
        //  |-- a: string (nullable = true)
        //  |-- x: integer (nullable = true)
        //  |-- y: integer (nullable = true)
        //
        // +---+---+---+---+
        // |  x|  a|  x|  y|
        // +---+---+---+---+
        // |  2|bob|  2|  5|
        // +---+---+---+---+

        val extrajoinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
        println("extrajoinedDF:")
        extrajoinedDF.show
        extrajoinedDF.printSchema
        extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show

        // extrajoinedDF:
        // +---+-----+----+
        // |  x|    a|   y|
        // +---+-----+----+
        // |  1|hello|null|
        // |  2|  bob|   5|
        // +---+-----+----+
        //
        // root
        //  |-- x: integer (nullable = false)
        //  |-- a: string (nullable = true)
        //  |-- y: integer (nullable = false)
        //
        // +---+-----+----+
        // |  x|    a|   y|
        // +---+-----+----+
        // |  1|hello|null|
        // |  2|  bob|   5|
        // +---+-----+----+

        val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") === mappingWithNullDF("x"), "leftouter")
        println("joined2DF:")
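A sketch of the aliasing approach done programmatically (column and alias names follow the example above; this is one possible answer to the question, not a fix confirmed in the thread): alias both sides before the join, then build the select list from each side's column names qualified by the alias, so the duplicate x can be dropped without ambiguity.

    import org.apache.spark.sql.functions.col

    val l = recordDF.as("l")
    val r = mappingDF.as("r")

    val joined = l.join(r, col("l.x") === col("r.x"), "leftouter")

    // keep every left column, and every right column except the join key
    val selected = joined.select(
      recordDF.columns.map(c => col("l." + c)) ++
      mappingDF.columns.filterNot(_ == "x").map(c => col("r." + c)): _*)

    selected.printSchema()   // columns: x, a, y - only one x remains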
How to increase parallelism of a Spark cluster?
Hello, I am trying to run a Spark job that hits an external web service to get back some information. The cluster is 1 master + 4 workers; each worker has 60 GB RAM and 4 CPUs. The external web service is a standalone Solr server, accessed using code similar to that shown below:

    def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
      val solr = new HttpSolrClient()
      initializeSolrParameters(solr)
      keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
    }

    myRDD.repartition(10)
      .mapPartitions(keyValues => getResults(keyValues))

The mapPartitions does some initialization of the SolrJ client per partition and then hits it for each record in the partition via the getResults() call. I repartitioned in the hope that this would result in 10 clients hitting Solr simultaneously (I would like to go up to maybe 30-40 simultaneous clients if I can). However, I counted the number of open connections using netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and observed that Solr has a constant 4 clients (i.e., equal to the number of workers) over the lifetime of the run.

My observation leads me to believe that each worker processes a single stream of work sequentially. However, from what I understand about how Spark works, each worker should be able to process a number of tasks in parallel, and repartition() is a hint for it to do so. Is there some SparkConf environment variable I should set to increase parallelism in these workers, or should I just configure a cluster with multiple workers per machine? Or is there something I am doing wrong? Thank you in advance for any pointers you can provide. -sujit
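For reference, a rough sketch of the knobs involved (the numbers are illustrative assumptions, not a recommendation): the number of Solr connections open at once is bounded by how many tasks Spark runs concurrently, which is roughly the total executor cores granted to the application (divided by spark.task.cpus), not the partition count alone; repartition(10) creates 10 tasks but does not by itself force 10 of them to run at the same time.

    import org.apache.spark.SparkConf

    // ask the standalone master for enough cores for the desired concurrency
    val conf = new SparkConf()
      .set("spark.cores.max", "16")          // e.g. 4 workers x 4 CPUs in the cluster described above
      .set("spark.executor.memory", "8g")    // illustrative

    // then give the stage at least as many partitions as there are task slots
    myRDD
      .repartition(16)
      .mapPartitions(getResults)             // one HttpSolrClient per partition, as in the post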
Re: How to create Spark DataFrame using custom Hadoop InputFormat?
I don't think using the Void class is the right choice - it is not even a Writable. BTW, in the future, capture text output instead of an image. Thanks

On Fri, Jul 31, 2015 at 12:35 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi Ted, thanks. My key is always Void because my custom format file is non-splittable, so the key is Void and the value is MyRecordWritable, which extends Hadoop's Writable. I am sharing my log as a snapshot - please don't mind, as I can't paste code outside. Regards, Umesh

On Sat, Aug 1, 2015 at 12:59 AM, Ted Yu yuzhih...@gmail.com wrote:

Looking closer at the code you posted, the error likely was caused by the 3rd parameter: Void.class. It is supposed to be the class of the key. FYI

On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

Hi, I have my own custom Hadoop InputFormat which I need to use to create a DataFrame. I tried the following:

    JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
        jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
    JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
    DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
    myFormatAsDataframe.show();

The above code does not work and throws an exception saying java.lang.IllegalArgumentException: object is not an instance of declaring class. My custom Hadoop InputFormat works very well with Hive, MapReduce, etc. How do I make it work with Spark? Please guide, I am new to Spark. Thanks in advance.
What happens when you create more DStreams then nodes in the cluster?
Since one input DStream creates one receiver, and one receiver uses one executor/node, what happens if you create more DStreams than nodes in the cluster? Say I have 30 DStreams on a 15-node cluster. Will ~10 streams get assigned to ~10 executors/nodes and the other ~20 streams be queued for resources, or will the other streams just fail and never run?
Re: How to create Spark DataFrame using custom Hadoop InputFormat?
Looking closer at the code you posted, the error likely was caused by the 3rd parameter: Void.class. It is supposed to be the class of the key. FYI

On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

Hi, I have my own custom Hadoop InputFormat which I need to use to create a DataFrame. I tried the following:

    JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
        jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
    JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
    DataFrame myFormatAsDataframe = sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
    myFormatAsDataframe.show();

The above code does not work and throws an exception saying java.lang.IllegalArgumentException: object is not an instance of declaring class. My custom Hadoop InputFormat works very well with Hive, MapReduce, etc. How do I make it work with Spark? Please guide, I am new to Spark. Thanks in advance.
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
Tathagata, Could the bottleneck possibly be the number of executor nodes in our cluster? Since we are creating 500 DStreams based off 500 textfile directories, do we need at least 500 executors / nodes to be receivers for each one of the streams? On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com wrote: @Ashwin: You could append the topic in the data. val kafkaStreams = topics.map { topic => KafkaUtils.createDirectStream(topic...).map { x => (x, topic) } } val unionedStream = context.union(kafkaStreams) @Brandon: I don't recommend it, but you could do something crazy like use foreachRDD to farm out the jobs to a threadpool, but have the final foreachRDD wait for all the jobs to complete. manyDStreams.foreach { dstream => dstream.foreachRDD { rdd => // Add a runnable that runs the job on the RDD to the threadpool // This does not wait for the job to finish } } anyOfTheManyDStreams.foreachRDD { _ => // wait for all the current batch's jobs in the threadpool to complete. } This would run all the Spark jobs in the batch in parallel in the thread pool, but it would also make sure all the jobs finish before the batch is marked as completed. On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com wrote: Thank you Tathagata. My main use case for the 500 streams is to append new elements into their corresponding Spark SQL tables. Every stream is mapped to a table, so I'd like to use the streams to append the new RDDs to the tables. If I union all the streams, appending new elements becomes a nightmare. So is there no other way to parallelize something like the following? Will this still run sequentially or time out? //500 streams streams.foreach { stream => stream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.saveAsTable(streamTuple._1, SaveMode.Append) } } On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com wrote: I don't think anyone has really run 500 text streams. And the par sequences do nothing out there, you are only parallelizing the setup code, which does not really compute anything. It also sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, and it will process in parallel all the new files in each batch interval. TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design?
how to convert a sequence of TimeStamp to a dataframe
Hi Guys, I have struggled for a while on this seemingly simple thing: I have a sequence of timestamps and want to create a dataframe with 1 column. Seq[java.sql.Timestamp] //import collection.breakOut var seqTimestamp = scala.collection.Seq(listTs:_*) seqTimestamp: Seq[java.sql.Timestamp] = List(2015-07-22 16:52:00.0, 2015-07-22 16:53:00.0, ..., ...) I tried a lot of ways to create a dataframe and below is another failed way: import sqlContext.implicits._ var rddTs = sc.parallelize(seqTimestamp) rddTs.toDF("minInterval") <console>:108: error: value toDF is not a member of org.apache.spark.rdd.RDD[java.sql.Timestamp] rddTs.toDF("minInterval") So, could any guru please tell me how to do this? I am not familiar with Scala or Spark. I wonder if learning Scala would help with this at all? It just sounds like a lot of time spent on trial/error and googling. Docs like https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/DataFrame.html https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/SQLContext.html#createDataFrame(scala.collection.Seq, scala.reflect.api.TypeTags.TypeTag) do not help. Btw, I am using Spark 1.4. Thanks in advance, J - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Setting a stage timeout
The referenced bug has been fixed in 1.4.0; are you able to upgrade? Cheers On Fri, Jul 31, 2015 at 10:01 AM, William Kinney william.kin...@gmail.com wrote: Hi, I had a job that got stuck on YARN due to https://issues.apache.org/jira/browse/SPARK-6954 It never exited properly. Is there a way to set a timeout for a stage or all stages?
Re: SparkLauncher not notified about finished job - hangs infinitely.
Tomasz: Please take a look at the Redirector class inside ./launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java FYI On Fri, Jul 31, 2015 at 10:02 AM, Elkhan Dadashov elkhan8...@gmail.com wrote: Hi Tomasz, *Answer to your 1st question*: Clear/read the error (spark.getErrorStream()) and output (spark.getInputStream()) stream buffers before you call spark.waitFor(); it would be better to clear/read them with 2 different threads. Then it should work fine. The Spark job is launched as a subprocess, and according to the Oracle documentation https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html: By default, the created subprocess does not have its own terminal or console. All its standard I/O (i.e. stdin, stdout, stderr) operations will be redirected to the parent process, where they can be accessed via the streams obtained using the methods getOutputStream(), getInputStream(), and getErrorStream(). The parent process uses these streams to feed input to and get output from the subprocess. Because some native platforms only provide limited buffer size for standard input and output streams, failure to promptly write the input stream or read the output stream of the subprocess may cause the subprocess to block, or even deadlock. On Fri, Jul 31, 2015 at 2:45 AM, Tomasz Guziałek tomasz.guzia...@humaninference.com wrote: I am trying to submit a JAR with a Spark job to the YARN cluster from Java code. I am using SparkLauncher to submit the SparkPi example: Process spark = new SparkLauncher() .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar") .setMainClass("org.apache.spark.examples.SparkPi") .setMaster("yarn-cluster") .launch(); System.out.println("Waiting for finish..."); int exitCode = spark.waitFor(); System.out.println("Finished! Exit code: " + exitCode); There are two problems: 1. While submitting in yarn-cluster mode, the application is successfully submitted to YARN and executes successfully (it is visible in the YARN UI, reported as SUCCESS, and the PI value is printed in the output). However, the submitting application is never notified that processing is finished - it hangs infinitely after printing "Waiting for finish..." The log of the container can be found here: http://pastebin.com/LscBjHQc 2. While submitting in yarn-client mode, the application does not appear in the YARN UI and the submitting application hangs at "Waiting for finish..." When the hanging code is killed, the application shows up in the YARN UI and is reported as SUCCESS, but the output is empty (the PI value is not printed out). The log of the container can be found here: http://pastebin.com/9KHi81r4 I tried to execute the submitting application both with Oracle Java 8 and 7. Any hints what might be wrong? Best regards, Tomasz -- Best regards, Elkhan Dadashov
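To make that concrete, here is a minimal sketch (Scala, against the 1.4 SparkLauncher API) of draining both child streams in background threads before waitFor(); the jar path and main class are just the ones from Tomasz's mail and would need adapting.

import java.io.{BufferedReader, InputStream, InputStreamReader}
import org.apache.spark.launcher.SparkLauncher

// read one of the child's streams on its own thread so its pipe buffer never fills up
def drain(name: String, in: InputStream): Unit = {
  val t = new Thread(new Runnable {
    override def run(): Unit = {
      val reader = new BufferedReader(new InputStreamReader(in))
      var line = reader.readLine()
      while (line != null) { println(s"[$name] $line"); line = reader.readLine() }
    }
  })
  t.setDaemon(true)
  t.start()
}

val spark = new SparkLauncher()
  .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
  .setMainClass("org.apache.spark.examples.SparkPi")
  .setMaster("yarn-cluster")
  .launch()

drain("stdout", spark.getInputStream)
drain("stderr", spark.getErrorStream)

val exitCode = spark.waitFor()   // no longer blocks on a full pipe buffer
println(s"Finished! Exit code: $exitCode")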
Record Linkage in Spark
Hi Folks, I would like to do record linkage (RL) in Spark, as the legacy approach using a blocking query is not working now that the data is getting huge. Please provide the necessary links and information for doing so. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Record-Linkage-in-Spark-tp24100.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: What happens when you create more DStreams than nodes in the cluster?
@Brandon Each node can host multiple executors. For example, in a 15 node cluster, if your NodeManager (in YARN) or equivalent (Mesos or Standalone) runs on each of these nodes and each node has enough resources to host, say, 5 executors, then in total you can have 15*5 executors, and each of these executors can host a DStream receiver. But be aware that each DStream receiver uses a dedicated core; the number of cores allocated to the Spark Streaming application must be more than the number of receivers, otherwise the system will receive data but not be able to process it. Thanks, Ashwin On Fri, Jul 31, 2015 at 4:52 PM, Brandon White bwwintheho...@gmail.com wrote: Since one input DStream creates one receiver, and one receiver uses one executor / node, what happens if you create more DStreams than nodes in the cluster? Say I have 30 DStreams on a 15 node cluster. Will ~15 streams get assigned to the ~15 executors / nodes while the other ~15 streams are queued for resources, or will the extra streams just fail and never run? -- Thanks Regards, Ashwin Giridharan
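As a rough sizing sketch of that rule (the socket sources and core counts below are made-up placeholders, not a recommendation): 30 receiver-based streams need strictly more than 30 cores in total across the executors, regardless of how many physical nodes that maps to.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// e.g. submit with: spark-submit --num-executors 15 --executor-cores 3 ...
// 15 * 3 = 45 cores: 30 are pinned by receivers, ~15 remain for processing
val conf = new SparkConf().setAppName("many-receivers")
val ssc = new StreamingContext(conf, Seconds(30))

val streams = (1 to 30).map(i => ssc.socketTextStream("host" + i, 9999))  // 30 receivers (placeholder sources)
ssc.union(streams).count().print()

ssc.start()
ssc.awaitTermination()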
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
Thanks a lot @Das @Cody. I moved from the receiver-based to the direct stream and I can get the topics from the offsets!! On Fri, Jul 31, 2015 at 4:41 PM, Brandon White bwwintheho...@gmail.com wrote: Tathagata, Could the bottleneck possibly be the number of executor nodes in our cluster? Since we are creating 500 DStreams based off 500 textfile directories, do we need at least 500 executors / nodes to be receivers for each one of the streams? On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com wrote: @Ashwin: You could append the topic in the data. val kafkaStreams = topics.map { topic => KafkaUtils.createDirectStream(topic...).map { x => (x, topic) } } val unionedStream = context.union(kafkaStreams) @Brandon: I don't recommend it, but you could do something crazy like use foreachRDD to farm out the jobs to a threadpool, but have the final foreachRDD wait for all the jobs to complete. manyDStreams.foreach { dstream => dstream.foreachRDD { rdd => // Add a runnable that runs the job on the RDD to the threadpool // This does not wait for the job to finish } } anyOfTheManyDStreams.foreachRDD { _ => // wait for all the current batch's jobs in the threadpool to complete. } This would run all the Spark jobs in the batch in parallel in the thread pool, but it would also make sure all the jobs finish before the batch is marked as completed. On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com wrote: Thank you Tathagata. My main use case for the 500 streams is to append new elements into their corresponding Spark SQL tables. Every stream is mapped to a table, so I'd like to use the streams to append the new RDDs to the tables. If I union all the streams, appending new elements becomes a nightmare. So is there no other way to parallelize something like the following? Will this still run sequentially or time out? //500 streams streams.foreach { stream => stream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.saveAsTable(streamTuple._1, SaveMode.Append) } } On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com wrote: I don't think anyone has really run 500 text streams. And the par sequences do nothing out there, you are only parallelizing the setup code, which does not really compute anything. It also sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, and it will process in parallel all the new files in each batch interval. TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design? -- Thanks Regards, Ashwin Giridharan
Re: Problems with JobScheduler
It doesn't make sense to me, because the other cluster processes all the data in less than a second. Anyway, I'm going to set that parameter. 2015-07-31 0:36 GMT+02:00 Tathagata Das t...@databricks.com: Yes, and that is indeed the problem. It is trying to process all the data in Kafka, and therefore taking 60 seconds. You need to set the rate limits for that. On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger c...@koeninger.org wrote: If you don't set it, there is no maximum rate; it will get everything from the end of the last batch to the maximum available offset On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz konstt2...@gmail.com wrote: The difference is that one receives more data than the other two. I can pass the topics through parameters, so I could execute the code with one topic at a time and figure out which one it is, although I guess it's the topic which gets more data. Anyway, it's pretty weird to see those delays in just one of the clusters even when the other one is not running. I have seen the parameter spark.streaming.kafka.maxRatePerPartition; I haven't set any value for this parameter, how does it work if this parameter doesn't have a value? 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org: If the jobs are running on different topicpartitions, what's different about them? Is one of them 120x the throughput of the other, for instance? You should be able to eliminate cluster config as a difference by running the same topicpartition on the different clusters and comparing the results. On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I have three topics with one partition each topic, so each job runs on one topic. 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org: Just so I'm clear, the difference in timing you're talking about is this: 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s Are those jobs running on the same topicpartition? On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I read about the maxRatePerPartition parameter; I haven't set this parameter. Could it be the problem?? Although this wouldn't explain why it doesn't work in one of the clusters. 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: They just share the Kafka, the rest of the resources are independent. I tried to stop one cluster and run just the cluster that isn't working, but the same thing happens. 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: I have a problem with the JobScheduler. I have executed the same code in two clusters. I read from three topics in Kafka with DirectStream, so I have three tasks. I have checked YARN and there aren't more jobs launched.
The cluster where I have troubles I got this logs: 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes) 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 72) in 208 ms on x (1/3) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID 74) in 49 ms on x (2/3) *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms* *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms* *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms* *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms* *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms* *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms* *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms* *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms* *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms* *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms* *15/07/30 14:33:50 INFO JobScheduler: Added jobs for time 143825963 ms* *15/07/30 14:33:55 INFO JobScheduler: Added jobs for time 1438259635000 ms* 15/07/30 14:33:59 INFO TaskSetManager: Finished task 1.0 in stage 24.0 (TID 73) in 60373 ms on (3/3) 15/07/30 14:33:59 INFO YarnScheduler: Removed TaskSet 24.0, whose tasks have all completed,
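For reference, the rate-limit settings being discussed look like this (a minimal sketch; the numbers are arbitrary examples, not tuned values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-direct-rate-limited")
  // cap for the direct stream: max messages per second read from each Kafka partition per batch
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // the equivalent cap for receiver-based streams
  .set("spark.streaming.receiver.maxRate", "1000")

val ssc = new StreamingContext(conf, Seconds(5))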
Spark-Submit error
Hi, I have submitted a Spark job with the options jars, class, and master as *local*, but I am getting an error as below: *dse spark-submit spark error exception in thread main java.io.IOException: Invalid Request Exception (Why you have not logged in)* *Note: submitting from a DataStax Spark node* Please let me know if anybody has a solution for this issue. Regards, Saish Chandra
Re: Has anybody ever tried running Spark Streaming on 500 text streams?
@Brandon, the file streams do not use receivers, so the bottleneck is not about executors per se. But there could be a couple of bottlenecks: 1. Every batch interval, the 500 DStreams are going to get directory listings from 500 directories, SEQUENTIALLY. So preparing the batch's RDDs and jobs can take time. So your batch interval can't be small; it may have to be tens of seconds. Which is probably fine for your application, otherwise you would not be using files in the first place. 2. Processing new files from 500 directories may take significant computation power. Just make sure you get a large enough cluster. On Fri, Jul 31, 2015 at 2:40 PM, Ashwin Giridharan ashwin.fo...@gmail.com wrote: Thanks a lot @Das @Cody. I moved from the receiver-based to the direct stream and I can get the topics from the offsets!! On Fri, Jul 31, 2015 at 4:41 PM, Brandon White bwwintheho...@gmail.com wrote: Tathagata, Could the bottleneck possibly be the number of executor nodes in our cluster? Since we are creating 500 DStreams based off 500 textfile directories, do we need at least 500 executors / nodes to be receivers for each one of the streams? On Tue, Jul 28, 2015 at 6:09 PM, Tathagata Das t...@databricks.com wrote: @Ashwin: You could append the topic in the data. val kafkaStreams = topics.map { topic => KafkaUtils.createDirectStream(topic...).map { x => (x, topic) } } val unionedStream = context.union(kafkaStreams) @Brandon: I don't recommend it, but you could do something crazy like use foreachRDD to farm out the jobs to a threadpool, but have the final foreachRDD wait for all the jobs to complete. manyDStreams.foreach { dstream => dstream.foreachRDD { rdd => // Add a runnable that runs the job on the RDD to the threadpool // This does not wait for the job to finish } } anyOfTheManyDStreams.foreachRDD { _ => // wait for all the current batch's jobs in the threadpool to complete. } This would run all the Spark jobs in the batch in parallel in the thread pool, but it would also make sure all the jobs finish before the batch is marked as completed. On Tue, Jul 28, 2015 at 4:05 PM, Brandon White bwwintheho...@gmail.com wrote: Thank you Tathagata. My main use case for the 500 streams is to append new elements into their corresponding Spark SQL tables. Every stream is mapped to a table, so I'd like to use the streams to append the new RDDs to the tables. If I union all the streams, appending new elements becomes a nightmare. So is there no other way to parallelize something like the following? Will this still run sequentially or time out? //500 streams streams.foreach { stream => stream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.saveAsTable(streamTuple._1, SaveMode.Append) } } On Tue, Jul 28, 2015 at 3:42 PM, Tathagata Das t...@databricks.com wrote: I don't think anyone has really run 500 text streams. And the par sequences do nothing out there, you are only parallelizing the setup code, which does not really compute anything. It also sets up 500 foreachRDD operations that will get executed in each batch sequentially, so it does not make sense. The right way to parallelize this is to union all the streams. val streams = streamPaths.map { path => ssc.textFileStream(path) } val unionedStream = streamingContext.union(streams) unionedStream.foreachRDD { rdd => // do something } Then there is only one foreachRDD executed in every batch, and it will process in parallel all the new files in each batch interval.
TD On Tue, Jul 28, 2015 at 3:06 PM, Brandon White bwwintheho...@gmail.com wrote: val ssc = new StreamingContext(sc, Minutes(10)) //500 textFile streams watching S3 directories val streams = streamPaths.par.map { path => ssc.textFileStream(path) } streams.par.foreach { stream => stream.foreachRDD { rdd => //do something } } ssc.start() Would something like this scale? What would be the limiting factor to performance? What is the best way to parallelize this? Any other ideas on design? -- Thanks Regards, Ashwin Giridharan
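To make the thread-pool idea above a bit more concrete, here is a rough sketch under the thread's assumptions (streamPaths, ssc and sqlContext as in Brandon's code; the table names and pool size are placeholders). It only submits work from each foreachRDD and lets the last one wait, as TD describes; it is a workaround sketch, not an endorsed API.

import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.spark.sql.SaveMode

implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))
val pending = new ConcurrentLinkedQueue[Future[Unit]]()

// one (stream, table) pair per directory -- the table names are placeholders
val tableStreams = streamPaths.zipWithIndex.map { case (path, i) =>
  (ssc.textFileStream(path), s"stream_table_$i")
}

tableStreams.foreach { case (stream, table) =>
  stream.foreachRDD { rdd =>
    // just queue the append job; don't block the batch's output-operation loop
    pending.add(Future {
      val df = sqlContext.jsonRDD(rdd)
      df.saveAsTable(table, SaveMode.Append)
    })
  }
}

// the last registered foreachRDD runs after all the others in a batch,
// so waiting here keeps the batch from completing before every append is done
tableStreams.last._1.foreachRDD { _ =>
  while (!pending.isEmpty) Await.result(pending.poll(), 10.minutes)
}

ssc.start()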
Re: how to convert a sequence of TimeStamp to a dataframe
Please take a look at stringToTimestamp() in ./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala Representing the timestamp with a long should work. Cheers On Fri, Jul 31, 2015 at 2:50 PM, Joanne Contact joannenetw...@gmail.com wrote: Hi Guys, I have struggled for a while on this seemingly simple thing: I have a sequence of timestamps and want to create a dataframe with 1 column. Seq[java.sql.Timestamp] //import collection.breakOut var seqTimestamp = scala.collection.Seq(listTs:_*) seqTimestamp: Seq[java.sql.Timestamp] = List(2015-07-22 16:52:00.0, 2015-07-22 16:53:00.0, ..., ...) I tried a lot of ways to create a dataframe and below is another failed way: import sqlContext.implicits._ var rddTs = sc.parallelize(seqTimestamp) rddTs.toDF("minInterval") <console>:108: error: value toDF is not a member of org.apache.spark.rdd.RDD[java.sql.Timestamp] rddTs.toDF("minInterval") So, could any guru please tell me how to do this? I am not familiar with Scala or Spark. I wonder if learning Scala would help with this at all? It just sounds like a lot of time spent on trial/error and googling. Docs like https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/DataFrame.html https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/SQLContext.html#createDataFrame(scala.collection.Seq , scala.reflect.api.TypeTags.TypeTag) do not help. Btw, I am using Spark 1.4. Thanks in advance, J - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
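For what it's worth, a small workaround sketch for the original question (assuming the shell's sc and sqlContext on 1.4): wrapping each timestamp in a Tuple1, or a case class, gives the implicits something they can derive a schema from, so no manual conversion to long is needed.

import java.sql.Timestamp
import sqlContext.implicits._

val seqTimestamp: Seq[Timestamp] = Seq(
  Timestamp.valueOf("2015-07-22 16:52:00"),
  Timestamp.valueOf("2015-07-22 16:53:00"))

// RDD[Timestamp] has no toDF, but RDD[Tuple1[Timestamp]] (a Product) does
val df = sc.parallelize(seqTimestamp.map(Tuple1(_))).toDF("minInterval")
df.printSchema()   // minInterval: timestamp
df.show()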
Encryption on RDDs or in-memory/cache on Apache Spark
Hi, I am currently working on the latest version of Apache Spark (1.4.1), pre-built package for Hadoop 2.6+. Is there any feature in Spark/Hadoop to encrypt RDDs or the in-memory cache (something similar to Altibase's HDB: http://altibase.com/in-memory-database-computing-solutions/security/) when running applications in Spark? Or is there an external library/framework which could be used to encrypt RDDs or the in-memory cache in Spark? I discovered it is possible to encrypt the data and then encapsulate it into an RDD. However, I feel this affects Spark's fast data processing, as it is slower to encrypt the data and then encapsulate it into an RDD; it's then a two-step process. Encryption and storing the data should be done in parallel. Any help would be appreciated. Many thanks, Matthew - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
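As far as I know there is nothing built into Spark 1.4.1 for this, but here is a tiny sketch of the "encrypt, then put into an RDD" idea done inside a single mapPartitions pass, so the encryption runs in parallel on the executors rather than as a separate driver-side step. The hard-coded key/IV and input path are placeholders purely for illustration; real key management would come from a KMS.

import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
import org.apache.spark.storage.StorageLevel

val keyBytes = "0123456789abcdef".getBytes("UTF-8")   // demo-only 16-byte key
val ivBytes  = "fedcba9876543210".getBytes("UTF-8")   // demo-only IV

val encrypted = sc.textFile("hdfs:///data/sensitive.txt").mapPartitions { lines =>
  // one Cipher per partition; doFinal resets it so it can be reused per record
  val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
  cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyBytes, "AES"), new IvParameterSpec(ivBytes))
  lines.map(line => cipher.doFinal(line.getBytes("UTF-8")))
}.persist(StorageLevel.MEMORY_AND_DISK_SER)   // only ciphertext ends up in the cached blocks

println(encrypted.count())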
Re: Problems with JobScheduler
I detected the error. The final step is to index data in Elasticsearch. The Elasticsearch in one of the clusters is overwhelmed and doesn't work correctly. I pointed the cluster which wasn't working at another ES and don't get any delay. Sorry, it wasn't related to Spark! 2015-07-31 9:15 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: It doesn't make sense to me, because the other cluster processes all the data in less than a second. Anyway, I'm going to set that parameter. 2015-07-31 0:36 GMT+02:00 Tathagata Das t...@databricks.com: Yes, and that is indeed the problem. It is trying to process all the data in Kafka, and therefore taking 60 seconds. You need to set the rate limits for that. On Thu, Jul 30, 2015 at 8:51 AM, Cody Koeninger c...@koeninger.org wrote: If you don't set it, there is no maximum rate; it will get everything from the end of the last batch to the maximum available offset On Thu, Jul 30, 2015 at 10:46 AM, Guillermo Ortiz konstt2...@gmail.com wrote: The difference is that one receives more data than the other two. I can pass the topics through parameters, so I could execute the code with one topic at a time and figure out which one it is, although I guess it's the topic which gets more data. Anyway, it's pretty weird to see those delays in just one of the clusters even when the other one is not running. I have seen the parameter spark.streaming.kafka.maxRatePerPartition; I haven't set any value for this parameter, how does it work if this parameter doesn't have a value? 2015-07-30 16:32 GMT+02:00 Cody Koeninger c...@koeninger.org: If the jobs are running on different topicpartitions, what's different about them? Is one of them 120x the throughput of the other, for instance? You should be able to eliminate cluster config as a difference by running the same topicpartition on the different clusters and comparing the results. On Thu, Jul 30, 2015 at 9:29 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I have three topics with one partition each topic, so each job runs on one topic. 2015-07-30 16:20 GMT+02:00 Cody Koeninger c...@koeninger.org: Just so I'm clear, the difference in timing you're talking about is this: 15/07/30 14:33:59 INFO DAGScheduler: Job 24 finished: foreachRDD at MetricsSpark.scala:67, took 60.391761 s 15/07/30 14:37:35 INFO DAGScheduler: Job 93 finished: foreachRDD at MetricsSpark.scala:67, took 0.531323 s Are those jobs running on the same topicpartition? On Thu, Jul 30, 2015 at 8:03 AM, Guillermo Ortiz konstt2...@gmail.com wrote: I read about the maxRatePerPartition parameter; I haven't set this parameter. Could it be the problem?? Although this wouldn't explain why it doesn't work in one of the clusters. 2015-07-30 14:47 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: They just share the Kafka, the rest of the resources are independent. I tried to stop one cluster and run just the cluster that isn't working, but the same thing happens. 2015-07-30 14:41 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com: I have a problem with the JobScheduler. I have executed the same code in two clusters. I read from three topics in Kafka with DirectStream, so I have three tasks. I have checked YARN and there aren't more jobs launched.
The cluster where I have troubles I got this logs: 15/07/30 14:32:58 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 72, x, RACK_LOCAL, 14856 bytes) 15/07/30 14:32:58 INFO TaskSetManager: Starting task 1.0 in stage 24.0 (TID 73, xxx, RACK_LOCAL, 14852 bytes) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on xxx:44909 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:58 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on x:43477 (size: 1802.0 B, free: 530.3 MB) 15/07/30 14:32:59 INFO TaskSetManager: Starting task 2.0 in stage 24.0 (TID 74, x, RACK_LOCAL, 14860 bytes) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 72) in 208 ms on x (1/3) 15/07/30 14:32:59 INFO TaskSetManager: Finished task 2.0 in stage 24.0 (TID 74) in 49 ms on x (2/3) *15/07/30 14:33:00 INFO JobScheduler: Added jobs for time 143825958 ms* *15/07/30 14:33:05 INFO JobScheduler: Added jobs for time 1438259585000 ms* *15/07/30 14:33:10 INFO JobScheduler: Added jobs for time 143825959 ms* *15/07/30 14:33:15 INFO JobScheduler: Added jobs for time 1438259595000 ms* *15/07/30 14:33:20 INFO JobScheduler: Added jobs for time 143825960 ms* *15/07/30 14:33:25 INFO JobScheduler: Added jobs for time 1438259605000 ms* *15/07/30 14:33:30 INFO JobScheduler: Added jobs for time 143825961 ms* *15/07/30 14:33:35 INFO JobScheduler: Added jobs for time 1438259615000 ms* *15/07/30 14:33:40 INFO JobScheduler: Added jobs for time 143825962 ms* *15/07/30 14:33:45 INFO JobScheduler: Added jobs for time 1438259625000 ms* *15/07/30
Buffer Overflow exception
Hi, I am getting a buffer overflow exception while using Spark via the Thrift server. May I know how to overcome this? Code: HqlConnection con = new HqlConnection("localhost", 10001, HiveServer.HiveServer2); con.Open(); HqlCommand createCommand = new HqlCommand(tablequery, con); // here tablequery is the query I used to create the table createCommand.ExecuteNonQuery(); Also, it seems Spark works slower compared to SQL Server. May I know the reason for that? My case is: I have a table called TestTable with 4 records in SQL Server, and when I executed a query it returned the result in 1 sec. Then I converted the same table to CSV, imported it into Spark, and executed the same query as in the code, but it takes much more time, almost 2 minutes, to return the results. May I know the reason for this slow process too? Thanks, Vinod
looking for help in using graphx aggregateMessages
Dear list, Hi~ I am new to Spark and GraphX, and I have a little experience using Scala. I want to use GraphX to calculate some basic statistics in linked open data, which is basically a graph. Suppose the graph only contains one type of edge, directed from individuals to concepts, and the edge labels are all "type". I want to find all pairs of concepts that have at least one individual linking to both of them. The following is my current solution, but sadly it doesn't work. Could you please help me work this out? Or are there better solutions? Any help is appreciated! val conf = new SparkConf().setMaster("spark://MacBook-Pro:7077").setAppName("My App").setJars(...) val sc = new SparkContext(conf) // initialize individuals (in small letters) and concepts (in upper case letters) val users: RDD[(org.apache.spark.graphx.VertexId, String)] = sc.parallelize(Array((1L, "a"), (2L, "b"), (3L, "e"), (11L, "A"), (12L, "B"), (13L, "C"), (14L, "D"))) // initialize type edges val relationships: RDD[Edge[String]] = sc.parallelize(Array( Edge(1L, 11L, "type"), Edge(1L, 14L, "type"), Edge(1L, 13L, "type"), Edge(2L, 11L, "type"), Edge(2L, 12L, "type"), Edge(3L, 11L, "type"), Edge(3L, 13L, "type"))) val graph = Graph(users, relationships) val indTypes = graph.collectNeighborIds(EdgeDirection.Out) // these seem to be stupid functions... def mapUDF(triplet: EdgeContext[String, String, HashMap[Long, Int]]) = { val msg = indTypes.filter(pred => pred._1 == triplet.srcId).first()._2.aggregate(new HashMap[Long, Int])((a, b) => a += (b -> 1), (a, b) => a ++ b) triplet.sendToDst(msg) } def reduceUDF(a: HashMap[Long, Int], b: HashMap[Long, Int]): HashMap[Long, Int] = a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) } var pairs = new HashMap[(Long, Long), Int] val results = graph.aggregateMessages[HashMap[Long, Int]](mapUDF, reduceUDF) results.foreach(result => { result._2.filter(p => p._1 != result._1).foreach(map => { val a = result._1 val b = map._1 if (!pairs.contains(a, b) && !pairs.contains(b, a)) pairs += (a, b) -> map._2 }) }) pairs.foreach(println(_)) The exceptions: TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, 10.42.0.17): java.lang.NullPointerException at atur.node.learner.Test$.mapUDF(SimpleDisjointnessLearner.scala:147) at atur.node.learner.Test$$anonfun$4.apply(SimpleDisjointnessLearner.scala:155) Best wishes, June
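The NullPointerException at mapUDF is most likely because indTypes (an RDD) is referenced inside aggregateMessages — RDDs only exist on the driver and cannot be used from inside another RDD's closures. Here is a rough alternative sketch, reusing the poster's graph and toy data, that avoids nesting RDDs: take each individual's out-neighbours (its concepts) and count, per unordered concept pair, how many individuals link to both.

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// each individual's concept ids (concept vertices have no out-edges and are filtered away)
val indToConcepts: RDD[(VertexId, Array[VertexId])] =
  graph.collectNeighborIds(EdgeDirection.Out).filter(_._2.nonEmpty)

// emit every unordered concept pair an individual links to, then count per pair
val conceptPairs: RDD[((VertexId, VertexId), Int)] = indToConcepts
  .flatMap { case (_, concepts) =>
    val cs = concepts.distinct.sorted
    for (i <- cs.indices; j <- (i + 1) until cs.length) yield ((cs(i), cs(j)), 1)
  }
  .reduceByKey(_ + _)

conceptPairs.collect().foreach(println)
// with the sample data this includes ((11,13),2): concepts A and C share individuals a and e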