This looks like a version incompatibility; make sure the Spark version your
application is built against (including the spark-streaming-kafka artifact)
matches the version running on the cluster. Also look further up the stack
trace for what is actually causing the "Futures timed out" error. It can also
be a network issue, e.g. if the required ports aren't open between the driver
and the executors; see the sketch below for pinning them.
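
If it's the ports, one option is to pin the driver's address and ports rather
than letting Spark pick random ones, so you can open them in the firewall
explicitly. A minimal sketch: spark.driver.host, spark.driver.port, and
spark.blockManager.port are standard Spark settings, but the app name, host
name, and port numbers below are placeholders you'd replace with values that
are reachable/open in your environment:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-csv-counter")        // placeholder app name
  // Address the executors use to connect back to the driver; it must be
  // resolvable and reachable from the worker machines.
  .set("spark.driver.host", "driver.host")
  // Fixed ports instead of random ones, so the firewall can allow them.
  .set("spark.driver.port", "51837")
  .set("spark.blockManager.port", "51838")

By default Spark binds these services to random ports, which is a common cause
of exactly this "Connection refused" followed by "Futures timed out" pattern
when a firewall sits between the driver and the workers.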

Thanks
Best Regards

On Sat, May 2, 2015 at 12:04 AM, javidelgadillo <jdelgadi...@esri.com>
wrote:

> Hello all!!
>
> We've been prototyping some Spark applications that read messages from Kafka
> topics.  The application is quite simple: we use KafkaUtils.createStream to
> receive a stream of CSV messages from a Kafka topic.  We parse the CSV and
> count the number of messages we get in each RDD. At a high level (removing
> the abstractions of our application), it looks like this:
>
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val conf = new SparkConf()
>   .setAppName(appName)                      // same as spark.app.name
>   .set("spark.executor.memory", "1024m")
>   .set("spark.cores.max", "3")
>   .set("spark.ui.port", sparkUIPort)
>
> val ssc = new StreamingContext(conf, Milliseconds(emitInterval.toInt))
>
> // createStream yields (key, message) pairs; keep the CSV payload and
> // count the records in each batch.
> KafkaUtils
>   .createStream(ssc, zookeeperQuorum, consumerGroup, topicMap)
>   .map(_._2)
>   .foreachRDD((rdd: RDD[String], time: Time) => {
>     println("Time %s: (%s total records)".format(time, rdd.count()))
>   })
>
> ssc.start()
> ssc.awaitTermination()
>
> When I submit this with the Spark master set to local[3], everything behaves
> as I'd expect.  After some startup overhead, I see the printed count match
> the count I'm simulating (1 per second, for example).
>
> When I submit this to a Spark master using spark://master.host:7077, the
> behavior is different.  The overhead to start receiving seems longer, and on
> some runs I don't see anything for 30 seconds even though my simulator is
> sending messages to the topic.  I also see the following error written to
> stderr by every executor assigned to the job:
>
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 15/05/01 10:11:38 INFO SecurityManager: Changing view acls to: username
> 15/05/01 10:11:38 INFO SecurityManager: Changing modify acls to: username
> 15/05/01 10:11:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(javi4211); users with modify permissions: Set(username)
> 15/05/01 10:11:38 INFO Slf4jLogger: Slf4jLogger started
> 15/05/01 10:11:38 INFO Remoting: Starting remoting
> 15/05/01 10:11:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverpropsfetc...@master.host:56534]
> 15/05/01 10:11:39 INFO Utils: Successfully started service 'driverPropsFetcher' on port 56534.
> 15/05/01 10:11:40 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkdri...@driver.host:51837]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: driver.host/10.27.51.214:51837
> 15/05/01 10:12:09 ERROR UserGroupInformation: PriviledgedActionException as:username cause:java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: Unknown exception in doAs
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
>         at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
>         at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
>         at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
>         at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
> Caused by: java.security.PrivilegedActionException: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>         ... 4 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
>
> Is there something else I need to configure to ensure Akka remoting will
> work correctly when running on a Spark cluster?  Or can I ignore this error?
>
> -Javier
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Remoting-warning-when-submitting-to-cluster-tp22733.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
