[ https://issues.apache.org/jira/browse/SPARK-30522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
phanikumar updated SPARK-30522:
-------------------------------
    Description: 
I have written a Spark Streaming consumer to consume data from Kafka, and I found some weird behavior in my logs. The Kafka topic has 3 partitions, and for each partition an executor is launched by the Spark Streaming job. The first executor ID always takes the parameters I provided while creating the streaming context, but the executors with IDs 2 and 3 always override the Kafka parameters.
{code:java}
20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 2 write ahead log files from hdfs://tlabnamenode/checkpoint/receivedBlockMetadata
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f
20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated
20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = null
20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms
20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac
20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [1,2,3]
	check.crcs = true
	client.id = client-0
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = telemetry-streaming-service
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
{code}
Here is the log for the other executors.
{code:java}
20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1
20/01/14 12:15:04 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324.
20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1
20/01/14 12:15:04 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447
20/01/14 12:15:04 INFO BlockManager: Registering executor with local external shuffle service.
20/01/14 12:15:04 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in bootstraps)
20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: BlockManagerId(2, matrix-hwork-data-05, 40324, None)
20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1
20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0
20/01/14 12:15:19 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent in bootstraps)
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 8.1 KB, free 6.2 GB)
20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took 163 ms
20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 17.9 KB, free 6.2 GB)
20/01/14 12:15:20 INFO KafkaRDD: Computing topic telemetry, partition 1 offsets 237352170 -> 237352311
20/01/14 12:15:20 INFO CachedKafkaConsumer: Initializing cache 16 64 0.75
20/01/14 12:15:20 INFO CachedKafkaConsumer: Cache miss for CacheKey(spark-executor-telemetry-streaming-service,telemetry,1)
20/01/14 12:15:20 INFO ConsumerConfig: ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = none
	bootstrap.servers = [1,2,3]
	check.crcs = true
	client.id = client-0
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
{code}
If we look closely, in the first executor **auto.offset.reset = latest**, but in the other executors **auto.offset.reset = none**.

Here is how I am creating the streaming context:
{code:java}
public void init() throws Exception {
    final String BOOTSTRAP_SERVERS = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.broker.list");
    final String DYNAMIC_ALLOCATION_ENABLED = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.enabled");
    final String DYNAMIC_ALLOCATION_SCALING_INTERVAL = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.scalingInterval");
    final String DYNAMIC_ALLOCATION_MIN_EXECUTORS = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.minExecutors");
    final String DYNAMIC_ALLOCATION_MAX_EXECUTORS = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.maxExecutors");
    final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.executorIdleTimeout");
    final String DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout");
    final String SPARK_SHUFFLE_SERVICE_ENABLED = PropertyFileReader.getInstance().getProperty("spark.shuffle.service.enabled");
    final String SPARK_LOCALITY_WAIT = PropertyFileReader.getInstance().getProperty("spark.locality.wait");
    final String SPARK_KAFKA_CONSUMER_POLL_INTERVAL = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.consumer.poll.ms");
    final String SPARK_KAFKA_MAX_RATE_PER_PARTITION = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.maxRatePerPartition");
    final String SPARK_BATCH_DURATION_IN_SECONDS = PropertyFileReader.getInstance().getProperty("spark.batch.duration.in.seconds");
    final String KAFKA_TOPIC = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.topic");

    // Kafka consumer parameters passed to ConsumerStrategies.Subscribe
    kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "telemetry-streaming-service");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("client.id", "client-0");
    kafkaParams.put("max.poll.records", PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.max.poll.records"));

    topics = Arrays.asList(KAFKA_TOPIC);

    sparkConf = new SparkConf()
            // .setMaster(PropertyFileReader.getInstance().getProperty("spark.master.url"))
            .setAppName(PropertyFileReader.getInstance().getProperty("spark.application.name"))
            .set("spark.streaming.dynamicAllocation.enabled", DYNAMIC_ALLOCATION_ENABLED)
            .set("spark.streaming.dynamicAllocation.scalingInterval", DYNAMIC_ALLOCATION_SCALING_INTERVAL)
            .set("spark.streaming.dynamicAllocation.minExecutors", DYNAMIC_ALLOCATION_MIN_EXECUTORS)
            .set("spark.streaming.dynamicAllocation.maxExecutors", DYNAMIC_ALLOCATION_MAX_EXECUTORS)
            .set("spark.streaming.dynamicAllocation.executorIdleTimeout", DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
            .set("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout", DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
            .set("spark.shuffle.service.enabled", SPARK_SHUFFLE_SERVICE_ENABLED)
            .set("spark.locality.wait", SPARK_LOCALITY_WAIT)
            .set("spark.streaming.kafka.consumer.poll.ms", SPARK_KAFKA_CONSUMER_POLL_INTERVAL)
            .set("spark.streaming.kafka.maxRatePerPartition", SPARK_KAFKA_MAX_RATE_PER_PARTITION);

    streamingContext = new JavaStreamingContext(sparkConf,
            Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS)));
    streamingContext.checkpoint("/checkpoint");
    streamingContext.addStreamingListener(new DataProcessingListener());
}

@Override
public void execute() throws InterruptedException {
    // Direct stream: the same kafkaParams are handed to the Kafka integration
    JavaInputDStream<ConsumerRecord<String, String>> telemetryStream = KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topics, kafkaParams));

    telemetryStream.foreachRDD(rawRDD -> {
        if (!rawRDD.isEmpty()) {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rawRDD.rdd()).offsetRanges();
            LOGGER.debug("list of OffsetRanges getting processed as a string :: "
                    + Arrays.asList(offsetRanges).toString());
            System.out.println("offsetRanges : " + offsetRanges.length);

            SparkSession spark = JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());

            // Flatten each JSON record into (sensor path, flattened JSON) pairs
            JavaPairRDD<String, String> flattenedRawRDD = rawRDD.mapToPair(record -> {
                ObjectMapper om = new ObjectMapper();
                JsonNode root = om.readTree(record.value());
                Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
                JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);
                return new Tuple2<String, String>(flattenedRootNode.get("/name").asText(),
                        flattenedRootNode.toString());
            });

            Dataset<Row> rawFlattenedDataRDD = spark
                    .createDataset(flattenedRawRDD.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                    .toDF("sensor_path", "sensor_data");
            Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col("sensor_path"))
                    .agg(collect_list(col("sensor_data").as("sensor_data")));

            Dataset<Row> lldpGroupedDS = groupedDS.filter((FilterFunction<Row>) r ->
                    r.getString(0).equals("Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device"));
            lldpGroupedDS.show(2);

            HashMap<Object, Object> params = new HashMap<>();
            params.put(DPConstants.OTSDB_CONFIG_F_PATH,
                    ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.config.file.path"));
            params.put(DPConstants.OTSDB_CLIENT_TYPE,
ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.client.type")); try {Pipeline lldpPipeline = PipelineFactory.getPipeline(PipelineType.LLDPTELEMETRY);lldpPipeline.process(lldpGroupedDS, null); Pipeline pipeline = PipelineFactory.getPipeline(PipelineType.TELEMETRY);pipeline.process(groupedDS, params); }catch (Throwable t){t.printStackTrace();}((CanCommitOffsets) telemetryStream.inputDStream()).commitAsync(offsetRanges); }}); streamingContext.start();streamingContext.awaitTermination();{code} was: I have written a spark streaming consumer to consume the data from Kafka. I found a weird behavior in my logs. The Kafka topic has 3 partitions and for each partition, an executor is launched by Spark Streaming job.I have written a spark streaming consumer to consume the data from Kafka. I found a weird behavior in my logs. The Kafka topic has 3 partitions and for each partition, an executor is launched by Spark Streaming job. The first executor id always takes the parameters I have provided while creating the streaming context but the executor with ID 2 and 3 always override the kafka parameters. {code:java} 20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sour ces like Flume. See the programming guide for details on how to enable the Write Ahead Log. 
20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 2 write ahead log files from hdfs://tlabnamenode/checkpoint/receivedBlockMetadata 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint interval = null 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f 20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms 20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x Replicated 20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = null 20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms 20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac 20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [1,2,3] check.crcs = true client.id = client-0 connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = telemetry-streaming-service heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer {code} Here is the log for other executors. {code:java} 20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1 20/01/14 12:15:04 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324. 
20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1 20/01/14 12:15:04 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None) 20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, matrix-hwork-data-05, 40324, None) 20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447 20/01/14 12:15:04 INFO BlockManager: Registering executor with local external shuffle service. 20/01/14 12:15:04 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in bootstraps) 20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: BlockManagerId(2, matrix-hwork-data-05, 40324, None) 20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1 20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0 20/01/14 12:15:19 INFO TransportClientFactory: Successfully created connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent in bootstraps) 20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 8.1 KB, free 6.2 GB) 20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took 163 ms 20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 17.9 KB, free 6.2 GB) 20/01/14 12:15:20 INFO KafkaRDD: Computing topic telemetry, partition 1 offsets 237352170 -> 237352311 20/01/14 12:15:20 INFO CachedKafkaConsumer: Initializing cache 16 64 0.75 20/01/14 12:15:20 INFO CachedKafkaConsumer: Cache miss for CacheKey(spark-executor-telemetry-streaming-service,telemetry,1) 20/01/14 12:15:20 INFO ConsumerConfig: ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = 
none bootstrap.servers = [1,2,3] check.crcs = true client.id = client-0 connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 If we closely observer in the first executor the **auto.offset.reset is latest** but for the other executors the **auto.offset.reset = none**{code} Here is how I am creating the streaming context {code:java} public void init() throws Exception {public void init() throws Exception { final String BOOTSTRAP_SERVERS = PropertyFileReader.getInstance() .getProperty("spark.streaming.kafka.broker.list"); final String DYNAMIC_ALLOCATION_ENABLED = PropertyFileReader.getInstance() .getProperty("spark.streaming.dynamicAllocation.enabled"); final String DYNAMIC_ALLOCATION_SCALING_INTERVAL = PropertyFileReader.getInstance() .getProperty("spark.streaming.dynamicAllocation.scalingInterval"); final String DYNAMIC_ALLOCATION_MIN_EXECUTORS = PropertyFileReader.getInstance() .getProperty("spark.streaming.dynamicAllocation.minExecutors"); final String DYNAMIC_ALLOCATION_MAX_EXECUTORS = PropertyFileReader.getInstance() .getProperty("spark.streaming.dynamicAllocation.maxExecutors"); final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance() .getProperty("spark.streaming.dynamicAllocation.executorIdleTimeout"); final String DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance() .getProperty("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout"); final String SPARK_SHUFFLE_SERVICE_ENABLED = PropertyFileReader.getInstance() .getProperty("spark.shuffle.service.enabled"); final String SPARK_LOCALITY_WAIT = PropertyFileReader.getInstance().getProperty("spark.locality.wait"); final String SPARK_KAFKA_CONSUMER_POLL_INTERVAL = PropertyFileReader.getInstance() .getProperty("spark.streaming.kafka.consumer.poll.ms"); final String SPARK_KAFKA_MAX_RATE_PER_PARTITION = 
PropertyFileReader.getInstance() .getProperty("spark.streaming.kafka.maxRatePerPartition"); final String SPARK_BATCH_DURATION_IN_SECONDS = PropertyFileReader.getInstance() .getProperty("spark.batch.duration.in.seconds"); final String KAFKA_TOPIC = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.topic"); LOGGER.debug("connecting to brokers ::" + BOOTSTRAP_SERVERS); LOGGER.debug("bootstrapping properties to create consumer"); kafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "telemetry-streaming-service"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); kafkaParams.put("client.id","client-0"); // Below property should be enabled in properties and changed based on // performance testing kafkaParams.put("max.poll.records", PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.max.poll.records")); LOGGER.info("registering as a consumer with the topic :: " + KAFKA_TOPIC); topics = Arrays.asList(KAFKA_TOPIC); sparkConf = new SparkConf() // .setMaster(PropertyFileReader.getInstance().getProperty("spark.master.url")) .setAppName(PropertyFileReader.getInstance().getProperty("spark.application.name")) .set("spark.streaming.dynamicAllocation.enabled", DYNAMIC_ALLOCATION_ENABLED) .set("spark.streaming.dynamicAllocation.scalingInterval", DYNAMIC_ALLOCATION_SCALING_INTERVAL) .set("spark.streaming.dynamicAllocation.minExecutors", DYNAMIC_ALLOCATION_MIN_EXECUTORS) .set("spark.streaming.dynamicAllocation.maxExecutors", DYNAMIC_ALLOCATION_MAX_EXECUTORS) .set("spark.streaming.dynamicAllocation.executorIdleTimeout", DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) .set("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout", DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT) .set("spark.shuffle.service.enabled", 
SPARK_SHUFFLE_SERVICE_ENABLED) .set("spark.locality.wait", SPARK_LOCALITY_WAIT) .set("spark.streaming.kafka.consumer.poll.ms", SPARK_KAFKA_CONSUMER_POLL_INTERVAL) .set("spark.streaming.kafka.maxRatePerPartition", SPARK_KAFKA_MAX_RATE_PER_PARTITION); LOGGER.debug("creating streaming context with minutes batch interval ::: " + SPARK_BATCH_DURATION_IN_SECONDS); streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS))); /* * todo: add checkpointing to the streaming context to recover from driver * failures and also for offset management */ LOGGER.info("checkpointing the streaming transactions at hdfs path :: /checkpoint"); streamingContext.checkpoint("/checkpoint"); streamingContext.addStreamingListener(new DataProcessingListener()); } @Override public void execute() throws InterruptedException { LOGGER.info("started telemetry pipeline executor to consume data"); // Data Consume from the Kafka topic JavaInputDStream<ConsumerRecord<String, String>> telemetryStream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams)); telemetryStream.foreachRDD(rawRDD -> { if (!rawRDD.isEmpty()) { OffsetRange[] offsetRanges = ((HasOffsetRanges) rawRDD.rdd()).offsetRanges(); LOGGER.debug("list of OffsetRanges getting processed as a string :: " + Arrays.asList(offsetRanges).toString()); System.out.println("offsetRanges : " + offsetRanges.length); SparkSession spark = JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf()); JavaPairRDD<String, String> flattenedRawRDD = rawRDD.mapToPair(record -> { //LOGGER.debug("flattening JSON record with telemetry json value ::: " + record.value()); ObjectMapper om = new ObjectMapper(); JsonNode root = om.readTree(record.value()); Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten(); JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class); 
//LOGGER.debug("creating Tuple for the JSON record Key :: " + flattenedRootNode.get("/name").asText() // + ", value :: " + flattenedRootNode.toString()); return new Tuple2<String, String>(flattenedRootNode.get("/name").asText(), flattenedRootNode.toString()); }); Dataset<Row> rawFlattenedDataRDD = spark .createDataset(flattenedRawRDD.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) .toDF("sensor_path", "sensor_data"); Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col("sensor_path")) .agg(collect_list(col("sensor_data").as("sensor_data"))); Dataset<Row> lldpGroupedDS = groupedDS.filter((FilterFunction<Row>) r -> r.getString(0).equals("Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device")); LOGGER.info("printing the LLDP GROUPED DS ------------------>"); lldpGroupedDS.show(2); LOGGER.info("creating telemetry pipeline to process the telemetry data"); HashMap<Object, Object> params = new HashMap<>(); params.put(DPConstants.OTSDB_CONFIG_F_PATH, ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.config.file.path")); params.put(DPConstants.OTSDB_CLIENT_TYPE, ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.client.type")); try { LOGGER.info("<-------------------processing lldp data and write to hive STARTED ----------------->"); Pipeline lldpPipeline = PipelineFactory.getPipeline(PipelineType.LLDPTELEMETRY); lldpPipeline.process(lldpGroupedDS, null); LOGGER.info("<-------------------processing lldp data and write to hive COMPLETED ----------------->"); LOGGER.info("<-------------------processing groupedDS data and write to OPENTSDB STARTED ----------------->"); Pipeline pipeline = PipelineFactory.getPipeline(PipelineType.TELEMETRY); pipeline.process(groupedDS, params); LOGGER.info("<-------------------processing groupedDS data and write to OPENTSDB COMPLETED ----------------->"); }catch (Throwable t){ t.printStackTrace(); } LOGGER.info("commiting offsets after processing the batch"); ((CanCommitOffsets) 
telemetryStream.inputDStream()).commitAsync(offsetRanges); } }); streamingContext.start(); streamingContext.awaitTermination(); }{code} > Spark Streaming dynamic executors override or take default kafka parameters > in cluster mode > ------------------------------------------------------------------------------------------- > > Key: SPARK-30522 > URL: https://issues.apache.org/jira/browse/SPARK-30522 > Project: Spark > Issue Type: Bug > Components: Java API > Affects Versions: 2.3.2 > Reporter: phanikumar > Priority: Major > > I have written a spark streaming consumer to consume the data from Kafka. I > found a weird behavior in my logs. The Kafka topic has 3 partitions and for > each partition, an executor is launched by Spark Streaming job.I have written > a spark streaming consumer to consume the data from Kafka. I found a weird > behavior in my logs. The Kafka topic has 3 partitions and for each partition, > an executor is launched by Spark Streaming job. > The first executor id always takes the parameters I have provided while > creating the streaming context but the executor with ID 2 and 3 always > override the kafka parameters. > > {code:java} > 20/01/14 12:15:05 WARN StreamingContext: Dynamic Allocation is enabled for > this application. Enabling Dynamic allocation for Spark Streaming > applications can cause data loss if Write Ahead Log is not enabled for > non-replayable sour ces like Flume. See the programming guide for details > on how to enable the Write Ahead Log. 
> 20/01/14 12:15:05 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered > 2 write ahead log files from > hdfs://tlabnamenode/checkpoint/receivedBlockMetadata > 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Slide time = 5000 ms > 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Storage level = Serialized 1x > Replicated 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Checkpoint > interval = null > 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Remember interval = 5000 ms > > 20/01/14 12:15:05 INFO DirectKafkaInputDStream: Initialized and validated > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@12665f3f > 20/01/14 12:15:05 INFO ForEachDStream: Slide time = 5000 ms > 20/01/14 12:15:05 INFO ForEachDStream: Storage level = Serialized 1x > Replicated 20/01/14 12:15:05 INFO ForEachDStream: Checkpoint interval = > null > 20/01/14 12:15:05 INFO ForEachDStream: Remember interval = 5000 ms > 20/01/14 12:15:05 INFO ForEachDStream: Initialized and validated > org.apache.spark.streaming.dstream.ForEachDStream@a4d83ac > 20/01/14 12:15:05 INFO ConsumerConfig: ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = latest > bootstrap.servers = [1,2,3] > check.crcs = true > client.id = client-0 > connections.max.idle.ms = 540000 > default.api.timeout.ms = 60000 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = telemetry-streaming-service > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.StringDeserializer > > {code} > Here is the log for other executors. > > {code:java} > 20/01/14 12:15:04 INFO Executor: Starting executor ID 2 on host 1 > 20/01/14 12:15:04 INFO Utils: Successfully started service > 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40324. 
> 20/01/14 12:15:04 INFO NettyBlockTransferService: Server created on 1 > 20/01/14 12:15:04 INFO BlockManager: Using > org.apache.spark.storage.RandomBlockReplicationPolicy for block replication > policy 20/01/14 12:15:04 INFO BlockManagerMaster: Registering BlockManager > BlockManagerId(2, matrix-hwork-data-05, 40324, None) > 20/01/14 12:15:04 INFO BlockManagerMaster: Registered BlockManager > BlockManagerId(2, matrix-hwork-data-05, 40324, None) > 20/01/14 12:15:04 INFO BlockManager: external shuffle service port = 7447 > 20/01/14 12:15:04 INFO BlockManager: Registering executor with local external > shuffle service. > 20/01/14 12:15:04 INFO TransportClientFactory: Successfully created > connection to matrix-hwork-data-05/10.83.34.25:7447 after 1 ms (0 ms spent in > bootstraps) > 20/01/14 12:15:04 INFO BlockManager: Initialized BlockManager: > BlockManagerId(2, matrix-hwork-data-05, 40324, None) > 20/01/14 12:15:19 INFO CoarseGrainedExecutorBackend: Got assigned task 1 > 20/01/14 12:15:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) > 20/01/14 12:15:19 INFO TorrentBroadcast: Started reading broadcast variable 0 > > 20/01/14 12:15:19 INFO TransportClientFactory: Successfully created > connection to matrix-hwork-data-05/10.83.34.25:38759 after 2 ms (0 ms spent > in bootstraps) > 20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes > in memory (estimated size 8.1 KB, free 6.2 GB) > 20/01/14 12:15:20 INFO TorrentBroadcast: Reading broadcast variable 0 took > 163 ms 20/01/14 12:15:20 INFO MemoryStore: Block broadcast_0 stored as > values in memory (estimated size 17.9 KB, free 6.2 GB) > 20/01/14 12:15:20 INFO KafkaRDD: Computing topic telemetry, partition 1 > offsets 237352170 -> 237352311 20/01/14 12:15:20 INFO CachedKafkaConsumer: > Initializing cache 16 64 0.75 20/01/14 12:15:20 INFO CachedKafkaConsumer: > Cache miss for > CacheKey(spark-executor-telemetry-streaming-service,telemetry,1) > 20/01/14 12:15:20 INFO ConsumerConfig: 
ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = none
> bootstrap.servers = [1,2,3]
> check.crcs = true
> client.id = client-0
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> {code}
>
> If we look closely, the first executor has **auto.offset.reset = latest**, but the other executors have **auto.offset.reset = none**.
>
> Here is how I am creating the streaming context:
>
> {code:java}
> public void init() throws Exception {
>     final String BOOTSTRAP_SERVERS = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.broker.list");
>     final String DYNAMIC_ALLOCATION_ENABLED = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.enabled");
>     final String DYNAMIC_ALLOCATION_SCALING_INTERVAL = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.scalingInterval");
>     final String DYNAMIC_ALLOCATION_MIN_EXECUTORS = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.minExecutors");
>     final String DYNAMIC_ALLOCATION_MAX_EXECUTORS = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.maxExecutors");
>     final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.executorIdleTimeout");
>     final String DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT = PropertyFileReader.getInstance().getProperty("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout");
>     final String SPARK_SHUFFLE_SERVICE_ENABLED = PropertyFileReader.getInstance().getProperty("spark.shuffle.service.enabled");
>     final String SPARK_LOCALITY_WAIT = PropertyFileReader.getInstance().getProperty("spark.locality.wait");
>     final String SPARK_KAFKA_CONSUMER_POLL_INTERVAL = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.consumer.poll.ms");
>     final String SPARK_KAFKA_MAX_RATE_PER_PARTITION = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.maxRatePerPartition");
>     final String SPARK_BATCH_DURATION_IN_SECONDS = PropertyFileReader.getInstance().getProperty("spark.batch.duration.in.seconds");
>     final String KAFKA_TOPIC = PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.topic");
>
>     kafkaParams = new HashMap<>();
>     kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>     kafkaParams.put("key.deserializer", StringDeserializer.class);
>     kafkaParams.put("value.deserializer", StringDeserializer.class);
>     kafkaParams.put("group.id", "telemetry-streaming-service");
>     kafkaParams.put("auto.offset.reset", "latest");
>     kafkaParams.put("enable.auto.commit", false);
>     kafkaParams.put("client.id", "client-0");
>     kafkaParams.put("max.poll.records", PropertyFileReader.getInstance().getProperty("spark.streaming.kafka.max.poll.records"));
>
>     topics = Arrays.asList(KAFKA_TOPIC);
>
>     sparkConf = new SparkConf()
>         //.setMaster(PropertyFileReader.getInstance().getProperty("spark.master.url"))
>         .setAppName(PropertyFileReader.getInstance().getProperty("spark.application.name"))
>         .set("spark.streaming.dynamicAllocation.enabled", DYNAMIC_ALLOCATION_ENABLED)
>         .set("spark.streaming.dynamicAllocation.scalingInterval", DYNAMIC_ALLOCATION_SCALING_INTERVAL)
>         .set("spark.streaming.dynamicAllocation.minExecutors", DYNAMIC_ALLOCATION_MIN_EXECUTORS)
>         .set("spark.streaming.dynamicAllocation.maxExecutors", DYNAMIC_ALLOCATION_MAX_EXECUTORS)
>         .set("spark.streaming.dynamicAllocation.executorIdleTimeout", DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
>         .set("spark.streaming.dynamicAllocation.cachedExecutorIdleTimeout", DYNAMIC_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
>         .set("spark.shuffle.service.enabled", SPARK_SHUFFLE_SERVICE_ENABLED)
>         .set("spark.locality.wait", SPARK_LOCALITY_WAIT)
>         .set("spark.streaming.kafka.consumer.poll.ms", SPARK_KAFKA_CONSUMER_POLL_INTERVAL)
>         .set("spark.streaming.kafka.maxRatePerPartition", SPARK_KAFKA_MAX_RATE_PER_PARTITION);
>
>     streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(Integer.parseInt(SPARK_BATCH_DURATION_IN_SECONDS)));
>     streamingContext.checkpoint("/checkpoint");
>     streamingContext.addStreamingListener(new DataProcessingListener());
> }
>
> @Override
> public void execute() throws InterruptedException {
>     JavaInputDStream<ConsumerRecord<String, String>> telemetryStream = KafkaUtils.createDirectStream(
>         streamingContext,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.Subscribe(topics, kafkaParams));
>
>     telemetryStream.foreachRDD(rawRDD -> {
>         if (!rawRDD.isEmpty()) {
>             OffsetRange[] offsetRanges = ((HasOffsetRanges) rawRDD.rdd()).offsetRanges();
>             LOGGER.debug("list of OffsetRanges getting processed as a string :: " + Arrays.asList(offsetRanges).toString());
>             System.out.println("offsetRanges : " + offsetRanges.length);
>
>             SparkSession spark = JavaSparkSessionSingleton.getInstance(rawRDD.context().getConf());
>             JavaPairRDD<String, String> flattenedRawRDD = rawRDD.mapToPair(record -> {
>                 ObjectMapper om = new ObjectMapper();
>                 JsonNode root = om.readTree(record.value());
>                 Map<String, JsonNode> flattenedMap = new FlatJsonGenerator(root).flatten();
>                 JsonNode flattenedRootNode = om.convertValue(flattenedMap, JsonNode.class);
>                 return new Tuple2<String, String>(flattenedRootNode.get("/name").asText(), flattenedRootNode.toString());
>             });
>
>             Dataset<Row> rawFlattenedDataRDD = spark.createDataset(flattenedRawRDD.rdd(),
>                     Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
>                 .toDF("sensor_path", "sensor_data");
>             Dataset<Row> groupedDS = rawFlattenedDataRDD.groupBy(col("sensor_path"))
>                 .agg(collect_list(col("sensor_data").as("sensor_data")));
>
>             Dataset<Row> lldpGroupedDS = groupedDS.filter((FilterFunction<Row>) r ->
>                 r.getString(0).equals("Cisco-IOS-XR-ethernet-lldp-oper:lldp/nodes/node/neighbors/devices/device"));
>             lldpGroupedDS.show(2);
>
>             HashMap<Object, Object> params = new HashMap<>();
>             params.put(DPConstants.OTSDB_CONFIG_F_PATH, ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.config.file.path"));
>             params.put(DPConstants.OTSDB_CLIENT_TYPE, ExternalizedConfigsReader.getPropertyValueFromCache("/opentsdb.client.type"));
>
>             try {
>                 Pipeline lldpPipeline = PipelineFactory.getPipeline(PipelineType.LLDPTELEMETRY);
>                 lldpPipeline.process(lldpGroupedDS, null);
>
>                 Pipeline pipeline = PipelineFactory.getPipeline(PipelineType.TELEMETRY);
>                 pipeline.process(groupedDS, params);
>             } catch (Throwable t) {
>                 t.printStackTrace();
>             }
>             ((CanCommitOffsets) telemetryStream.inputDStream()).commitAsync(offsetRanges);
>         }
>     });
>
>     streamingContext.start();
>     streamingContext.awaitTermination();
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
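The comparison of `auto.offset.reset` between the first executor and the other executors was done by eye across two long log dumps. As a minimal sketch, the following JDK-only Java class parses the `key = value` lines of two pasted `ConsumerConfig values:` dumps and reports every key whose value differs. It is a hypothetical diagnostic helper, not part of Spark or Kafka; the class name `ConsumerConfigDiff` and its `parse`/`diff` methods are illustrative assumptions.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical diagnostic helper (not part of Spark or Kafka): compares two
// "ConsumerConfig values:" dumps copied from the logs, key by key.
public class ConsumerConfigDiff {

    // Parses lines of the form "key = value"; lines without " = " (for example
    // the "ConsumerConfig values:" header) are skipped.
    public static Map<String, String> parse(String dump) {
        Map<String, String> values = new TreeMap<>();
        for (String line : dump.split("\\R")) {
            int eq = line.indexOf(" = ");
            if (eq < 0) {
                continue;
            }
            String key = line.substring(0, eq).trim();
            String value = line.substring(eq + 3).trim();
            values.put(key, value);
        }
        return values;
    }

    // Returns "old -> new" for every key present in both dumps whose values differ.
    // Keys present in only one dump are ignored because the pasted logs are excerpts.
    public static Map<String, String> diff(Map<String, String> a, Map<String, String> b) {
        Map<String, String> changed = new TreeMap<>();
        for (Map.Entry<String, String> e : a.entrySet()) {
            String other = b.get(e.getKey());
            if (other != null && !other.equals(e.getValue())) {
                changed.put(e.getKey(), e.getValue() + " -> " + other);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Short excerpts of the two dumps quoted in this issue.
        String first = String.join("\n",
                "auto.commit.interval.ms = 5000",
                "auto.offset.reset = latest",
                "client.id = client-0",
                "enable.auto.commit = false",
                "group.id = telemetry-streaming-service");
        String other = String.join("\n",
                "auto.commit.interval.ms = 5000",
                "auto.offset.reset = none",
                "client.id = client-0",
                "enable.auto.commit = false");
        diff(parse(first), parse(other))
                .forEach((key, change) -> System.out.println(key + ": " + change));
    }
}
```

Running `main` on these excerpts prints `auto.offset.reset: latest -> none`. Keys that appear in only one dump (such as `group.id`, which is not in the executor excerpt) are skipped on purpose, since the pasted logs are truncated.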