Hello all!!

We've been prototyping some Spark applications that read messages from Kafka
topics.  The application is quite simple: we use KafkaUtils.createStream to
receive a stream of CSV messages from a Kafka topic, parse the CSV, and
count the number of messages we get in each RDD. At a high level (removing
the abstractions of our application), it looks like this:

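import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

val sc = new SparkConf()
      .set("spark.executor.memory", "1024m")
      .set("spark.cores.max", "3")
      .set("spark.app.name", appName)
      .set("spark.ui.port", sparkUIPort)

val ssc = new StreamingContext(sc, Milliseconds(emitInterval.toInt))

// Each element of the stream is a (key, message) pair
KafkaUtils
      .createStream(ssc, zookeeperQuorum, consumerGroup, topicMap)
      .foreachRDD((rdd: RDD[(String, String)], time: Time) => {
        println("Time %s: (%s total records)".format(time, rdd.count()))
      })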
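
The CSV parsing step is left out of the snippet above. As a rough sketch of
what that step amounts to, assuming plain comma-separated fields with no
quoting or escaping (the object and helper names here are hypothetical, not
our real code, and the sample batch is made up):

```scala
object CsvCountSketch {
  // Hypothetical helper: split one CSV line into trimmed fields.
  // Assumes no quoted or escaped commas; limit -1 keeps trailing empty fields.
  def parseCsv(line: String): Array[String] =
    line.split(",", -1).map(_.trim)

  // Parse every message in a batch and count the resulting records.
  def countRecords(messages: Seq[String]): Int =
    messages.map(parseCsv).size

  def main(args: Array[String]): Unit = {
    // Stand-in for the message values received in one batch interval.
    val batch = Seq("a,1,x", "b,2,y", "c,3,z")
    println("(%s total records)".format(countRecords(batch)))
  }
}
```

In the streaming job, something like this would presumably be applied to the
message half of each (key, message) pair before counting.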

When I submit this with the Spark master set to local[3], everything behaves as
I'd expect.  After some startup overhead, the printed count matches the
count I'm simulating (1 message every second, for example).

When I submit this to a Spark master at spark://master.host:7077, the
behavior is different.  The overhead to start receiving seems longer, and
on some runs I don't see anything for 30 seconds even though my simulator is
sending messages to the topic.  I also see the following error written to
stderr by every executor assigned to the job:

Using Spark's default log4j profile:
15/05/01 10:11:38 INFO SecurityManager: Changing view acls to: username
15/05/01 10:11:38 INFO SecurityManager: Changing modify acls to: username
15/05/01 10:11:38 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(javi4211);
users with modify permissions: Set(username)
15/05/01 10:11:38 INFO Slf4jLogger: Slf4jLogger started
15/05/01 10:11:38 INFO Remoting: Starting remoting
15/05/01 10:11:39 INFO Remoting: Remoting started; listening on addresses
15/05/01 10:11:39 INFO Utils: Successfully started service
'driverPropsFetcher' on port 56534.
15/05/01 10:11:40 WARN Remoting: Tried to associate with unreachable remote
address [akka.tcp://sparkdri...@driver.host:51837]. Address is now gated for
5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: no further information:
15/05/01 10:12:09 ERROR UserGroupInformation: PriviledgedActionException
as:username cause:java.util.concurrent.TimeoutException: Futures timed out
after [30 seconds]
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException:
Unknown exception in doAs
Caused by: java.security.PrivilegedActionException:
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        ... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

Is there something else I need to configure to ensure Akka remoting works
correctly when running on a Spark cluster?  Or can I ignore this error?
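
The only candidate settings I'm aware of are spark.driver.host and
spark.driver.port, which control the address and port the driver advertises
to the executors. Below is an untested sketch of pinning them explicitly. It
assumes the executors can resolve and reach that host, and I have not
verified that it resolves the timeout. The host is the placeholder from the
log above and the port number is an arbitrary example.

```scala
import org.apache.spark.SparkConf

// Sketch only: pin the address and port the driver advertises to executors.
// "driver.host" is the placeholder host name from the log output;
// 51000 is an arbitrary example port, assumed to be open between the machines.
val conf = new SparkConf()
  .set("spark.driver.host", "driver.host")
  .set("spark.driver.port", "51000")
```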


