[jira] [Updated] (SPARK-30522) Spark Streaming dynamic executors override or take default kafka parameters in cluster mode

2020-01-15 Thread phanikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

phanikumar updated SPARK-30522:
---
Description: 
I have written a Spark Streaming consumer to consume data from Kafka. I found a weird behavior in my logs. The Kafka topic has 3 partitions and, for each partition, an executor is launched by the Spark Streaming job.
 The first executor always takes the Kafka parameters I provided while creating the streaming context, but the executors with IDs 2 and 3 always override them with the default Kafka parameters.
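For reference, below is a minimal sketch of how such a consumer is typically wired up with the spark-streaming-kafka-0-10 Java API. This is not the actual job: the class name, topic name and broker addresses are placeholders, and only the parameter values visible in the ConsumerConfig log (group.id, client.id, auto.offset.reset, enable.auto.commit) and the 5-second batch interval are taken from the report.
{code:java}
// Minimal sketch only. TelemetryStreamingJob, "telemetry-topic" and the broker
// addresses are placeholders; the Kafka parameter values mirror the
// ConsumerConfig dump from the first executor's log.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class TelemetryStreamingJob {
  public static void main(String[] args) throws InterruptedException {
    // Kafka parameters passed while creating the streaming context; the report
    // says executors 2 and 3 ignore these and fall back to the defaults.
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "telemetry-streaming-service");
    kafkaParams.put("client.id", "client-0");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    SparkConf conf = new SparkConf().setAppName("telemetry-streaming-service");
    // 5-second batch interval, matching "Slide time = 5000 ms" in the log below.
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(
                Collections.singletonList("telemetry-topic"), kafkaParams)); // placeholder topic

    stream.foreachRDD(rdd -> {
      // process one micro-batch; each Kafka partition is read by one executor
    });

    jssc.start();
    jssc.awaitTermination();
  }
}
{code}
With this setup, every executor that creates a Kafka consumer should use the same kafkaParams map, which is why the differing ConsumerConfig values on executors 2 and 3 look wrong.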
    
{code:java}
20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 2 write ahead log files from hdfs://tlabnamenode/checkpoint/receivedBlockMetadata
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f
20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac
20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values:             
auto.commit.interval.ms = 5000            
auto.offset.reset = latest            
bootstrap.servers = [1,2,3]            
check.crcs = true            
client.id = client-0            
connections.max.idle.ms = 54            
default.api.timeout.ms = 6            
enable.auto.commit = false            
exclude.internal.topics = true            
fetch.max.bytes = 52428800            
fetch.max.wait.ms = 500            
fetch.min.bytes = 1            
group.id = telemetry-streaming-service            
heartbeat.interval.ms = 3000            
interceptor.classes = []            
internal.leave.group.on.close = true            
isolation.level = read_uncommitted            
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
 
{code}
Here is the log for other executors.
    
{code:java}
20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1
20/01/14 12:15:04 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324.
20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1
20/01/14 12:15:04 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447
20/01/14 12:15:04 INFO BlockManager: Registering executor with local external shuffle service.
20/01/14 12:15:04 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in bootstraps)
20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1
20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0
20/01/14 12:15:19 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent in bootstraps)
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 8.1 KB, free 6.2 GB)
20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took 163 ms
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 17.9 KB, free 6.

[jira] [Updated] (SPARK-30522) Spark Streaming dynamic executors override or take default kafka parameters in cluster mode

2020-01-15 Thread phanikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

phanikumar updated SPARK-30522:
---
Summary: Spark Streaming dynamic executors override or take default kafka 
parameters in cluster mode  (was: Spark Streaming dynamic executors overried 
kafka parameters in cluster mode)

> Spark Streaming dynamic executors override or take default kafka parameters 
> in cluster mode
> ---
>
> Key: SPARK-30522
> URL: https://issues.apache.org/jira/browse/SPARK-30522
> Project: Spark
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 2.3.2
>Reporter: phanikumar
>Priority: Major
>
> I have written a Spark Streaming consumer to consume data from Kafka. I found a weird behavior in my logs. The Kafka topic has 3 partitions and, for each partition, an executor is launched by the Spark Streaming job.
> The first executor always takes the Kafka parameters I provided while creating the streaming context, but the executors with IDs 2 and 3 always override them with the default Kafka parameters.
>    
> {code:java}
> 20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
> 20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 2 write ahead log files from hdfs://tlabnamenode/checkpoint/receivedBlockMetadata
> 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms
> 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
> 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms
> 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f
> 20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms
> 20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
> 20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = null
> 20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms
> 20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac
> 20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = latest
> bootstrap.servers = [1,2,3]
> check.crcs = true
> client.id = client-0
> connections.max.idle.ms = 54
> default.api.timeout.ms = 6
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = telemetry-streaming-service
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>  
> {code}
> Here is the log for other executors.
>    
> {code:java}
> 20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1
> 20/01/14 12:15:04 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324.
> 20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1
> 20/01/14 12:15:04 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
> 20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
> 20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
> 20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447
> 20/01/14 12:15:04 INFO BlockManager: Registering executor with local external shuffle service.
> 20/01/14 12:15:04 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in bootstraps)
> 20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: BlockManagerId(