Hey everyone, our Spark Streaming consumer (kafka010) throws a lot of errors. The Spark program does not exit, but after running for some time many executors are lost without any error being reported.
spark version: 2.4.3 kafka version: 0.10 org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:447) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:254) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:137) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:307) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189) at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:139) at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:135) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.InterruptedException at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:139) at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:135) val vsRDD = v.mapPartitions(messageIt => { val signalBuffer = new ArrayBuffer[String]() while(messageIt.hasNext){ try { val record = messageIt.next() val message = record.value() val topic = record.topic() val uncompressed = FileUtils.uncompressGz(message) val signal = new String(uncompressed, "UTF-8") val rq = JSON.parseObject(signal) val vehType=rq.getString("VehicleModel") //TODO 只解析as28lite if(vehType!=null){ if(vehType.equals("100021")){ val oem = "saic" val ret = VsParserForSaic.process(oem,rq) signalBuffer +=ret } } } catch { case inte: InterruptedException => log.error("Publisher thread interrupted. Exception: {}.", record) case ex: Exception => ex.printStackTrace() log.info(ex.getMessage) } } signalBuffer.iterator }) Any solution for this issue? Many thanks. YunKillEroe