Hi,

I'm trying to consume one of our Kafka topics with Spark Streaming, but I
keep getting this error:

java.lang.AssertionError: assertion failed: Failed to get record after
polling for 512 ms.

I tried setting params such as max.poll.interval.ms and
spark.streaming.kafka.consumer.poll.ms to 10000 ms in kafkaParams,
but I still get "failed to get records after 512 ms". Adding these params
doesn't seem to change the polling time at all.
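
For context, a minimal sketch of how these settings are usually split, assuming the spark-streaming-kafka-0-10 integration: as far as I understand, spark.streaming.kafka.consumer.poll.ms is a Spark configuration key read from SparkConf, not a Kafka consumer property, so putting it in kafkaParams may have no effect on the poll timeout. The class name PollTimeoutConfig, the helper method names, and the host and group values below are hypothetical and only illustrate the separation; the sketch uses plain maps so it runs without Spark on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps Spark-level settings apart from Kafka consumer properties.
class PollTimeoutConfig {

    // Spark-level settings. In a real job each entry would be applied with
    // sparkConf.set(key, value) before the JavaStreamingContext is created.
    static Map<String, String> sparkSettings(long pollMs) {
        Map<String, String> settings = new HashMap<>();
        settings.put("spark.streaming.kafka.consumer.poll.ms", Long.toString(pollMs));
        return settings;
    }

    // Kafka consumer properties. These are what goes into kafkaParams and is
    // handed to ConsumerStrategies.Subscribe(topics, kafkaParams).
    static Map<String, Object> kafkaParams(String hosts, String groupId) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", hosts);
        params.put("group.id", groupId);
        // Property name used by the new Kafka consumer for the auto-commit switch.
        params.put("enable.auto.commit", false);
        // Deserializers given as class names so this sketch has no Kafka dependency.
        params.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer",
                "org.apache.kafka.common.serialization.BytesDeserializer");
        params.put("auto.offset.reset", "earliest");
        return params;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the real setup.
        System.out.println(sparkSettings(10000L));
        System.out.println(kafkaParams("localhost:9092", "example-group"));
    }
}
```

In the actual application the first map would translate to something like sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "10000") on the SparkConf shown further down, while kafkaParams stays limited to consumer properties.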

Without Spark Streaming, I'm able to fetch the records. I only get this
error when going through the spark-streaming integration.

Any help is greatly appreciated. Below is the code I'm using.

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

// hosts, groupid and topics are defined elsewhere
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", hosts);
kafkaParams.put("group.id", groupid);
// the new Kafka consumer names this property enable.auto.commit
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", BytesDeserializer.class);
kafkaParams.put("auto.offset.reset", "earliest");
// settings I tried, none of which changed the 512 ms poll time:
//kafkaParams.put("max.poll.interval.ms", 12000);
//kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
//kafkaParams.put("request.timeout.ms", 12000);
                
                
JavaInputDStream<ConsumerRecord<String, List<Bytes>>> messages =
        KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));
messages.foreachRDD(rdd -> {
    List<ConsumerRecord<String, List<Bytes>>> input = rdd.collect();
    System.out.println("count is " + input.size());
});
ssc.start();
ssc.awaitTermination();

Thanks
Jagadish


