Re: spark streaming actor receiver doesn't play well with kryoserializer

2014-08-07 Thread Rohit Rai
Alan/TD,

We are facing this problem in a project that is going to production.

Was there any progress on this? Can we confirm that this is a
bug/limitation in the current streaming code, or is there something wrong in
user code?

Regards,
Rohit

*Founder & CEO, Tuplejump, Inc.*

www.tuplejump.com
*The Data Engineering Platform*


On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai a...@opsclarity.com wrote:

 [quoted log trace trimmed; see Alan's original message of 2014-07-25 later in this thread]

Re: spark streaming actor receiver doesn't play well with kryoserializer

2014-08-07 Thread Tathagata Das
Another possible reason for this may be that there are two versions of
Akka present on the classpath, interfering with each other. This can
happen in several ways:

1. Launching the Spark application with Scala brings in Scala's own Akka,
which interferes with Spark's Akka
2. Multiple Akka versions pulled in through transitive dependencies

TD
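A quick way to investigate scenario 2 is at build time. The sbt fragment below is a hypothetical sketch (the third-party artifact name and all versions are made up for illustration, not taken from this thread) of excluding a transitive Akka so that only Spark's own Akka lands on the classpath:

```scala
// build.sbt -- hypothetical sketch; substitute your real dependencies/versions.
// The exclude() removes the transitive akka-actor that "some-library" would
// otherwise bring in, leaving only the Akka that Spark itself depends on.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.0.1",
  "com.example" %% "some-library" % "0.1.0"
    exclude("com.typesafe.akka", "akka-actor_2.10")
)
```

Running `mvn dependency:tree` (or sbt's dependency-graph plugin) afterwards will show whether two Akka versions are still being resolved.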


On Thu, Aug 7, 2014 at 2:30 AM, Rohit Rai ro...@tuplejump.com wrote:

 [quoted messages trimmed; Rohit's message appears at the top of this thread and Alan's original message of 2014-07-25 appears later in this thread]

Re: spark streaming actor receiver doesn't play well with kryoserializer

2014-07-31 Thread Prashant Sharma
This looks like a bug to me. It happens because we serialize the code
that starts the receiver and ship it across, and since the classes of the
Akka library have not been registered with Kryo, it does not work. I have
not tried it myself, but including something like chill-akka (
https://github.com/xitrum-framework/chill-akka) might help. I am not well
versed in how Kryo works internally; maybe someone else can throw some
light on this.
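For reference, hooking extra registrations into Spark's Kryo setup would look roughly like the sketch below. This is an untested assumption, not a confirmed fix: whether registering these classes is enough, or whether chill-akka's custom serializers are needed for types like `ActorRef`, would have to be verified against the actual failure.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical sketch: register the Akka types the actor-receiver closure
// drags in, so Kryo knows about them. Plain registration may not be enough
// for ActorRef; that is where chill-akka's serializers would come in.
class AkkaAwareRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[akka.actor.Props])
    kryo.register(classOf[akka.actor.ActorRef])
  }
}
```

It would be enabled through the standard Spark 1.x settings, e.g. `conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")` and `conf.set("spark.kryo.registrator", "your.package.AkkaAwareRegistrator")`.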

Prashant Sharma




On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai a...@opsclarity.com wrote:

 [quoted log trace trimmed; see Alan's original message of 2014-07-25 later in this thread]

Re: spark streaming actor receiver doesn't play well with kryoserializer

2014-07-25 Thread Alan Ngai
The stack trace was from running the Actor count sample directly, without a 
spark cluster, so I guess the logs would be from both?  I enabled more logging 
and got this stack trace

14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
 14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(alan)
 14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: 
off
 14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
 14/07/25 17:55:27 [INFO] Remoting: Starting remoting
 14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses 
:[akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: 
[akka.tcp://spark@leungshwingchun:52156]
 14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
 14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
 14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root 
dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/'
 14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at 
/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/spark-local-20140725175527-32f2
 14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 
MB.
 14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id 
= ConnectionManagerId(leungshwingchun,52157)
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager
 14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager 
leungshwingchun:52157 with 297.0 MB RAM
 14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at 
http://192.168.1.233:52158
 14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is 
/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
 14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
 14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
 14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: 
http://192.168.1.233:52159
 14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at 
http://leungshwingchun:4040
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, 
about=, value=[Rate of successful kerberos logins and latency (milliseconds)], 
always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, 
about=, value=[Rate of failed kerberos logins and latency (milliseconds)], 
always=false, type=DEFAULT, sampleName=Ops)
 14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group 
related metrics
 2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from 
SCDynamicStore
14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, 
setting default realm to empty
 14/07/25 17:55:27 [DEBUG] Groups:  Creating new Groups object
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built 
native-hadoop library...
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with 
error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
 14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
 14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back 
to shell based
 14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping 
impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
 14/07/25 17:55:27 [DEBUG] Groups: Group mapping 
impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; 
cacheTimeout=30
 14/07/25 17:55:28 [INFO] SparkContext: Added JAR 
file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar
 at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar with 
timestamp 1406336128212
 14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
 14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
 14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
 14/07/25 17:55:28 [INFO] FlatMappedDStream: