Re: Error when Spark streaming consumes from Kafka
Hi, This seems not fixed yet. I filed an issue in jira: https://issues.apache.org/jira/browse/SPARK-5505 Greg
Re: how to send JavaDStream RDD using foreachRDD using Java
Hello Sachin, While Akhil's solution is correct, it is not sufficient for your use case. RDD.foreach (which Akhil is using) will run on the workers, but you are creating the Producer object on the driver. This will not work: a producer created on the driver cannot be used from the worker/executor. The best way to do what you want is to use rdd.foreachPartition. Inside the function supplied to RDD.foreachPartition, create the producer, send the whole partition, and close the producer. I am on my phone so I am not able to write out the Java code. TD On Mon, Feb 2, 2015 at 11:38 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Here you go: JavaDStream<String> textStream = ssc.textFileStream("/home/akhld/sigmoid/"); textStream.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> rdd) throws Exception { // TODO Auto-generated method stub rdd.foreach(new VoidFunction<String>(){ @Override public void call(String stringData) throws Exception { // Use this data! System.out.println("W00t!! Data : " + stringData); } }); return null; } }); Thanks Best Regards On Sun, Feb 1, 2015 at 9:08 PM, sachin Singh sachin.sha...@gmail.com wrote: Hi, I want to send streaming data to a Kafka topic. I have RDD data which I converted into a JavaDStream, and now I want to send it to a Kafka topic. I don't need the Kafka sending code, just the foreachRDD implementation. My code looks like this: public void publishtoKafka(ITblStream t) { MyTopicProducer MTP = ProducerFactory.createProducer(hostname + ":" + port); JavaDStream<?> rdd = (JavaDStream<?>) t.getRDD(); rdd.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> rdd) throws Exception { KafkaUtils.sendDataAsString(MTP, topicName, String RDDData); return null; } }); log.debug("sent to kafka: --"); } Here MyTopicProducer creates the producer, which is working fine. KafkaUtils.sendDataAsString is the method which publishes data to the Kafka topic, and it is also working fine. My only problem is that I am not able to convert the JavaDStream rdd to a String using foreach or foreachRDD; finally I need a String message from the RDDs. Kindly suggest Java code only; I don't want to use anonymous classes. Please send me only the part that sends a JavaDStream RDD using foreachRDD with a Function call. Thanks in advance,
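A rough Scala sketch of the pattern described above: build the producer inside foreachPartition so it is created on the executor, send the whole partition, then close it. The broker address, topic name and the use of the Kafka 0.8 producer API are assumptions for illustration; the Java version follows the same shape.

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import org.apache.spark.streaming.dstream.DStream

    def publish(stream: DStream[String]): Unit = {
      stream.foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          // Build the producer here, on the executor, once per partition.
          val props = new Properties()
          props.put("metadata.broker.list", "broker1:9092") // assumed broker list
          props.put("serializer.class", "kafka.serializer.StringEncoder")
          val producer = new Producer[String, String](new ProducerConfig(props))
          partition.foreach(msg => producer.send(new KeyedMessage[String, String]("myTopic", msg)))
          producer.close()
        }
      }
    }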
Re: Can't find spark-parent when using snapshot build
Snapshot builds are not published. Unless you build and install snapshots locally (like with mvn install) they won't be found. On Feb 2, 2015 10:58 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to use the master version of spark. I build and install it with $ mvn clean clean install I manage to use it with the following configuration in my build.sbt : libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.3.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-sql" % "1.3.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "1.3.0-SNAPSHOT" % "provided") But after my last update I got the following error : unresolved dependency: org.apache.spark#spark-mllib_2.10;1.3.0-SNAPSHOT: Maven2 Local: no ivy file nor artifact found for org.apache.spark#spark-parent;1.3.0-SNAPSHOT Any ideas ? Cheers, Jao
Re: Error when Spark streaming consumes from Kafka
This is an issue that is hard to resolve without rearchitecting the whole Kafka Receiver. There are some workarounds worth looking into. http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote: Hi, This seems not fixed yet. I filed an issue in jira: https://issues.apache.org/jira/browse/SPARK-5505 Greg
Re: Error when Spark streaming consumes from Kafka
Or you can use this Low Level Kafka Consumer for Spark : https://github.com/dibbhatt/kafka-spark-consumer This is now part of http://spark-packages.org/ and has been running successfully for the past few months in Pearson's production environment. Being a Low Level consumer, it does not have the re-balancing issue that the High Level consumer has. I also know of a few users who have shifted to this Low Level Consumer, and it has given them a more robust, fault-tolerant Kafka Receiver for Spark. Regards, Dibyendu On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is an issue that is hard to resolve without rearchitecting the whole Kafka Receiver. There are some workarounds worth looking into. http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote: Hi, This seems not fixed yet. I filed an issue in jira: https://issues.apache.org/jira/browse/SPARK-5505 Greg
Spark impersonation
Hi Team, Does Spark support impersonation? For example, when Spark runs on YARN/Hive/HBase/etc., which user is used by default? The user who starts the Spark job? Any suggestions related to impersonation? -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: Can't find spark-parent when using snapshot build
That's what I did. On Mon, Feb 2, 2015 at 11:28 PM, Sean Owen so...@cloudera.com wrote: Snapshot builds are not published. Unless you build and install snapshots locally (like with mvn install) they won't be found. On Feb 2, 2015 10:58 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm trying to use the master version of spark. I build and install it with $ mvn clean clean install I manage to use it with the following configuration in my build.sbt : libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.3.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-sql" % "1.3.0-SNAPSHOT" % "provided", "org.apache.spark" %% "spark-mllib" % "1.3.0-SNAPSHOT" % "provided") But after my last update I got the following error : unresolved dependency: org.apache.spark#spark-mllib_2.10;1.3.0-SNAPSHOT: Maven2 Local: no ivy file nor artifact found for org.apache.spark#spark-parent;1.3.0-SNAPSHOT Any ideas ? Cheers, Jao
how to specify hive connection options for HiveContext
Hi, I know two options, one for spark-submit and the other for spark-shell, but how do I set the Hive connection options for programs running inside Eclipse? Regards,
Java Kafka Word Count Issue
Hi All, I am trying to run the Kafka Word Count program. Please find below the link for the same: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java I have set the Spark master to setMaster("local[*]") and I have started a Kafka producer which reads the file. If my file already has a few words, then after running the Spark Java program I get the proper output. But when I append new words to the same file, it starts the word count again from 1. If I need to count both the words already present and the newly appended words, exactly what changes do I need to make in the code? P.S. I am using Spark spark-1.2.0-bin-hadoop2.3 Thanks and regards Shweta Jadhav
How to define a file filter for file name patterns in Apache Spark Streaming in Java?
Hello, I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter for file names when creating an InputDStream https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html by invoking the fileStream method https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 . My code works perfectly fine when I don't use a file filter, e.g. by invoking the other fileStream method (described here: https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 ). According to the documentation of the *fileStream* method, I can pass it scala.Function1<org.apache.hadoop.fs.Path, Object> filter But so far, I could not create a fileFilter. My initial attempts have been: 1- Tried to implement it as: Function1<Path, Object> fileFilter = new Function1<Path, Object>() { @Override public Object apply(Path v1) { return true; } @Override public <A> Function1<A, Object> compose(Function1<A, Path> g) { return Function1$class.compose(this, g); } @Override public <A> Function1<Path, A> andThen(Function1<Object, A> g) { return Function1$class.andThen(this, g); } }; But apparently my implementation of andThen is wrong, and I couldn't understand how I should implement it.
It complains that the anonymous function is not abstract and does not override abstract method <A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in scala.Function1 2- Tried to implement it as: Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() { @Override public Object apply(Path v1) { return true; } }; This one compiles, but when I run it I get an exception: 2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1 java.io.NotSerializableException: myModule$1 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184) at
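For reference, a minimal Scala sketch of the same call; in Scala the filter is a plain function literal, which is already serializable. The directory path and the keep-everything-but-hidden-files rule are assumptions, and it presumes an existing SparkContext sc. From Java, the second attempt above likely fails because the anonymous AbstractFunction1 subclass is not java.io.Serializable; a small named class extending AbstractFunction1<Path, Object> and implementing Serializable is one way around it, though that is an assumption rather than tested code.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(sc, Seconds(30))
    val stream = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "hdfs:///incoming",                            // assumed directory
      (path: Path) => !path.getName.startsWith("."), // the file name filter
      newFilesOnly = true)
    val lines = stream.map(_._2.toString)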
Re: Cheapest way to materialize an RDD?
You can also do something like rdd.sparkContext.runJob(rdd, (iter: Iterator[T]) => { while (iter.hasNext) iter.next() }) On Sat, Jan 31, 2015 at 5:24 AM, Sean Owen so...@cloudera.com wrote: Yeah, from an unscientific test, it looks like the time to cache the blocks still dominates. Saving the count is probably a win, but not big. Well, maybe good to know. On Fri, Jan 30, 2015 at 10:47 PM, Stephen Boesch java...@gmail.com wrote: Theoretically your approach would require less overhead - i.e. a collect on the driver is not required as the last step. But maybe the difference is small and that particular path may or may not have been properly optimized vs the count(). Do you have a biggish data set to compare the timings? 2015-01-30 14:42 GMT-08:00 Sean Owen so...@cloudera.com: So far, the canonical way to materialize an RDD just to make sure it's cached is to call count(). That's fine but incurs the overhead of actually counting the elements. However, rdd.foreachPartition(p => None) for example also seems to cause the RDD to be materialized, and is a no-op. Is that a better way to do it or am I not thinking of why it's insufficient?
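A compact sketch of the options discussed in this thread, assuming an existing SparkContext sc:

    val cached = sc.parallelize(1 to 1000000).cache()
    cached.count()                   // canonical: materializes every block, but also counts elements
    cached.foreachPartition(_ => ()) // no-op pass: materializes without building any result
    sc.runJob(cached, (iter: Iterator[Int]) => { while (iter.hasNext) iter.next() }) // same idea via runJob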
Re: is there a master for spark cluster in ec2
There is a file $SPARK_HOME/conf/spark-env.sh which comes readily configured with the MASTER variable. So if you start pyspark or spark-shell from the ec2 login machine you will connect to the Spark master. On 29 Jan 2015, at 01:11, Mohit Singh mohit1...@gmail.com wrote: Hi, Probably a naive question.. But I am creating a spark cluster on ec2 using the ec2 scripts in there.. But is there a master param I need to set.. ./bin/pyspark --master [ ] ?? I don't yet fully understand the ec2 concepts so just wanted to confirm this?? Thanks -- Mohit When you want success as badly as you want the air, then you will get it. There is no other secret of success. -Socrates
Re: [Graphx Spark] Error of Lost executor and TimeoutException
That may be the cause of your issue. Take a look at the tuning guide[1] and maybe also profile your application. See if you can reuse your objects. 1. http://spark.apache.org/docs/latest/tuning.html Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal On Sat, Jan 31, 2015 at 4:21 AM, Yifan LI iamyifa...@gmail.com wrote: Yes, I think so, esp. for a pregel application… have any suggestion? Best, Yifan LI On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com wrote: Is your code hitting frequent garbage collection? Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com wrote: Hi, I am running my graphx application on Spark 1.2.0(11 nodes cluster), has requested 30GB memory per node and 100 cores for around 1GB input dataset(5 million vertices graph). But the error below always happen… Is there anyone could give me some points? (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph computation, and another version of my application can work well on the same dataset while it need much less memory during computation) Thanks in advance!!! 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227) at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66) at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138) at scala.Option.foreach(Option.scala:236) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.ContextCleaner.org http://org.apache.spark.contextcleaner.org/ $apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133) at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65) [Stage 91:=== (2 + 4) / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 [Stage 93: (29 + 20) / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr: remote Akka client disassociated [Stage 83: (1 + 0) / 6][Stage 86: (0 + 1) / 2][Stage 88: (0 + 2) / 8]15/01/29 23:47:06 
ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9 [Stage 83:=== (5 + 1) / 6][Stage 88:= (9 + 2) / 11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on small10-tap1.common.lip6.fr: remote Akka client disassociated 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 Best, Yifan LI
Re: Java Kafka Word Count Issue
Hi Sean, The Kafka producer is working fine. This is related to Spark. How can I configure Spark so that it will remember the count from the beginning? If my log.text file has "spark apache kafka spark", my Spark program gives the correct output: spark 2, apache 1, kafka 1. But when I append "spark" to my log.text file, the Spark program gives the output "spark 1", which should be "spark 3". So how do I handle this in the Spark code? Thanks and regards Shweta Jadhav -Sean Owen so...@cloudera.com wrote: - To: Jadhav Shweta jadhav.shw...@tcs.com From: Sean Owen so...@cloudera.com Date: 02/02/2015 04:13PM Subject: Re: Java Kafka Word Count Issue This is a question about the Kafka producer right? Not Spark On Feb 2, 2015 10:34 AM, Jadhav Shweta jadhav.shw...@tcs.com wrote: Hi All, I am trying to run Kafka Word Count Program. please find below, the link for the same https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java I have set spark master to setMaster(local[*]) and I have started Kafka Producer which reads the file. If my file has already few words then after running Spark java program I get proper output. But when i append new words in same file it starts word count again from 1. If I need to do word count for already present and newly appended words exactly what changes I need to make in code for that. P.S. I am using Spark spark-1.2.0-bin-hadoop2.3 Thanks and regards Shweta Jadhav
Re: Java Kafka Word Count Issue
First I would check your code to see how you are pushing records into the topic. Is it reading the whole file each time and resending all of it? Then see if you are using the same consumer.id on the Spark side. Otherwise you are not reading from the same offset when restarting Spark but instead reading from the default defined in Kafka by auto.offset.reset, which you may be setting to 'smallest'. This is why I think this is likely an issue with how you use Kafka. On Feb 2, 2015 10:34 AM, Jadhav Shweta jadhav.shw...@tcs.com wrote: Hi All, I am trying to run Kafka Word Count Program. please find below, the link for the same https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java I have set spark master to setMaster(local[*]) and I have started Kafka Producer which reads the file. If my file has already few words then after running Spark java program I get proper output. But when i append new words in same file it starts word count again from 1. If I need to do word count for already present and newly appended words exactly what changes I need to make in code for that. P.S. I am using Spark spark-1.2.0-bin-hadoop2.3 Thanks and regards Shweta Jadhav
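A sketch of keeping the consumer configuration stable on the Spark side with the high-level Kafka receiver in Spark 1.2; the ZooKeeper address, group name and topic are assumptions, and an existing StreamingContext ssc is assumed:

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map(
      "zookeeper.connect" -> "zkhost:2181",   // assumed ZooKeeper address
      "group.id"          -> "wordcount-app", // keep this stable across restarts
      "auto.offset.reset" -> "smallest")      // where to start when no committed offset exists

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)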
Re: java.lang.IllegalStateException: unread block data
I got the same problem; maybe the Java serializer is unstable.
Re: [Graphx Spark] Error of Lost executor and TimeoutException
Thanks, Sonal. But it seems to be an error happened when “cleaning broadcast”? BTW, what is the timeout of “[30 seconds]”? can I increase it? Best, Yifan LI On 02 Feb 2015, at 11:12, Sonal Goyal sonalgoy...@gmail.com wrote: That may be the cause of your issue. Take a look at the tuning guide[1] and maybe also profile your application. See if you can reuse your objects. 1. http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/tuning.html Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Sat, Jan 31, 2015 at 4:21 AM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Yes, I think so, esp. for a pregel application… have any suggestion? Best, Yifan LI On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com mailto:sonalgoy...@gmail.com wrote: Is your code hitting frequent garbage collection? Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi, I am running my graphx application on Spark 1.2.0(11 nodes cluster), has requested 30GB memory per node and 100 cores for around 1GB input dataset(5 million vertices graph). But the error below always happen… Is there anyone could give me some points? (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph computation, and another version of my application can work well on the same dataset while it need much less memory during computation) Thanks in advance!!! 15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227) at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66) at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138) at scala.Option.foreach(Option.scala:236) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.ContextCleaner.org http://org.apache.spark.contextcleaner.org/$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133) at 
org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65) [Stage 91:=== (2 + 4) / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 [Stage 93: (29 + 20) / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr http://small11-tap1.common.lip6.fr/: remote Akka client disassociated [Stage 83: (1 + 0) / 6][Stage 86: (0 + 1) / 2][Stage 88: (0 + 2) / 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9 [Stage 83:=== (5 + 1) / 6][Stage 88:= (9 + 2) / 11]15/01/29 23:57:30 ERROR TaskSchedulerImpl: Lost executor 8 on small10-tap1.common.lip6.fr http://small10-tap1.common.lip6.fr/: remote Akka client disassociated 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 15/01/29 23:57:30 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 8 Best, Yifan LI
Re: Java Kafka Word Count Issue
You can use updateStateByKey() to perform the above operation. On Mon, Feb 2, 2015 at 4:29 PM, Jadhav Shweta jadhav.shw...@tcs.com wrote: Hi Sean, Kafka Producer is working fine. This is related to Spark. How can i configure spark so that it will make sure to remember count from the beginning. If my log.text file has spark apache kafka spark My Spark program gives correct output as spark 2 apache 1 kafka 1 but when I append spark to my log.text file Spark program gives output as spark 1 which should be spark 3. So how to handle this in Spark code. Thanks and regards Shweta Jadhav -Sean Owen so...@cloudera.com wrote: - To: Jadhav Shweta jadhav.shw...@tcs.com From: Sean Owen so...@cloudera.com Date: 02/02/2015 04:13PM Subject: Re: Java Kafka Word Count Issue This is a question about the Kafka producer right? Not Spark On Feb 2, 2015 10:34 AM, Jadhav Shweta jadhav.shw...@tcs.com wrote: Hi All, I am trying to run Kafka Word Count Program. please find below, the link for the same https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java I have set spark master to setMaster(local[*]) and I have started Kafka Producer which reads the file. If my file has already few words then after running Spark java program I get proper output. But when i append new words in same file it starts word count again from 1. If I need to do word count for already present and newly appended words exactly what changes I need to make in code for that. P.S. I am using Spark spark-1.2.0-bin-hadoop2.3 Thanks and regards Shweta Jadhav
Re: [Graphx Spark] Error of Lost executor and TimeoutException
I think this broadcast cleaning(memory block remove?) timeout exception was caused by: 15/02/02 11:48:49 ERROR TaskSchedulerImpl: Lost executor 13 on small18-tap1.common.lip6.fr: remote Akka client disassociated 15/02/02 11:48:49 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 13 15/02/02 11:48:49 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 13 Anyone has points on this? Best, Yifan LI On 02 Feb 2015, at 11:47, Yifan LI iamyifa...@gmail.com wrote: Thanks, Sonal. But it seems to be an error happened when “cleaning broadcast”? BTW, what is the timeout of “[30 seconds]”? can I increase it? Best, Yifan LI On 02 Feb 2015, at 11:12, Sonal Goyal sonalgoy...@gmail.com mailto:sonalgoy...@gmail.com wrote: That may be the cause of your issue. Take a look at the tuning guide[1] and maybe also profile your application. See if you can reuse your objects. 1. http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/tuning.html Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Sat, Jan 31, 2015 at 4:21 AM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Yes, I think so, esp. for a pregel application… have any suggestion? Best, Yifan LI On 30 Jan 2015, at 22:25, Sonal Goyal sonalgoy...@gmail.com mailto:sonalgoy...@gmail.com wrote: Is your code hitting frequent garbage collection? Best Regards, Sonal Founder, Nube Technologies http://www.nubetech.co/ http://in.linkedin.com/in/sonalgoyal On Fri, Jan 30, 2015 at 7:52 PM, Yifan LI iamyifa...@gmail.com mailto:iamyifa...@gmail.com wrote: Hi, I am running my graphx application on Spark 1.2.0(11 nodes cluster), has requested 30GB memory per node and 100 cores for around 1GB input dataset(5 million vertices graph). But the error below always happen… Is there anyone could give me some points? (BTW, the overall edge/vertex RDDs will reach more than 100GB during graph computation, and another version of my application can work well on the same dataset while it need much less memory during computation) Thanks in advance!!! 
15/01/29 18:05:08 ERROR ContextCleaner: Error cleaning broadcast 60 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:227) at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66) at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138) at scala.Option.foreach(Option.scala:236) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:138) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.ContextCleaner.org http://org.apache.spark.contextcleaner.org/$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133) at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65) [Stage 91:=== (2 + 4) / 6]15/01/29 18:08:15 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0 [Stage 93: (29 + 20) / 49]15/01/29 23:47:03 ERROR TaskSchedulerImpl: Lost executor 9 on small11-tap1.common.lip6.fr http://small11-tap1.common.lip6.fr/: remote Akka client disassociated [Stage 83: (1 + 0) / 6][Stage 86: (0 + 1) / 2][Stage 88: (0 + 2) / 8]15/01/29 23:47:06 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 9 [Stage 83:=== (5 + 1) / 6][Stage
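On the "[30 seconds]" question raised above: that figure matches the default Akka ask timeout in Spark 1.x, so raising it is one thing to try; whether that is the exact setting behind this particular future is an assumption, not something verified here. A minimal sketch:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.akka.askTimeout", "120") // seconds; default is 30 in Spark 1.x
      .set("spark.akka.timeout", "120")    // seconds; general Akka communication timeout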
Re: Spark impersonation
Yes, jobs run as the user that launched them. If you want to run jobs on a secure cluster then use YARN; Spark standalone does not support secure Hadoop. On Mon, Feb 2, 2015 at 5:37 PM, Jim Green openkbi...@gmail.com wrote: Hi Team, Does spark support impersonation? For example, when spark on yarn/hive/hbase/etc..., which user is used by default? The user which starts the spark job? Any suggestions related to impersonation? -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: Programmatic Spark 1.2.0 on EMR | S3 filesystem is not working when using
Alright.. I found the issue. I wasn't setting fs.s3.buffer.dir configuration. Here is the final spark conf snippet that works: spark.hadoop.fs.s3n.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem, spark.hadoop.fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem, spark.hadoop.fs.s3bfs.impl: org.apache.hadoop.fs.s3.S3FileSystem, spark.hadoop.fs.s3.buffer.dir: /mnt/var/lib/hadoop/s3,/mnt1/var/lib/hadoop/s3, spark.hadoop.fs.s3n.endpoint: s3.amazonaws.com, spark.hadoop.fs.emr.configuration.version: 1.0, spark.hadoop.fs.s3n.multipart.uploads.enabled: true, spark.hadoop.fs.s3.enableServerSideEncryption: false, spark.hadoop.fs.s3.serverSideEncryptionAlgorithm: AES256, spark.hadoop.fs.s3.consistent: true, spark.hadoop.fs.s3.consistent.retryPolicyType: exponential, spark.hadoop.fs.s3.consistent.retryPeriodSeconds: 10, spark.hadoop.fs.s3.consistent.retryCount: 5, spark.hadoop.fs.s3.maxRetries: 4, spark.hadoop.fs.s3.sleepTimeSeconds: 10, spark.hadoop.fs.s3.consistent.throwExceptionOnInconsistency: true, spark.hadoop.fs.s3.consistent.metadata.autoCreate: true, spark.hadoop.fs.s3.consistent.metadata.tableName: EmrFSMetadata, spark.hadoop.fs.s3.consistent.metadata.read.capacity: 500, spark.hadoop.fs.s3.consistent.metadata.write.capacity: 100, spark.hadoop.fs.s3.consistent.fastList: true, spark.hadoop.fs.s3.consistent.fastList.prefetchMetadata: false, spark.hadoop.fs.s3.consistent.notification.CloudWatch: false, spark.hadoop.fs.s3.consistent.notification.SQS: false Thanks, Aniket On Fri Jan 30 2015 at 23:29:25 Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Right. Which makes me to believe that the directory is perhaps configured somewhere and i have missed configuring the same. The process that is submitting jobs (basically becomes driver) is running in sudo mode and the executors are executed by YARN. The hadoop username is configured as 'hadoop' (default user in EMR). On Fri, Jan 30, 2015, 11:25 PM Sven Krasser kras...@gmail.com wrote: From your stacktrace it appears that the S3 writer tries to write the data to a temp file on the local file system first. Taking a guess, that local directory doesn't exist or you don't have permissions for it. -Sven On Fri, Jan 30, 2015 at 6:44 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am programmatically submit spark jobs in yarn-client mode on EMR. Whenever a job tries to save file to s3, it gives the below mentioned exception. I think the issue might be what EMR is not setup properly as I have to set all hadoop configurations manually in SparkContext. However, I am not sure which configuration am I missing (if any). 
Configurations that I am using in SparkContext to setup EMRFS: spark.hadoop.fs.s3n.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem, spark.hadoop.fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem, spark.hadoop.fs.emr.configuration.version: 1.0, spark.hadoop.fs.s3n.multipart.uploads.enabled: true, spark.hadoop.fs.s3.enableServerSideEncryption: false, spark.hadoop.fs.s3.serverSideEncryptionAlgorithm: AES256, spark.hadoop.fs.s3.consistent: true, spark.hadoop.fs.s3.consistent.retryPolicyType: exponential, spark.hadoop.fs.s3.consistent.retryPeriodSeconds: 10, spark.hadoop.fs.s3.consistent.retryCount: 5, spark.hadoop.fs.s3.maxRetries: 4, spark.hadoop.fs.s3.sleepTimeSeconds: 10, spark.hadoop.fs.s3.consistent.throwExceptionOnInconsistency: true, spark.hadoop.fs.s3.consistent.metadata.autoCreate: true, spark.hadoop.fs.s3.consistent.metadata.tableName: EmrFSMetadata, spark.hadoop.fs.s3.consistent.metadata.read.capacity: 500, spark.hadoop.fs.s3.consistent.metadata.write.capacity: 100, spark.hadoop.fs.s3.consistent.fastList: true, spark.hadoop.fs.s3.consistent.fastList.prefetchMetadata: false, spark.hadoop.fs.s3.consistent.notification.CloudWatch: false, spark.hadoop.fs.s3.consistent.notification.SQS: false, Exception: java.io.IOException: No such file or directory at java.io.UnixFileSystem.createFileExclusively(Native Method) at java.io.File.createNewFile(File.java:1006) at java.io.File.createTempFile(File.java:1989) at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.startNewTempFile( S3FSOutputStream.java:269) at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.writeInternal( S3FSOutputStream.java:205) at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.flush( S3FSOutputStream.java:136) at com.amazon.ws.emr.hadoop.fs.s3.S3FSOutputStream.close( S3FSOutputStream.java:156) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close( FSDataOutputStream.java:72) at org.apache.hadoop.fs.FSDataOutputStream.close( FSDataOutputStream.java:105) at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.close( TextOutputFormat.java:109) at org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.close( MultipleOutputFormat.java:116) at org.apache.spark.SparkHadoopWriter.close(SparkHadoopWriter.scala:102) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13. apply(PairRDDFunctions.scala:1068) at
Re: GraphX: ShortestPaths does not terminate on a grid graph
On 01/29/2015 08:31 PM, Ankur Dave wrote: Thanks for the reminder. I just created a PR: https://github.com/apache/spark/pull/4273 Ankur Hello, Thanks for the patch. I applied it to Pregel.scala (in the Spark 1.2.0 sources) and rebuilt Spark. During execution, at the 25th iteration of Pregel, checkpointing is done and then it throws the following exception: Exception in thread "main" org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[521] at reduce at VertexRDDImpl.scala:80(0) has different number of partitions than original RDD VertexRDD ZippedPartitionsRDD2[518] at zipPartitions at VertexRDDImpl.scala:170(2) at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:98) at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1279) at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1281) at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1281) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1281) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1285) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1351) at org.apache.spark.rdd.RDD.reduce(RDD.scala:867) at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:80) at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:155) at org.apache.spark.graphx.lib.ShortestPaths$.run(ShortestPaths.scala:69) Pregel.scala:155 is the following line in the Pregel loop: activeMessages = messages.count()
2GB limit for partitions?
Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)? What I had been attempting to do is the following. 1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files) each representing a subset of the data. The current attempt I am working on is something like this. 1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file. 2) Partition the data and use the body of mapPartition to open a file and save the data. My apologies, this is actually embedded in a bigger mess, so I won't post it. However, I get errors telling me that there is an IllegalArgumentException: Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit on partition and/or block size. Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow to run. Am I missing something? Admittedly, this is an odd use case. Thanks! Sincerely, Mike Albert
Is there a way to disable the Spark UI?
Hi All, Is there a way to disable the Spark UI? What I really need is to stop the startup of the Jetty server. -- Thanks regards, Nirmal Senior Software Engineer- Platform Technologies Team, WSO2 Inc. Mobile: +94715779733 Blog: http://nirmalfdo.blogspot.com/
Re: Spark impersonation
I think you can configure Hadoop/Hive to do impersonation. There is no difference between a secure and an insecure Hadoop cluster when using kinit. Thanks. Zhan Zhang On Feb 2, 2015, at 9:32 PM, Koert Kuipers ko...@tresata.com wrote: yes jobs run as the user that launched them. if you want to run jobs on a secure cluster then use yarn. hadoop standalone does not support secure hadoop. On Mon, Feb 2, 2015 at 5:37 PM, Jim Green openkbi...@gmail.com wrote: Hi Team, Does spark support impersonation? For example, when spark on yarn/hive/hbase/etc..., which user is used by default? The user which starts the spark job? Any suggestions related to impersonation? -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Scala on Spark functions examples cheatsheet.
Hi Team, I just spent some time these past two weeks on Scala and tried all the Scala-on-Spark functions in the Spark Programming Guide http://spark.apache.org/docs/1.2.0/programming-guide.html. If you need example code for Scala-on-Spark functions, I created this cheat sheet with examples: http://www.openkb.info/2015/01/scala-on-spark-cheatsheet.html Sharing. -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: Error when Spark streaming consumes from Kafka
We're planning to use this as well (Dibyendu's https://github.com/dibbhatt/kafka-spark-consumer ). Dibyendu, thanks for the efforts. So far it's working nicely. I think there is merit in making it the default Kafka Receiver for Spark streaming. -neelesh On Mon, Feb 2, 2015 at 5:25 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Or you can use this Low Level Kafka Consumer for Spark : https://github.com/dibbhatt/kafka-spark-consumer This is now part of http://spark-packages.org/ and is running successfully for past few months in Pearson production environment . Being Low Level consumer, it does not have this re-balancing issue which High Level consumer have. Also I know there are few who has shifted to this Low Level Consumer which started giving them a better robust fault tolerant Kafka Receiver for Spark. Regards, Dibyendu On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is an issue that is hard to resolve without rearchitecting the whole Kafka Receiver. There are some workarounds worth looking into. http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote: Hi, This seems not fixed yet. I filed an issue in jira: https://issues.apache.org/jira/browse/SPARK-5505 Greg
Re: 2GB limit for partitions?
The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data in 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes. On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so?)? What I had been attempting to do is the following. 1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files) each representing a subset of the data. The current attempt I am working on is something like this. 1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file. 2) Partition the data and use the body of mapPartition to open a file and save the data. My apologies, this is actually embedded in a bigger mess, so I won't post it. However, I get errors telling me that there is an IllegalArgumentException: Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit or partition and/or block size. Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow to run. Am I missing something? Admittedly, this is an odd use case Thanks! Sincerely, Mike Albert
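A sketch of the simpler route suggested above: key each record by the file it should land in, partition on that key, and save; each of the 1,000 partitions then becomes one part-file. The records RDD and the fileIndexFor function (mapping a record to 0..999) are assumptions.

    import org.apache.spark.HashPartitioner

    val keyed = records.map(r => (fileIndexFor(r), r))          // key = target file index, 0..999
    val bucketed = keyed.partitionBy(new HashPartitioner(1000)) // one partition per target file
    bucketed.values.saveAsTextFile("/output/buckets")           // writes part-00000 .. part-00999

With integer keys 0..999 a HashPartitioner sends each key to its own partition; for more elaborate routing, a custom Partitioner whose getPartition returns the file index gives exact control.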
Re: Error when Spark streaming consumes from Kafka
Thanks Neelesh . Glad to know this Low Level Consumer is working for you. Dibyendu On Tue, Feb 3, 2015 at 8:06 AM, Neelesh neele...@gmail.com wrote: We're planning to use this as well (Dibyendu's https://github.com/dibbhatt/kafka-spark-consumer ). Dibyendu, thanks for the efforts. So far its working nicely. I think there is merit in make it the default Kafka Receiver for spark streaming. -neelesh On Mon, Feb 2, 2015 at 5:25 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Or you can use this Low Level Kafka Consumer for Spark : https://github.com/dibbhatt/kafka-spark-consumer This is now part of http://spark-packages.org/ and is running successfully for past few months in Pearson production environment . Being Low Level consumer, it does not have this re-balancing issue which High Level consumer have. Also I know there are few who has shifted to this Low Level Consumer which started giving them a better robust fault tolerant Kafka Receiver for Spark. Regards, Dibyendu On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is an issue that is hard to resolve without rearchitecting the whole Kafka Receiver. There are some workarounds worth looking into. http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko s...@dicefield.com wrote: Hi, This seems not fixed yet. I filed an issue in jira: https://issues.apache.org/jira/browse/SPARK-5505 Greg
Re: Window comparison matching using the sliding window functionality: feasibility
Mine was not really a moving average problem. It was more like partitioning on some keys, sorting (on different keys), and then running a sliding window through each partition. I reverted back to map-reduce for that (I needed secondary sort, which is not very mature in Spark right now). But, as far as I understand your problem, you should be able to handle it by converting your RDD to a key-value RDD, which I think will be automatically partitioned on the key, and then use *mapPartitions* to run your logic. On Mon, Feb 2, 2015 at 1:20 AM, ashu [via Apache Spark User List] ml-node+s1001560n21458...@n3.nabble.com wrote: Hi, I want to know about your moving avg implementation. I am also doing some time-series analysis about CPU performance. So I tried simple regression but the result is not good. The rmse is 10, but when I extrapolate it just shoots up linearly. I think I should first smooth out the data and then try regression to forecast. I am thinking of moving avg as an option and tried it out according to this http://stackoverflow.com/questions/23402303/apache-spark-moving-average but partitionBy is giving me an error. I am building with Spark 1.2.0. Can you share your ARIMA implementation if it is open source, else can you give me hints about it? Will really appreciate the help. Thanks
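A rough Scala sketch of that recipe using repartitionAndSortWithinPartitions (available from Spark 1.2): partition on one field, sort within each partition on another, then run a sliding window inside mapPartitions. The Event record, its fields, and the events RDD are assumptions, and for brevity the window ignores host boundaries inside a partition, which a real implementation would have to handle.

    import org.apache.spark.Partitioner
    import org.apache.spark.SparkContext._

    case class Event(host: String, timestamp: Long, cpu: Double) // hypothetical record type
    // events: RDD[Event] produced elsewhere (assumed)

    // Partition on host only, so a host's records stay together while the
    // composite (host, timestamp) key still drives the within-partition sort.
    class HostPartitioner(val numPartitions: Int) extends Partitioner {
      def getPartition(key: Any): Int = key match {
        case (host: String, _) => math.abs(host.hashCode) % numPartitions
      }
    }

    val keyed = events.map(e => ((e.host, e.timestamp), e.cpu))
    val sorted = keyed.repartitionAndSortWithinPartitions(new HostPartitioner(100))
    val movingAvg = sorted.mapPartitions { it =>
      it.map(_._2).sliding(5).map(w => w.sum / w.size) // 5-point moving average
    }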
Re: Spark streaming - tracking/deleting processed files
You can utilize the following method: https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 It has a parameter: newFilesOnly - Should process only new files and ignore existing files in the directory And it works as expected. -- Emre Sevinç On Fri, Jan 30, 2015 at 7:07 PM, ganterm gant...@gmail.com wrote: We are running a Spark streaming job that retrieves files from a directory (using textFileStream). One concern we are having is the case where the job is down but files are still being added to the directory. Once the job starts up again, those files are not being picked up (since they are not new or changed while the job is running) but we would like them to be processed. Is there a solution for that? Is there a way to keep track what files have been processed and can we force older files to be picked up? Is there a way to delete the processed files? Thanks! Markus -- Emre Sevinc
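A Scala sketch of calling that overload with newFilesOnly = false, so files already present in the directory when the job (re)starts are also picked up; the directory path is an assumption, and an existing StreamingContext ssc is assumed.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "hdfs:///incoming",       // assumed directory
      (path: Path) => true,     // accept every file name
      newFilesOnly = false      // also process files already in the directory
    ).map(_._2.toString)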
Re: [hive context] Unable to query array once saved as parquet
Hi, given the current open issue: https://issues.apache.org/jira/browse/SPARK-5508 I cannot use HiveQL to insert schemaRDD data into a table if one of the columns is an Array of Struct. using the spark API, Is it possible to insert schema RDD into an existing and *partitioned* table ? the method insertInto on schema RDD does take only the name of the table. Thanks, Ayoub. 2015-01-31 22:30 GMT+01:00 Ayoub Benali benali.ayoub.i...@gmail.com: Hello, as asked, I just filled this JIRA issue https://issues.apache.org/jira/browse/SPARK-5508. I will add an other similar code example which lead to GenericRow cannot be cast to org.apache.spark.sql.catalyst.expressions.SpecificMutableRow Exception. Best, Ayoub. 2015-01-31 4:05 GMT+01:00 Cheng Lian lian.cs@gmail.com: According to the Gist Ayoub provided, the schema is fine. I reproduced this issue locally, it should be bug, but I don't think it's related to SPARK-5236. Will investigate this soon. Ayoub - would you mind to help to file a JIRA for this issue? Thanks! Cheng On 1/30/15 11:28 AM, Michael Armbrust wrote: Is it possible that your schema contains duplicate columns or column with spaces in the name? The parquet library will often give confusing error messages in this case. On Fri, Jan 30, 2015 at 10:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote: Hello, I have a problem when querying, with a hive context on spark 1.2.1-snapshot, a column in my table which is nested data structure like an array of struct. The problems happens only on the table stored as parquet, while querying the Schema RDD saved, as a temporary table, don't lead to any exception. my steps are: 1) reading JSON file 2) creating a schema RDD and saving it as a tmp table 3) creating an external table in hive meta store saved as parquet file 4) inserting the data from the tmp table to the persisted table 5) queering the persisted table lead to this exception: select data.field1 from persisted_table LATERAL VIEW explode(data_array) nestedStuff AS data parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://***/test_table/part-1 at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:145) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to (TraversableOnce.scala:273) at http://scala.collection.AbstractIterator.to scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at 
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797) at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:635) at java.util.ArrayList.get(ArrayList.java:411) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:99) at parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:99) at parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:94) at
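Regarding the question at the top of this thread - writing a SchemaRDD into an existing *partitioned* table - the usual route is to register the SchemaRDD as a temporary table and issue a HiveQL INSERT with an explicit PARTITION clause, since insertInto(tableName) carries no partition specification. The sketch below is only an illustration against the Spark 1.2-era Java API and assumes the nested-column bug discussed above does not affect the data being written; the table names persisted_table, tmp_table and data_array come from the thread, while the partition column "day" and the input path are invented.

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;

public class PartitionedInsertSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "partitioned-insert-sketch");
    JavaHiveContext hiveCtx = new JavaHiveContext(sc);

    // 1) Read the JSON data and register it as a temporary table.
    JavaSchemaRDD json = hiveCtx.jsonFile("hdfs:///path/to/input.json"); // hypothetical path
    json.registerTempTable("tmp_table");

    // 2) Insert into an existing partitioned table through HiveQL.
    //    "day" is an invented partition column; insertInto() itself has no partition argument.
    hiveCtx.sql("INSERT INTO TABLE persisted_table PARTITION (day = '2015-01-31') "
        + "SELECT data_array FROM tmp_table");
  }
}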
Re: Is there a way to disable the Spark UI?
Thanks Zhan! Was this introduced in Spark 1.2, or is it also available in Spark 1.1? On Tue, Feb 3, 2015 at 11:52 AM, Zhan Zhang zzh...@hortonworks.com wrote: You can set spark.ui.enabled to false to disable the UI. Thanks. Zhan Zhang On Feb 2, 2015, at 8:06 PM, Nirmal Fernando nir...@wso2.com wrote: Hi All, Is there a way to disable the Spark UI? What I really need is to stop the startup of the Jetty server. -- Thanks & regards, Nirmal Senior Software Engineer - Platform Technologies Team, WSO2 Inc. Mobile: +94715779733 Blog: http://nirmalfdo.blogspot.com/ -- Thanks & regards, Nirmal Senior Software Engineer - Platform Technologies Team, WSO2 Inc. Mobile: +94715779733 Blog: http://nirmalfdo.blogspot.com/
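A minimal sketch of applying Zhan's suggestion in code, assuming the property is honored by the Spark version in use (the thread itself leaves open whether 1.1 already supports it); the app name and master below are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class NoUiExample {
  public static void main(String[] args) {
    // Disable the web UI so the embedded Jetty server is never started.
    SparkConf conf = new SparkConf()
        .setAppName("no-ui-example")   // placeholder app name
        .setMaster("local[*]")         // placeholder master
        .set("spark.ui.enabled", "false");

    JavaSparkContext sc = new JavaSparkContext(conf);
    // ... job code ...
    sc.stop();
  }
}

The same property can also be supplied without touching the code, for example via spark-submit with --conf spark.ui.enabled=false or through spark-defaults.conf.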
Re: Java Kafka Word Count Issue
Hi, I added a checkpoint directory and am now using updateStateByKey():

import com.google.common.base.Optional;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        Integer newSum = ... // add the new values with the previous running count to get the new count
        return Optional.of(newSum);
      }
    };

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

But I didn't get what exactly I should assign in "Integer newSum = ... // add the new values with the previous running count to get the new count". Thanks and regards, Shweta Jadhav

-----VISHNU SUBRAMANIAN johnfedrickena...@gmail.com wrote: ----- To: Jadhav Shweta jadhav.shw...@tcs.com From: VISHNU SUBRAMANIAN johnfedrickena...@gmail.com Date: 02/02/2015 04:39PM Cc: user@spark.apache.org Subject: Re: Java Kafka Word Count Issue You can use updateStateByKey() to perform the above operation.

On Mon, Feb 2, 2015 at 4:29 PM, Jadhav Shweta jadhav.shw...@tcs.com wrote: Hi Sean, The Kafka producer is working fine. This is related to Spark. How can I configure Spark so that it remembers the count from the beginning? If my log.text file has "spark apache kafka spark", my Spark program gives the correct output: spark 2, apache 1, kafka 1. But when I append "spark" to my log.text file, the Spark program outputs spark 1, which should be spark 3. So how do I handle this in the Spark code? Thanks and regards, Shweta Jadhav

-----Sean Owen so...@cloudera.com wrote: ----- To: Jadhav Shweta jadhav.shw...@tcs.com From: Sean Owen so...@cloudera.com Date: 02/02/2015 04:13PM Subject: Re: Java Kafka Word Count Issue This is a question about the Kafka producer, right? Not Spark.

On Feb 2, 2015 10:34 AM, Jadhav Shweta jadhav.shw...@tcs.com wrote: Hi All, I am trying to run the Kafka word count program; please find the link below: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java I have set the Spark master to setMaster("local[*]") and I have started a Kafka producer which reads the file. If my file already has a few words, then after running the Spark Java program I get the proper output. But when I append new words to the same file, it starts the word count again from 1. If I need to count both the already-present and the newly appended words, exactly what changes do I need to make in the code? P.S. I am using spark-1.2.0-bin-hadoop2.3. Thanks and regards, Shweta Jadhav
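For the part Shweta is asking about, the usual pattern is to start from the previous running count (or zero if there is none) and add every count that arrived in the current batch. A hedged sketch of that update function, using the same Guava Optional type as the snippet above:

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;

public class RunningCount {
  // Previous running count (state) plus everything that arrived in this batch.
  public static final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> UPDATE_FUNCTION =
      new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override
        public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
          Integer newSum = state.or(0);   // previous count, or 0 the first time a key is seen
          for (Integer v : values) {      // counts for this key in the current batch
            newSum += v;
          }
          return Optional.of(newSum);
        }
      };
}

// Usage, as in the snippet above:
//   JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(RunningCount.UPDATE_FUNCTION);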
Loading status
I am not sure what Loading status means, followed by Running. In the application UI, I see:

Executor Summary
ExecutorID  Worker                                                         Cores  Memory  State    Logs
1           worker-20150202144112-hadoop-w-1.c.fi-mdd-poc.internal-38874  16     83971   LOADING  stdout stderr
0           worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685  16     83971   RUNNING  stdout stderr

Looking at the executor hadoop-w-2, I see the status is Loading. Why different statuses, and what does that mean? Please see below for details: ID: worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 Master URL: spark://hadoop-m:7077 Cores: 16 (16 Used) Memory: 82.0 GB (82.0 GB Used) Back to Master Running Executors (1) ExecutorID Cores State Memory Job Details Logs 0 16 LOADING 82.0 GB ID: app-20150202152154-0001 Name: Simple File Merge Application User: hadoop stdout stderr Thank you, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-status-tp21468.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Loading status
Yes On Monday, February 2, 2015, Mark Hamstra m...@clearstorydata.com wrote: LOADING is just the state in which new Executors are created but before they have everything they need and are fully registered to transition to state RUNNING and begin doing actual work: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L351 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala#L133 LOADING should be a fairly brief, transitory state. Are you seeing Executors remaining in LOADING for a significant length of time? On Mon, Feb 2, 2015 at 7:56 AM, akhandeshi ami.khande...@gmail.com wrote: I am not sure what Loading status means, followed by Running. In the application UI, I see: Executor Summary ExecutorID Worker Cores Memory State Logs 1 worker-20150202144112-hadoop-w-1.c.fi-mdd-poc.internal-38874 16 83971 LOADING stdout stderr 0 worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 16 83971 RUNNING stdout stderr Looking at the executor hadoop-w-2, I see the status is Loading . Why different statuses, and what does that mean? Please see below for details: ID: worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 Master URL: spark://hadoop-m:7077 Cores: 16 (16 Used) Memory: 82.0 GB (82.0 GB Used) Back to Master Running Executors (1) ExecutorID Cores State Memory Job Details Logs 0 16 LOADING 82.0 GB ID: app-20150202152154-0001 Name: Simple File Merge Application User: hadoop stdout stderr Thank you, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-status-tp21468.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Loading status
LOADING is just the state in which new Executors are created but before they have everything they need and are fully registered to transition to state RUNNING and begin doing actual work: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L351 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala#L133 LOADING should be a fairly brief, transitory state. Are you seeing Executors remaining in LOADING for a significant length of time? On Mon, Feb 2, 2015 at 7:56 AM, akhandeshi ami.khande...@gmail.com wrote: I am not sure what Loading status means, followed by Running. In the application UI, I see: Executor Summary ExecutorID Worker Cores Memory State Logs 1 worker-20150202144112-hadoop-w-1.c.fi-mdd-poc.internal-38874 16 83971 LOADING stdout stderr 0 worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 16 83971 RUNNING stdout stderr Looking at the executor hadoop-w-2, I see the status is Loading . Why different statuses, and what does that mean? Please see below for details: ID: worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 Master URL: spark://hadoop-m:7077 Cores: 16 (16 Used) Memory: 82.0 GB (82.0 GB Used) Back to Master Running Executors (1) ExecutorID Cores State Memory Job Details Logs 0 16 LOADING 82.0 GB ID: app-20150202152154-0001 Name: Simple File Merge Application User: hadoop stdout stderr Thank you, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-status-tp21468.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Loading status
It seems like some sort of Listener/UI error! I say this because I see the status in the executor web UI as LOADING, but in the application UI, for the same executor, the status is RUNNING! I have also seen the reverse behavior, where the application UI indicates a particular executor as loading but the executor UI page indicates it is running! There are no error messages in the logs that I have been able to spot. Also, the logs seem to indicate it is doing the work. As far as reproducibility goes, it is hard, due to the inconsistent behavior observed... perhaps again indicating some listener issue. Will update as I find any further information. Thank you both for the help! On Monday, February 2, 2015, Mark Hamstra m...@clearstorydata.com wrote: Yes, if the Master is unable to register the Executor and transition it to RUNNING, then the Executor will stay in LOADING state, so this can be caused by problems in the Master or the Master-Executor communication. On Mon, Feb 2, 2015 at 9:24 AM, Tushar Sharma tushars...@gmail.com wrote: Yes curious indeed. Usually if such status persists it leads to executor failure after a couple of tries. We also noticed similar behavior but the logs clearly mentioned the lack of some akka resource as the reason. I am sure the logs also holds the key for this particular problem. On Mon, Feb 2, 2015 at 10:49 PM, Mark Hamstra m...@clearstorydata.com wrote: Curious. I guess the first question is whether we've got some sort of Listener/UI error so that the UI is not accurately reflecting the Executor's actual state, or whether the LOADING Executor really is spending a considerable length of time in this I'm in the process of being created, but not yet doing anything useful state. If you can figure out a little more of what is going on or how to reproduce this state, please do file a JIRA. On Mon, Feb 2, 2015 at 8:28 AM, Ami Khandeshi ami.khande...@gmail.com wrote: Yes On Monday, February 2, 2015, Mark Hamstra m...@clearstorydata.com wrote: LOADING is just the state in which new Executors are created but before they have everything they need and are fully registered to transition to state RUNNING and begin doing actual work: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L351 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala#L133 LOADING should be a fairly brief, transitory state. Are you seeing Executors remaining in LOADING for a significant length of time? On Mon, Feb 2, 2015 at 7:56 AM, akhandeshi ami.khande...@gmail.com wrote: I am not sure what Loading status means, followed by Running. In the application UI, I see: Executor Summary ExecutorID Worker Cores Memory State Logs 1 worker-20150202144112-hadoop-w-1.c.fi-mdd-poc.internal-38874 16 83971 LOADING stdout stderr 0 worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 16 83971 RUNNING stdout stderr Looking at the executor hadoop-w-2, I see the status is Loading . Why different statuses, and what does that mean?
Please see below for details: ID: worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 Master URL: spark://hadoop-m:7077 Cores: 16 (16 Used) Memory: 82.0 GB (82.0 GB Used) Back to Master Running Executors (1) ExecutorID Cores State Memory Job Details Logs 0 16 LOADING 82.0 GB ID: app-20150202152154-0001 Name: Simple File Merge Application User: hadoop stdout stderr Thank you, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-status-tp21468.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Loading status
Yes, if the Master is unable to register the Executor and transition it to RUNNING, then the Executor will stay in LOADING state, so this can be caused by problems in the Master or the Master-Executor communication. On Mon, Feb 2, 2015 at 9:24 AM, Tushar Sharma tushars...@gmail.com wrote: Yes curious indeed. Usually if such status persists it leads to executor failure after a couple of tries. We also noticed similar behavior but the logs clearly mentioned the lack of some akka resource as the reason. I am sure the logs also holds the key for this particular problem. On Mon, Feb 2, 2015 at 10:49 PM, Mark Hamstra m...@clearstorydata.com wrote: Curious. I guess the first question is whether we've got some sort of Listener/UI error so that the UI is not accurately reflecting the Executor's actual state, or whether the LOADING Executor really is spending a considerable length of time in this I'm in the process of being created, but not yet doing anything useful state. If you can figure out a little more of what is going on or how to reproduce this state, please do file a JIRA. On Mon, Feb 2, 2015 at 8:28 AM, Ami Khandeshi ami.khande...@gmail.com wrote: Yes On Monday, February 2, 2015, Mark Hamstra m...@clearstorydata.com wrote: LOADING is just the state in which new Executors are created but before they have everything they need and are fully registered to transition to state RUNNING and begin doing actual work: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L351 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala#L133 LOADING should be a fairly brief, transitory state. Are you seeing Executors remaining in LOADING for a significant length of time? On Mon, Feb 2, 2015 at 7:56 AM, akhandeshi ami.khande...@gmail.com wrote: I am not sure what Loading status means, followed by Running. In the application UI, I see: Executor Summary ExecutorID Worker Cores Memory State Logs 1 worker-20150202144112-hadoop-w-1.c.fi-mdd-poc.internal-38874 16 83971 LOADING stdout stderr 0 worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 16 83971 RUNNING stdout stderr Looking at the executor hadoop-w-2, I see the status is Loading . Why different statuses, and what does that mean? Please see below for details: ID: worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 Master URL: spark://hadoop-m:7077 Cores: 16 (16 Used) Memory: 82.0 GB (82.0 GB Used) Back to Master Running Executors (1) ExecutorID Cores State Memory Job Details Logs 0 16 LOADING 82.0 GB ID: app-20150202152154-0001 Name: Simple File Merge Application User: hadoop stdout stderr Thank you, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-status-tp21468.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Loading status
Curious. I guess the first question is whether we've got some sort of Listener/UI error so that the UI is not accurately reflecting the Executor's actual state, or whether the LOADING Executor really is spending a considerable length of time in this I'm in the process of being created, but not yet doing anything useful state. If you can figure out a little more of what is going on or how to reproduce this state, please do file a JIRA. On Mon, Feb 2, 2015 at 8:28 AM, Ami Khandeshi ami.khande...@gmail.com wrote: Yes On Monday, February 2, 2015, Mark Hamstra m...@clearstorydata.com wrote: LOADING is just the state in which new Executors are created but before they have everything they need and are fully registered to transition to state RUNNING and begin doing actual work: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L351 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala#L133 LOADING should be a fairly brief, transitory state. Are you seeing Executors remaining in LOADING for a significant length of time? On Mon, Feb 2, 2015 at 7:56 AM, akhandeshi ami.khande...@gmail.com wrote: I am not sure what Loading status means, followed by Running. In the application UI, I see: Executor Summary ExecutorID Worker Cores Memory State Logs 1 worker-20150202144112-hadoop-w-1.c.fi-mdd-poc.internal-38874 16 83971 LOADING stdout stderr 0 worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 16 83971 RUNNING stdout stderr Looking at the executor hadoop-w-2, I see the status is Loading . Why different statuses, and what does that mean? Please see below for details: ID: worker-20150202144112-hadoop-w-2.c.fi-mdd-poc.internal-58685 Master URL: spark://hadoop-m:7077 Cores: 16 (16 Used) Memory: 82.0 GB (82.0 GB Used) Back to Master Running Executors (1) ExecutorID Cores State Memory Job Details Logs 0 16 LOADING 82.0 GB ID: app-20150202152154-0001 Name: Simple File Merge Application User: hadoop stdout stderr Thank you, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-status-tp21468.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to define a file filter for file name patterns in Apache Spark Streaming in Java?
Hi Emre, This is how you do that in Scala:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/akhld/sigmoid", (t: Path) => true, true)

In Java you can do something like:

jssc.ssc().<LongWritable, Text, SequenceFileInputFormat>fileStream("/home/akhld/sigmoid",
    new AbstractFunction1<Path, Object>() {
      @Override
      public Boolean apply(Path input) {
        // file filtering logic here
        return true;
      }
    },
    true,
    ClassTag$.MODULE$.apply(LongWritable.class),
    ClassTag$.MODULE$.apply(Text.class),
    ClassTag$.MODULE$.apply(SequenceFileInputFormat.class));

Thanks Best Regards

On Mon, Feb 2, 2015 at 6:34 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter for file names when creating an InputDStream (https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html) by invoking the *fileStream* method (https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29). My code is working perfectly fine when I don't use a file filter, e.g. by invoking the other *fileStream* method described here: https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29 According to the documentation of the *fileStream* method, I can pass it a scala.Function1<org.apache.hadoop.fs.Path,Object> filter. But so far, I could not create a fileFilter. My initial attempts have been:

1- Tried to implement it as:

Function1<Path, Object> fileFilter = new Function1<Path, Object>() {
    @Override
    public Object apply(Path v1) {
        return true;
    }

    @Override
    public <A> Function1<A, Object> compose(Function1<A, Path> g) {
        return Function1$class.compose(this, g);
    }

    @Override
    public <A> Function1<Path, A> andThen(Function1<Object, A> g) {
        return Function1$class.andThen(this, g);
    }
};

But apparently my implementation of andThen is wrong, and I couldn't understand how I should implement it.
It complains that the anonymous function is not abstract and does not override abstract method <A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in scala.Function1

2- Tried to implement it as:

Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() {
    @Override
    public Object apply(Path v1) {
        return true;
    }
};

This one compiles, but when I run it I get an exception:

2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1 java.io.NotSerializableException: myModule$1 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at
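The NotSerializableException above is most likely caused by the anonymous filter class (myModule$1): it does not implement Serializable, and as an inner anonymous class it also holds a reference to its enclosing instance, so the DStream graph cannot be checkpointed. A common fix, sketched below, is to move the filter into its own top-level (or static nested) class that extends AbstractFunction1 and implements Serializable; the *.json rule, directory, and the TextInputFormat in the usage comment are placeholders, not anything prescribed by the thread.

import java.io.Serializable;
import org.apache.hadoop.fs.Path;
import scala.runtime.AbstractFunction1;

// A standalone filter class: no enclosing instance is captured, and it is
// explicitly Serializable, so the DStream checkpoint can write it out.
public class JsonFileFilter extends AbstractFunction1<Path, Object> implements Serializable {
  @Override
  public Object apply(Path path) {
    // Hypothetical rule: only pick up files ending in .json
    return path.getName().endsWith(".json");
  }
}

// Usage, mirroring the fileStream call from Akhil's reply:
//   jssc.ssc().<LongWritable, Text, TextInputFormat>fileStream(
//       "/path/to/dir",
//       new JsonFileFilter(),
//       true,
//       ClassTag$.MODULE$.apply(LongWritable.class),
//       ClassTag$.MODULE$.apply(Text.class),
//       ClassTag$.MODULE$.apply(TextInputFormat.class));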
Re: Is pair rdd join more efficient than regular rdd
Yes it would. You can key the data and then partition it (say with a HashPartitioner); the join would then be faster, as all records with the same key go to the same partition. Thanks Best Regards On Sun, Feb 1, 2015 at 5:13 PM, Sunita Arvind sunitarv...@gmail.com wrote: Hi All, We are joining large tables using Spark SQL and running into shuffle issues. We have explored multiple options - using coalesce to reduce the number of partitions, tuning various parameters like the disk buffer, reducing the data in chunks, etc., all of which seem to help, by the way. What I would like to know is: is using a pair RDD instead of a regular RDD one of the solutions? Will it make the join more efficient, since Spark can shuffle better when it knows the key? Logically speaking I think it should help, but I haven't found any evidence on the internet, including the Spark SQL documentation. It is a lot of effort for us to try this approach and weigh the performance, as we need to register the output as tables to proceed using them. Hence we would appreciate input from the community before proceeding. Regards, Sunita Koppar
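A hedged sketch of the keyed-join approach Akhil describes, in Java: both sides are keyed and partitioned with the same HashPartitioner before the join, so matching keys are co-located and the join itself does not need another full shuffle. Whether this beats letting Spark SQL plan the join depends on the data and on how often the partitioned RDDs are reused; the names and toy data below are placeholders.

import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartitionedJoinSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "partitioned-join-sketch");

    JavaPairRDD<String, Integer> left = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
    JavaPairRDD<String, String> right = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", "x"), new Tuple2<>("b", "y")));

    // Partition both sides by key with the same partitioner; rows with the same
    // key land in the same partition, so the join does not trigger a second shuffle.
    HashPartitioner partitioner = new HashPartitioner(8);
    JavaPairRDD<String, Integer> leftPart = left.partitionBy(partitioner).cache();
    JavaPairRDD<String, String> rightPart = right.partitionBy(partitioner).cache();

    JavaPairRDD<String, Tuple2<Integer, String>> joined = leftPart.join(rightPart);
    System.out.println(joined.collect());
    sc.stop();
  }
}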
Questions about Spark standalone resource scheduler
Hi all, I have some questions about the future development of Spark's standalone resource scheduler. We've heard that some users require multi-tenant support in standalone mode, such as multi-user management, resource management and isolation, and whitelisting of users. The current Spark standalone mode does not seem to support such functionality, while resource schedulers like YARN offer this kind of advanced management. I'm not sure what the future target of the standalone resource scheduler is: will it stay a simple implementation, with advanced usage shifting to YARN, or is there a plan to add some simple multi-tenant-related functionality? Thanks a lot for your comments. BR Jerry
Re: Questions about Spark standalone resource scheduler
Hey Jerry, I think standalone mode will still add more features over time, but the goal isn't really for it to become equivalent to what Mesos/YARN are today. Or at least, I doubt Spark Standalone will ever attempt to manage _other_ frameworks outside of Spark and become a general purpose resource manager. In terms of having better support for multi tenancy, meaning multiple *Spark* instances, this is something I think could be in scope in the future. For instance, we added H/A to the standalone scheduler a while back, because it let us support H/A streaming apps in a totally native way. It's a trade off of adding new features and keeping the scheduler very simple and easy to use. We've tended to bias towards simplicity as the main goal, since this is something we want to be really easy out of the box. One thing to point out, a lot of people use the standalone mode with some coarser grained scheduler, such as running in a cloud service. In this case they really just want a simple inner cluster manager. This may even be the majority of all Spark installations. This is slightly different than Hadoop environments, where they might just want nice integration into the existing Hadoop stack via something like YARN. - Patrick On Mon, Feb 2, 2015 at 12:24 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi all, I have some questions about the future development of Spark's standalone resource scheduler. We've heard some users have the requirements to have multi-tenant support in standalone mode, like multi-user management, resource management and isolation, whitelist of users. Seems current Spark standalone do not support such kind of functionalities, while resource schedulers like Yarn offers such kind of advanced managements, I'm not sure what's the future target of standalone resource scheduler, will it only target on simple implementation, and for advanced usage shift to YARN? Or will it plan to add some simple multi-tenant related functionalities? Thanks a lot for your comments. BR Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Questions about Spark standalone resource scheduler
Hi Patrick, Thanks a lot for your detailed explanation. For now we have these requirements: whitelisting the application submitter, per-user resource (CPU, memory) quotas, and resource allocation in Spark standalone mode. These are quite specific production-use requirements; generally, the question becomes whether we need to offer a more advanced resource scheduler compared to the current simple FIFO one. I think our aim is not to provide a general resource scheduler like Mesos/YARN - we only support Spark - but we hope to add some Mesos/YARN functionality to make better use of Spark standalone mode. I admit that a resource scheduler may overlap somewhat with a cloud manager; whether to offer a powerful scheduler or rely on a cloud manager is really a dilemma. I think we can break this down into some small features to improve the standalone mode. What's your opinion? Thanks, Jerry -----Original Message----- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Monday, February 2, 2015 4:49 PM To: Shao, Saisai Cc: d...@spark.apache.org; user@spark.apache.org Subject: Re: Questions about Spark standalone resource scheduler Hey Jerry, I think standalone mode will still add more features over time, but the goal isn't really for it to become equivalent to what Mesos/YARN are today. Or at least, I doubt Spark Standalone will ever attempt to manage _other_ frameworks outside of Spark and become a general purpose resource manager. In terms of having better support for multi tenancy, meaning multiple *Spark* instances, this is something I think could be in scope in the future. For instance, we added H/A to the standalone scheduler a while back, because it let us support H/A streaming apps in a totally native way. It's a trade off of adding new features and keeping the scheduler very simple and easy to use. We've tended to bias towards simplicity as the main goal, since this is something we want to be really easy out of the box. One thing to point out, a lot of people use the standalone mode with some coarser grained scheduler, such as running in a cloud service. In this case they really just want a simple inner cluster manager. This may even be the majority of all Spark installations. This is slightly different than Hadoop environments, where they might just want nice integration into the existing Hadoop stack via something like YARN. - Patrick On Mon, Feb 2, 2015 at 12:24 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi all, I have some questions about the future development of Spark's standalone resource scheduler. We've heard some users have the requirements to have multi-tenant support in standalone mode, like multi-user management, resource management and isolation, whitelist of users. Seems current Spark standalone do not support such kind of functionalities, while resource schedulers like Yarn offers such kind of advanced managements, I'm not sure what's the future target of standalone resource scheduler, will it only target on simple implementation, and for advanced usage shift to YARN? Or will it plan to add some simple multi-tenant related functionalities? Thanks a lot for your comments. BR Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org