Hi, I'm trying to consume one of our Kafka topics with Spark Streaming, but I keep getting this error: java.lang.AssertionError: assertion failed: Failed to get records after polling for 512 ms.
I tried setting various params in kafkaParams, e.g. max.poll.interval.ms and spark.streaming.kafka.consumer.poll.ms to 10000 ms, but I still get "Failed to get records after polling for 512 ms"; adding those params doesn't seem to change the polling timeout at all. Without Spark Streaming I'm able to fetch the records; it's only with Spark Streaming that I get this error. Any help is greatly appreciated. Below is the code I'm using:

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

kafkaParams.put("bootstrap.servers", hosts);
kafkaParams.put("group.id", groupid);
kafkaParams.put("auto.commit.enable", false);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", BytesDeserializer.class);
kafkaParams.put("auto.offset.reset", "earliest");
//kafkaParams.put("max.poll.interval.ms", 12000);
//kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
//kafkaParams.put("request.timeout.ms", 12000);

JavaInputDStream<ConsumerRecord<String, List<Bytes>>> messages =
        KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

messages.foreachRDD(rdd -> {
    List<ConsumerRecord<String, List<Bytes>>> input = rdd.collect();
    System.out.println("count is " + input.size());
});

ssc.start();
ssc.awaitTermination();

Thanks,
Jagadish
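P.S. In case it helps, here is a minimal sketch of the variant I was going to try next: setting the poll timeout on SparkConf instead of in kafkaParams, on the assumption that spark.streaming.kafka.consumer.poll.ms is a Spark configuration property rather than a Kafka consumer property (I haven't confirmed that this is where it belongs):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Assumption: spark.streaming.kafka.consumer.poll.ms is read from the Spark
// configuration, so it would need to be set on SparkConf rather than passed
// in kafkaParams. The "10000" is the same 10000 ms timeout I tried above.
SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]")
        .set("spark.streaming.kafka.consumer.poll.ms", "10000");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));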