[ 
https://issues.apache.org/jira/browse/SPARK-30522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

phanikumar updated SPARK-30522:
-------------------------------
    Description: 
I have written a Spark Streaming consumer to consume data from Kafka, and I 
found a weird behavior in my logs. The Kafka topic has 3 partitions, and the 
Spark Streaming job launches an executor for each partition.
 The first executor always takes the parameters I provided while creating the 
streaming context, but the executors with IDs 2 and 3 always override the 
Kafka parameters.
    
{code:java}
20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 2 write ahead log files from hdfs://tlabnamenode/checkpoint/receivedBlockMetadata
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f
20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac
20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [1,2,3]
    check.crcs = true
    client.id = client-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = telemetry-streaming-service
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
{code}
Here is the log for other executors.
    
{code:java}
20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1
20/01/14 12:15:04 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324.
20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1
20/01/14 12:15:04 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447
20/01/14 12:15:04 INFO BlockManager: Registering executor with local external shuffle service.
20/01/14 12:15:04 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in bootstraps)
20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1
20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0
20/01/14 12:15:19 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent in bootstraps)
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 8.1 KB, free 6.2 GB)
20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took 163 ms
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 17.9 KB, free 6.2 GB)
20/01/14 12:15:20 INFO KafkaRDD: Computing topic telemetry, partition 1 offsets 237352170 -> 237352311
20/01/14 12:15:20 INFO CachedKafkaConsumer: Initializing cache 16 64 0.75
20/01/14 12:15:20 INFO CachedKafkaConsumer: Cache miss for CacheKey(spark-executor-telemetry-streaming-service,telemetry,1)
20/01/14 12:15:20 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
    bootstrap.servers = [1,2,3]
    check.crcs = true
    client.id = client-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
{code}
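
The two ConsumerConfig dumps can also be compared mechanically rather than by eye. Below is a minimal sketch in plain Java. The class `ConsumerConfigDiff` and its methods are hypothetical helpers, not part of Spark or Kafka. It assumes that only the `key = value` lines following "ConsumerConfig values:" are passed in, and it reports the keys present in both dumps whose values differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark or Kafka: compares two ConsumerConfig
// dumps copied from the logs and reports the keys whose values differ.
class ConsumerConfigDiff {

    // Parses "key = value" lines; any line without " = " is skipped.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> config = new LinkedHashMap<>();
        for (String line : lines) {
            int sep = line.indexOf(" = ");
            if (sep < 0) {
                continue;
            }
            config.put(line.substring(0, sep).trim(), line.substring(sep + 3).trim());
        }
        return config;
    }

    // Returns key -> "first -> second" for keys that exist in both dumps
    // but carry different values.
    static Map<String, String> diff(Map<String, String> first, Map<String, String> second) {
        Map<String, String> changed = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : first.entrySet()) {
            String other = second.get(entry.getKey());
            if (other != null && !other.equals(entry.getValue())) {
                changed.put(entry.getKey(), entry.getValue() + " -> " + other);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // A few lines copied from the two dumps in this report.
        Map<String, String> first = parse(Arrays.asList(
            "auto.commit.interval.ms = 5000",
            "auto.offset.reset = latest",
            "enable.auto.commit = false"));
        Map<String, String> second = parse(Arrays.asList(
            "auto.commit.interval.ms = 5000",
            "auto.offset.reset = none",
            "enable.auto.commit = false"));
        System.out.println(diff(first, second)); // prints {auto.offset.reset=latest -> none}
    }
}
```

Both dumps are cut off in this report, so a comparison like this only covers the keys that appear in both excerpts.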
 

If we look closely, in the first executor the **auto.offset.reset is 
latest**, but for the other executors the **auto.offset.reset = none**
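
One possible reading of this difference, stated as an assumption rather than a confirmed diagnosis: the spark-streaming-kafka-0-10 integration adjusts the consumer parameters it uses on executors, forcing `auto.offset.reset` to `none` and `enable.auto.commit` to `false`, and prefixing `group.id` with `spark-executor-`. That prefix is visible in the `CacheKey` line of the executor log. Under that reading the first dump would come from the driver-side consumer. The plain-Java sketch below only imitates such an adjustment on a copy of the map; `ExecutorParamsSketch` and `forExecutor` are hypothetical names, not Spark APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: imitates an executor-side adjustment of the
// Kafka consumer parameters. It does not call into Spark or Kafka.
class ExecutorParamsSketch {

    // Returns a modified copy; the caller's map is left untouched.
    static Map<String, Object> forExecutor(Map<String, Object> driverParams) {
        Map<String, Object> executorParams = new HashMap<>(driverParams);
        // Assumed overrides, matching what the executor log in this report shows.
        executorParams.put("enable.auto.commit", false);
        executorParams.put("auto.offset.reset", "none");
        executorParams.put("group.id", "spark-executor-" + driverParams.get("group.id"));
        return executorParams;
    }

    public static void main(String[] args) {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("group.id", "telemetry-streaming-service");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("client.id", "client-0");

        Map<String, Object> executorParams = forExecutor(kafkaParams);
        System.out.println(executorParams.get("auto.offset.reset")); // prints none
        System.out.println(executorParams.get("group.id"));
        System.out.println(kafkaParams.get("auto.offset.reset"));    // prints latest
    }
}
```

If this reading holds, the value supplied in `kafkaParams` would still apply to the consumer that decides the starting offsets, while the executors would only fetch the offset ranges they are handed.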

 

Here is how I am creating the streaming context:

{code:java}
public void init() throws Exception {

    final String BOOTSTRAP_SERVERS =
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.broker.list");
    final String DYNAMIC_ALLOCATION_ENABLED =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.enabled");
    final String DYNAMIC_ALLOCATION_SCALING_INTERVAL =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.scalingInterval");
    final String DYNAMIC_ALLOCATION_MIN_EXECUTORS =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.minExecutors");
    final String DYNAMIC_ALLOCATION_MAX_EXECUTORS =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.maxExecutors");
    final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.executorIdleTimeout");
    final String DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout");
    final String SPARK_SHUFFLE_SERVICE_ENABLED =
        PropertyFileReader.getInstance().getProperty("spark.shuffle.service.enabled");
    final String SPARK_LOCALITY_WAIT =
        PropertyFileReader.getInstance().getProperty("spark.locality.wait");
    final String SPARK_KAFKA_CONSUMER_POLL_INTERVAL =
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.consumer.poll.ms");
    final String SPARK_KAFKA_MAX_RATE_PER_PARTITION =
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.maxRatePerPartition");
    final String SPARK_BATCH_DURATION_IN_SECONDS =
        PropertyFileReader.getInstance().getProperty("spark.batch.duration.in.seconds");
    final String KAFKA_TOPIC =
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.topic");

    LOGGER.debug("connecting to brokers ::" + BOOTSTRAP_SERVERS);
    LOGGER.debug("bootstrapping properties to create consumer");

    kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "telemetry-streaming-service");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("client.id", "client-0");
    // Below property should be enabled in properties and changed based on
    // performance testing
    kafkaParams.put("max.poll.records",
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.max.poll.records"));

    LOGGER.info("registering as a consumer with the topic :: " + KAFKA_TOPIC);
    topics = Arrays.asList(KAFKA_TOPIC);
    sparkConf = new SparkConf()
        .setMaster(PropertyFileReader.getInstance().getProperty("spark.master.url"))
        .setAppName(PropertyFileReader.getInstance().getProperty("spark.application.name"))
        .set("spark.streaming.dynamicAllocation.enabled", DYNAMIC_ALLOCATION_ENABLED)
        .set("spark.streaming.dynamicAllocation.scalingInterval", DYNAMIC_ALLOCATION_SCALING_INTERVAL)
        .set("spark.streaming.dynamicAllocation.minExecutors", DYNAMIC_ALLOCATION_MIN_EXECUTORS)
        .set("spark.streaming.dynamicAllocation.maxExecutors", DYNAMIC_ALLOCATION_MAX_EXECUTORS)
        .set("spark.streaming.dynamicAllocation.executorIdleTimeout", DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
        .set("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout", DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
        .set("spark.shuffle.service.enabled", SPARK_SHUFFLE_SERVICE_ENABLED)
        .set("spark.locality.wait", SPARK_LOCALITY_WAIT)
        .set("spark.streaming.kafka.consumer.poll.ms", SPARK_KAFKA_CONSUMER_POLL_INTERVAL)
        .set("spark.streaming.kafka.maxRatePerPartition", SPARK_KAFKA_MAX_RATE_PER_PARTITION);

    LOGGER.debug("creating streaming context with minutes batch interval ::: "
        + SPARK_BATCH_DURATION_IN_SECONDS);
    streamingContext = new JavaStreamingContext(sparkConf,
        Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS)));

    LOGGER.info("checkpointing the streaming transactions at hdfs path :: /checkpoint");
    streamingContext.checkpoint("/checkpoint");
    streamingContext.addStreamingListener(new DataProcessingListener());
}

@Override
public void execute() throws InterruptedException {
    JavaInputDStream<ConsumerRecord<String, String>> telemetryStream =
        KafkaUtils.createDirectStream(streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topics, kafkaParams));
    telemetryStream.foreachRDD(rawRDD -> {
        if (!rawRDD.isEmpty()) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rawRDD.rdd()).offsetRanges();
            SparkSession spark =
                JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());
            JavaPairRDD<String, String> flattenedRawRDD = rawRDD.mapToPair(record -> {
                ObjectMapper om = new ObjectMapper();
                JsonNode root = om.readTree(record.value());
                Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
                JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);

                return new Tuple2<String, String>(
                    flattenedRootNode.get("/name").asText(), flattenedRootNode.toString());
            });

            Dataset<Row> rawFlattenedDataRDD = spark
                .createDataset(flattenedRawRDD.rdd(),
                    Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                .toDF("sensor_path", "sensor_data");
            Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col("sensor_path"))
                .agg(collect_list(col("sensor_data").as("sensor_data")));
            Dataset<Row> lldpGroupedDS = groupedDS.filter((FilterFunction<Row>) r ->
                r.getString(0).equals("Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device"));
            lldpGroupedDS.show(2);

            HashMap<Object, Object> params = new HashMap<>();
            params.put(DPConstants.OTSDB_CONFIG_F_PATH,
                ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.config.file.path"));
            params.put(DPConstants.OTSDB_CLIENT_TYPE,
                ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.client.type"));

            try {
                Pipeline lldpPipeline = PipelineFactory.getPipeline(PipelineType.LLDPTELEMETRY);
                lldpPipeline.process(lldpGroupedDS, null);

                Pipeline pipeline = PipelineFactory.getPipeline(PipelineType.TELEMETRY);
                pipeline.process(groupedDS, params);
            } catch (Throwable t) {
                t.printStackTrace();
            }
            ((CanCommitOffsets) telemetryStream.inputDStream()).commitAsync(offsetRanges);
        }
    });

    streamingContext.start();
    streamingContext.awaitTermination();
}
{code}

  was:
I have written a Spark Streaming consumer to consume data from Kafka, and I 
found a weird behavior in my logs. The Kafka topic has 3 partitions, and the 
Spark Streaming job launches an executor for each partition.
 The first executor always takes the parameters I provided while creating the 
streaming context, but the executors with IDs 2 and 3 always override the 
Kafka parameters.
    
{code:java}
20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 2 write ahead log files from hdfs://tlabnamenode/checkpoint/receivedBlockMetadata
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f
20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac
20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [1,2,3]
    check.crcs = true
    client.id = client-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = telemetry-streaming-service
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
{code}
Here is the log for other executors.
    
{code:java}
20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1
20/01/14 12:15:04 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324.
20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1
20/01/14 12:15:04 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447
20/01/14 12:15:04 INFO BlockManager: Registering executor with local external shuffle service.
20/01/14 12:15:04 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in bootstraps)
20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1
20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0
20/01/14 12:15:19 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent in bootstraps)
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 8.1 KB, free 6.2 GB)
20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took 163 ms
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 17.9 KB, free 6.2 GB)
20/01/14 12:15:20 INFO KafkaRDD: Computing topic telemetry, partition 1 offsets 237352170 -> 237352311
20/01/14 12:15:20 INFO CachedKafkaConsumer: Initializing cache 16 64 0.75
20/01/14 12:15:20 INFO CachedKafkaConsumer: Cache miss for CacheKey(spark-executor-telemetry-streaming-service,telemetry,1)
20/01/14 12:15:20 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
    bootstrap.servers = [1,2,3]
    check.crcs = true
    client.id = client-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
{code}
 

If we look closely, in the first executor the **auto.offset.reset is 
latest**, but for the other executors the **auto.offset.reset = none**

 

Here is how I am creating the streaming context
  
{code:java}
public void init() throws Exception {
    final String BOOTSTRAP_SERVERS =
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.broker.list");
    final String DYNAMIC_ALLOCATION_ENABLED =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.enabled");
    final String DYNAMIC_ALLOCATION_SCALING_INTERVAL =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.scalingInterval");
    final String DYNAMIC_ALLOCATION_MIN_EXECUTORS =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.minExecutors");
    final String DYNAMIC_ALLOCATION_MAX_EXECUTORS =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.maxExecutors");
    final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.executorIdleTimeout");
    final String DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
        PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout");
    final String SPARK_SHUFFLE_SERVICE_ENABLED =
        PropertyFileReader.getInstance().getProperty("spark.shuffle.service.enabled");
    final String SPARK_LOCALITY_WAIT =
        PropertyFileReader.getInstance().getProperty("spark.locality.wait");
    final String SPARK_KAFKA_CONSUMER_POLL_INTERVAL =
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.consumer.poll.ms");
    final String SPARK_KAFKA_MAX_RATE_PER_PARTITION =
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.maxRatePerPartition");
    final String SPARK_BATCH_DURATION_IN_SECONDS =
        PropertyFileReader.getInstance().getProperty("spark.batch.duration.in.seconds");
    final String KAFKA_TOPIC =
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.topic");

    kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "telemetry-streaming-service");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("client.id", "client-0");
    kafkaParams.put("max.poll.records",
        PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.max.poll.records"));

    topics = Arrays.asList(KAFKA_TOPIC);
    sparkConf = new SparkConf()
        //.setMaster(PropertyFileReader.getInstance().getProperty("spark.master.url"))
        .setAppName(PropertyFileReader.getInstance().getProperty("spark.application.name"))
        .set("spark.streaming.dynamicAllocation.enabled", DYNAMIC_ALLOCATION_ENABLED)
        .set("spark.streaming.dynamicAllocation.scalingInterval", DYNAMIC_ALLOCATION_SCALING_INTERVAL)
        .set("spark.streaming.dynamicAllocation.minExecutors", DYNAMIC_ALLOCATION_MIN_EXECUTORS)
        .set("spark.streaming.dynamicAllocation.maxExecutors", DYNAMIC_ALLOCATION_MAX_EXECUTORS)
        .set("spark.streaming.dynamicAllocation.executorIdleTimeout", DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
        .set("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout", DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
        .set("spark.shuffle.service.enabled", SPARK_SHUFFLE_SERVICE_ENABLED)
        .set("spark.locality.wait", SPARK_LOCALITY_WAIT)
        .set("spark.streaming.kafka.consumer.poll.ms", SPARK_KAFKA_CONSUMER_POLL_INTERVAL)
        .set("spark.streaming.kafka.maxRatePerPartition", SPARK_KAFKA_MAX_RATE_PER_PARTITION);

    streamingContext = new JavaStreamingContext(sparkConf,
        Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS)));
    streamingContext.checkpoint("/checkpoint");
    streamingContext.addStreamingListener(new DataProcessingListener());
}

@Override
public void execute() throws InterruptedException {
    JavaInputDStream<ConsumerRecord<String, String>> telemetryStream =
        KafkaUtils.createDirectStream(streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topics, kafkaParams));

    telemetryStream.foreachRDD(rawRDD -> {
        if (!rawRDD.isEmpty()) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rawRDD.rdd()).offsetRanges();
            LOGGER.debug("list of OffsetRanges getting processed as a string :: "
                + Arrays.asList(offsetRanges).toString());
            System.out.println("offsetRanges : " + offsetRanges.length);
            SparkSession spark =
                JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());
            JavaPairRDD<String, String> flattenedRawRDD = rawRDD.mapToPair(record -> {
                ObjectMapper om = new ObjectMapper();
                JsonNode root = om.readTree(record.value());
                Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
                JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);
                return new Tuple2<String, String>(
                    flattenedRootNode.get("/name").asText(), flattenedRootNode.toString());
            });

            Dataset<Row> rawFlattenedDataRDD = spark
                .createDataset(flattenedRawRDD.rdd(),
                    Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                .toDF("sensor_path", "sensor_data");
            Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col("sensor_path"))
                .agg(collect_list(col("sensor_data").as("sensor_data")));
            Dataset<Row> lldpGroupedDS = groupedDS.filter((FilterFunction<Row>) r ->
                r.getString(0).equals("Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device"));
            lldpGroupedDS.show(2);

            HashMap<Object, Object> params = new HashMap<>();
            params.put(DPConstants.OTSDB_CONFIG_F_PATH,
                ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.config.file.path"));
            params.put(DPConstants.OTSDB_CLIENT_TYPE,
                ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.client.type"));

            try {
                Pipeline lldpPipeline = PipelineFactory.getPipeline(PipelineType.LLDPTELEMETRY);
                lldpPipeline.process(lldpGroupedDS, null);

                Pipeline pipeline = PipelineFactory.getPipeline(PipelineType.TELEMETRY);
                pipeline.process(groupedDS, params);
            } catch (Throwable t) {
                t.printStackTrace();
            }
            ((CanCommitOffsets) telemetryStream.inputDStream()).commitAsync(offsetRanges);
        }
    });

    streamingContext.start();
    streamingContext.awaitTermination();
}
{code}


> Spark Streaming dynamic executors override or take default kafka parameters 
> in cluster mode
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-30522
>                 URL: https://issues.apache.org/jira/browse/SPARK-30522
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.3.2
>            Reporter: phanikumar
>            Priority: Major
>
> I have written a spark streaming consumer to consume the data from Kafka. I 
> found a weird behavior in my logs. The Kafka topic has 3 partitions and for 
> each partition, an executor is launched by Spark Streaming job.I have written 
> a spark streaming consumer to consume the data from Kafka. I found a weird 
> behavior in my logs. The Kafka topic has 3 partitions and for each partition, 
> an executor is launched by Spark Streaming job.
>  The first executor id always takes the parameters I have provided while 
> creating the streaming context but the executor with ID 2 and 3 always 
> override the kafka parameters.
>     
> {code:java}
> 20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for 
> this application. Enabling Dynamic allocation for Spark Streaming 
> applications can cause data loss if Write Ahead Log is not enabled for 
> non-replayable sour    ces like Flume. See the programming guide for details 
> on how to enable the Write Ahead Log.    
> 20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 
> 2 write ahead log files from 
> hdfs://tlabnamenode/checkpoint/receivedBlockMetadata    
> 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms    
> 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x 
> Replicated    20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint 
> interval = null   
>  20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms  
>   
> 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f    
> 20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms    
> 20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x 
> Replicated    20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = 
> null    
> 20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms    
> 20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac    
> 20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values:             
> auto.commit.interval.ms = 5000            
> auto.offset.reset = latest            
> bootstrap.servers = [1,2,3]            
> check.crcs = true            
> client.id = client-0            
> connections.max.idle.ms = 540000            
> default.api.timeout.ms = 60000            
> enable.auto.commit = false            
> exclude.internal.topics = true            
> fetch.max.bytes = 52428800            
> fetch.max.wait.ms = 500            
> fetch.min.bytes = 1            
> group.id = telemetry-streaming-service            
> heartbeat.interval.ms = 3000            
> interceptor.classes = []            
> internal.leave.group.on.close = true            
> isolation.level = read_uncommitted            
> key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
>  
> {code}
> Here is the log for other executors.
>     
> {code:java}
>  20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1    
> 20/01/14 12:15:04 INFO Utils: Successfully started service 
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324.    
> 20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1    
> 20/01/14 12:15:04 INFO BlockManager: Using 
> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
> policy    20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager 
> BlockManagerId(2, matrix-hwork-data-05, 40324, None)    
> 20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager 
> BlockManagerId(2, matrix-hwork-data-05, 40324, None)    
> 20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447    
> 20/01/14 12:15:04 INFO BlockManager: Registering executor with local external 
> shuffle service.    
> 20/01/14 12:15:04 INFO TransportClientFactory: Successfully created 
> connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in 
> bootstraps)    
> 20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: 
> BlockManagerId(2, matrix-hwork-data-05, 40324, None)    
> 20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1    
> 20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)    
> 20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0 
>    
> 20/01/14 12:15:19 INFO TransportClientFactory: Successfully created 
> connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent 
> in bootstraps)    
> 20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 8.1 KB, free 6.2 GB)    
> 20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took 
> 163 ms    20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as 
> values in memory (estimated size 17.9 KB, free 6.2 GB)    
> 20/01/14 12:15:20 INFO KafkaRDD: Computing topic telemetry, partition 1 
> offsets 237352170 -> 237352311    20/01/14 12:15:20 INFO CachedKafkaConsumer: 
> Initializing cache 16 64 0.75    20/01/14 12:15:20 INFO CachedKafkaConsumer: 
> Cache miss for 
> CacheKey(spark-executor-telemetry-streaming-service,telemetry,1)    
> 20/01/14 12:15:20 INFO ConsumerConfig: ConsumerConfig values:             
> auto.commit.interval.ms = 5000            
> auto.offset.reset = none            
> bootstrap.servers = [1,2,3]            
> check.crcs = true            
> client.id = client-0            
> connections.max.idle.ms = 540000            
> default.api.timeout.ms = 60000            
> enable.auto.commit = false            
> exclude.internal.topics = true            
> fetch.max.bytes = 52428800            
> fetch.max.wait.ms = 500
> {code}
>  
> If we closely observer in the first executor the **auto.offset.reset is 
> latest** but for the other executors the **auto.offset.reset = none**
>  
> Here is how I am creating the streaming context
>   
>  
> {code:java}
> public void init() throws Exception {
>     // Load all Spark and Kafka settings from the externalized properties file.
>     final String BOOTSTRAP_SERVERS =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.broker.list");
>     final String DYNAMIC_ALLOCATION_ENABLED =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.enabled");
>     final String DYNAMIC_ALLOCATION_SCALING_INTERVAL =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.scalingInterval");
>     final String DYNAMIC_ALLOCATION_MIN_EXECUTORS =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.minExecutors");
>     final String DYNAMIC_ALLOCATION_MAX_EXECUTORS =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.maxExecutors");
>     final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.executorIdleTimeout");
>     final String DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout");
>     final String SPARK_SHUFFLE_SERVICE_ENABLED =
>         PropertyFileReader.getInstance().getProperty("spark.shuffle.service.enabled");
>     final String SPARK_LOCALITY_WAIT =
>         PropertyFileReader.getInstance().getProperty("spark.locality.wait");
>     final String SPARK_KAFKA_CONSUMER_POLL_INTERVAL =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.consumer.poll.ms");
>     final String SPARK_KAFKA_MAX_RATE_PER_PARTITION =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.maxRatePerPartition");
>     final String SPARK_BATCH_DURATION_IN_SECONDS =
>         PropertyFileReader.getInstance().getProperty("spark.batch.duration.in.seconds");
>     final String KAFKA_TOPIC =
>         PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.topic");
>
>     LOGGER.debug("connecting to brokers ::" + BOOTSTRAP_SERVERS);
>     LOGGER.debug("bootstrapping properties to create consumer");
>
>     // Kafka consumer parameters passed to the direct stream.
>     kafkaParams = new HashMap<>();
>     kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>     kafkaParams.put("key.deserializer", StringDeserializer.class);
>     kafkaParams.put("value.deserializer", StringDeserializer.class);
>     kafkaParams.put("group.id", "telemetry-streaming-service");
>     kafkaParams.put("auto.offset.reset", "latest");
>     kafkaParams.put("enable.auto.commit", false);
>     kafkaParams.put("client.id", "client-0");
>     // Below property should be enabled in properties and changed based on
>     // performance testing
>     kafkaParams.put("max.poll.records",
>         PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.max.poll.records"));
>
>     LOGGER.info("registering as a consumer with the topic :: " + KAFKA_TOPIC);
>     topics = Arrays.asList(KAFKA_TOPIC);
>
>     sparkConf = new SparkConf()
>         .setMaster(PropertyFileReader.getInstance().getProperty("spark.master.url"))
>         .setAppName(PropertyFileReader.getInstance().getProperty("spark.application.name"))
>         .set("spark.streaming.dynamicAllocation.enabled", DYNAMIC_ALLOCATION_ENABLED)
>         .set("spark.streaming.dynamicAllocation.scalingInterval", DYNAMIC_ALLOCATION_SCALING_INTERVAL)
>         .set("spark.streaming.dynamicAllocation.minExecutors", DYNAMIC_ALLOCATION_MIN_EXECUTORS)
>         .set("spark.streaming.dynamicAllocation.maxExecutors", DYNAMIC_ALLOCATION_MAX_EXECUTORS)
>         .set("spark.streaming.dynamicAllocation.executorIdleTimeout", DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
>         .set("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout", DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
>         .set("spark.shuffle.service.enabled", SPARK_SHUFFLE_SERVICE_ENABLED)
>         .set("spark.locality.wait", SPARK_LOCALITY_WAIT)
>         .set("spark.streaming.kafka.consumer.poll.ms", SPARK_KAFKA_CONSUMER_POLL_INTERVAL)
>         .set("spark.streaming.kafka.maxRatePerPartition", SPARK_KAFKA_MAX_RATE_PER_PARTITION);
>
>     LOGGER.debug("creating streaming context with minutes batch interval ::: "
>         + SPARK_BATCH_DURATION_IN_SECONDS);
>     streamingContext = new JavaStreamingContext(sparkConf,
>         Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS)));
>     LOGGER.info("checkpointing the streaming transactions at hdfs path :: /checkpoint");
>     streamingContext.checkpoint("/checkpoint");
>     streamingContext.addStreamingListener(new DataProcessingListener());
> }
>
> @Override
> public void execute() throws InterruptedException {
>     JavaInputDStream<ConsumerRecord<String, String>> telemetryStream =
>         KafkaUtils.createDirectStream(streamingContext,
>             LocationStrategies.PreferConsistent(),
>             ConsumerStrategies.Subscribe(topics, kafkaParams));
>
>     telemetryStream.foreachRDD(rawRDD -> {
>         if (!rawRDD.isEmpty()) {
>             OffsetRange[] offsetRanges = ((HasOffsetRanges) rawRDD.rdd()).offsetRanges();
>             SparkSession spark =
>                 JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());
>
>             // Flatten each JSON record and key it by its sensor path.
>             JavaPairRDD<String, String> flattenedRawRDD = rawRDD.mapToPair(record -> {
>                 ObjectMapper om = new ObjectMapper();
>                 JsonNode root = om.readTree(record.value());
>                 Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
>                 JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);
>
>                 return new Tuple2<String, String>(
>                     flattenedRootNode.get("/name").asText(), flattenedRootNode.toString());
>             });
>
>             Dataset<Row> rawFlattenedDataRDD = spark
>                 .createDataset(flattenedRawRDD.rdd(),
>                     Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
>                 .toDF("sensor_path", "sensor_data");
>             Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col("sensor_path"))
>                 .agg(collect_list(col("sensor_data").as("sensor_data")));
>             Dataset<Row> lldpGroupedDS = groupedDS.filter((FilterFunction<Row>) r ->
>                 r.getString(0).equals("Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device"));
>             lldpGroupedDS.show(2);
>
>             HashMap<Object, Object> params = new HashMap<>();
>             params.put(DPConstants.OTSDB_CONFIG_F_PATH,
>                 ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.config.file.path"));
>             params.put(DPConstants.OTSDB_CLIENT_TYPE,
>                 ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.client.type"));
>             try {
>                 Pipeline lldpPipeline =
>                     PipelineFactory.getPipeline(PipelineType.LLDPTELEMETRY);
>                 lldpPipeline.process(lldpGroupedDS, null);
>                 Pipeline pipeline = PipelineFactory.getPipeline(PipelineType.TELEMETRY);
>                 pipeline.process(groupedDS, params);
>             } catch (Exception e) {
>                 // The catch clause was not included in the report; logging here is an assumption.
>                 LOGGER.error("pipeline processing failed", e);
>             }
>         }
>     });
>
>     streamingContext.start();
>     streamingContext.awaitTermination();
> }
> {code}
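
For context on the difference described above: as far as I know, the spark-streaming-kafka-0-10 integration rewrites a few consumer settings before it creates consumers on executors (the `fixKafkaParams` step in `KafkaUtils`). It forces `enable.auto.commit` to false, sets `auto.offset.reset` to `none`, and prefixes `group.id` with `spark-executor-`. If that is what is happening here, the consumer that logs `latest` would be the driver-side one rather than a first executor. The following is a minimal plain-Java sketch of that rewrite, not Spark's actual code; the class name `ExecutorKafkaParamsSketch` and the method `forExecutor` are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the executor-side parameter rewrite; not Spark's actual class. */
public class ExecutorKafkaParamsSketch {

    /** Returns a copy of the driver parameters with the executor-side overrides applied. */
    public static Map<String, Object> forExecutor(Map<String, Object> driverParams) {
        Map<String, Object> params = new HashMap<>(driverParams);
        // Executors read offset ranges chosen by the driver, so they never commit offsets.
        params.put("enable.auto.commit", false);
        // An out-of-range offset should fail loudly instead of silently resetting.
        params.put("auto.offset.reset", "none");
        // Executors use a consumer group separate from the driver's.
        params.put("group.id", "spark-executor-" + driverParams.get("group.id"));
        return params;
    }

    public static void main(String[] args) {
        // Same values as the kafkaParams in the report.
        Map<String, Object> driverParams = new HashMap<>();
        driverParams.put("group.id", "telemetry-streaming-service");
        driverParams.put("auto.offset.reset", "latest");
        driverParams.put("enable.auto.commit", false);

        Map<String, Object> executorParams = forExecutor(driverParams);
        System.out.println("driver auto.offset.reset:   " + driverParams.get("auto.offset.reset"));
        System.out.println("executor auto.offset.reset: " + executorParams.get("auto.offset.reset"));
        System.out.println("executor group.id:          " + executorParams.get("group.id"));
    }
}
```

The sketch copies the map so the driver's own settings stay untouched, which matches the logs in the report: one consumer keeps `latest` while the others show `none`.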



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
