This looks like a version incompatibility; make sure the Spark version on your cluster matches the one your application was built against. Also look further down the stack trace for what is causing the "Futures timed out" error (it could also be a network issue if the required ports aren't open between the driver and the workers).
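On the network side, one thing worth trying is pinning the driver's address and ports explicitly, so the executors can reach back to it through a firewall. By default Spark picks random ephemeral ports for these, which firewalls often block. A minimal sketch (the host name and port numbers below are placeholders, not values from your setup):

```scala
import org.apache.spark.SparkConf

// Hypothetical values: replace driver.host and the ports with ones
// reachable from the worker nodes and allowed through your firewall.
val conf = new SparkConf()
  .setAppName("kafka-stream-test")
  // Address the executors will use to connect back to the driver.
  .set("spark.driver.host", "driver.host")
  // Fixed ports so they can be opened in the firewall
  // (by default Spark picks random ephemeral ports).
  .set("spark.driver.port", "51837")
  .set("spark.blockManager.port", "51838")
```

If the executors then still can't connect, it's more likely the version mismatch than the network.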
Thanks
Best Regards

On Sat, May 2, 2015 at 12:04 AM, javidelgadillo <jdelgadi...@esri.com> wrote:
> Hello all!!
>
> We've been prototyping some Spark applications to read messages from Kafka
> topics. The application is quite simple: we use KafkaUtils.createStream to
> receive a stream of CSV messages from a Kafka topic. We parse the CSV and
> count the number of messages we get in each RDD. At a high level (removing
> the abstractions of our application), it looks like this:
>
> val sc = new SparkConf()
>   .setAppName(appName)
>   .set("spark.executor.memory", "1024m")
>   .set("spark.cores.max", "3")
>   .set("spark.app.name", appName)
>   .set("spark.ui.port", sparkUIPort)
>
> val ssc = new StreamingContext(sc, Milliseconds(emitInterval.toInt))
>
> KafkaUtils
>   .createStream(ssc, zookeeperQuorum, consumerGroup, topicMap)
>   .map(_._2)
>   .foreachRDD( (rdd: RDD[String], time: Time) => {
>     println("Time %s: (%s total records)".format(time, rdd.count()))
>   })
>
> When I submit this with the Spark master set to local[3], everything
> behaves as I'd expect. After some startup overhead, the printed count
> matches the count I'm simulating (1 every second, for example).
>
> When I submit this to a Spark master using spark://master.host:7077, the
> behavior is different. The overhead to start receiving seems longer, and
> on some runs I don't see anything for 30 seconds even though my simulator
> is sending messages to the topic.
> I also see the following error written to stderr by every executor
> assigned to the job:
>
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 15/05/01 10:11:38 INFO SecurityManager: Changing view acls to: username
> 15/05/01 10:11:38 INFO SecurityManager: Changing modify acls to: username
> 15/05/01 10:11:38 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(javi4211);
> users with modify permissions: Set(username)
> 15/05/01 10:11:38 INFO Slf4jLogger: Slf4jLogger started
> 15/05/01 10:11:38 INFO Remoting: Starting remoting
> 15/05/01 10:11:39 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://driverpropsfetc...@master.host:56534]
> 15/05/01 10:11:39 INFO Utils: Successfully started service
> 'driverPropsFetcher' on port 56534.
> 15/05/01 10:11:40 WARN Remoting: Tried to associate with unreachable
> remote address [akka.tcp://sparkdri...@driver.host:51837]. Address is now
> gated for 5000 ms, all messages to this address will be delivered to dead
> letters.
> Reason: Connection refused: no further information:
> driver.host/10.27.51.214:51837
> 15/05/01 10:12:09 ERROR UserGroupInformation: PriviledgedActionException
> as:username cause:java.util.concurrent.TimeoutException: Futures timed out
> after [30 seconds]
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException:
> Unknown exception in doAs
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.security.PrivilegedActionException:
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>     ... 4 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [30 seconds]
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> Is there something else I need to configure to ensure Akka remoting will
> work correctly when running a Spark cluster? Or can I ignore this error?
>
> -Javier
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Remoting-warning-when-submitting-to-cluster-tp22733.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>