spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not a Kafka parameter, so it has to be set on the SparkConf. Putting it in kafkaParams does not change the poll timeout.
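
As a rough sketch of that split: the class PollTimeoutSettings and its helper methods below are made up for illustration, the broker address and group id are placeholders, and it is plain Java with no Spark or Kafka dependency. It assumes that a key starting with "spark." is meant for SparkConf and everything else stays in kafkaParams.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Spark or Kafka.
class PollTimeoutSettings {

    // Assumption for this sketch: a key starting with "spark." is a Spark setting.
    static boolean isSparkSetting(String key) {
        return key.startsWith("spark.");
    }

    // Entries meant for SparkConf. Values are converted to strings because
    // SparkConf.set(String, String) takes string values.
    static Map<String, String> sparkSettings(Map<String, Object> mixed) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : mixed.entrySet()) {
            if (isSparkSetting(e.getKey())) {
                out.put(e.getKey(), String.valueOf(e.getValue()));
            }
        }
        return out;
    }

    // Entries meant for the Kafka consumer (the kafkaParams map).
    static Map<String, Object> kafkaParams(Map<String, Object> mixed) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : mixed.entrySet()) {
            if (!isSparkSetting(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> mixed = new LinkedHashMap<>();
        mixed.put("bootstrap.servers", "localhost:9092"); // placeholder
        mixed.put("group.id", "example-group");           // placeholder
        mixed.put("max.poll.interval.ms", 12000);
        mixed.put("spark.streaming.kafka.consumer.poll.ms", 10000);

        System.out.println("for SparkConf:   " + sparkSettings(mixed));
        System.out.println("for kafkaParams: " + kafkaParams(mixed));
    }
}
```

In the application from the question, this amounts to calling sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "10000") before the JavaStreamingContext is created, and leaving that key out of kafkaParams.
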
See http://spark.apache.org/docs/latest/configuration.html

On Tue, Nov 14, 2017 at 8:56 PM, jkagitala <jka...@gmail.com> wrote:
> Hi,
>
> I'm trying to add spark-streaming to our kafka topic. But, I keep getting
> this error
> java.lang.AssertionError: assertion failed: Failed to get record after
> polling for 512 ms.
>
> I tried to add different params like max.poll.interval.ms,
> spark.streaming.kafka.consumer.poll.ms to 10000ms in kafkaParams.
> But, i still get failed to get records after 512ms. Not sure, even adding
> the above params doesn't change the polling time.
>
> Without spark-streaming, i'm able to fetch the records. Only with
> spark-streaming addon, i get this error.
>
> Any help is greatly appreciated. Below, is the code i'm using.
>
> SparkConf sparkConf = new SparkConf().setAppName("JavaFlingerSparkApplication").setMaster("local[*]");
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
>
> kafkaParams.put("bootstrap.servers", hosts);
> kafkaParams.put("group.id", groupid);
> kafkaParams.put("auto.commit.enable", false);
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", BytesDeserializer.class);
> kafkaParams.put("auto.offset.reset", "earliest");
> //kafkaParams.put("max.poll.interval.ms", 12000);
> //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
> //kafkaParams.put("request.timeout.ms", 12000);
>
> JavaInputDStream<ConsumerRecord<String, List<Bytes>>> messages =
>     KafkaUtils.createDirectStream(ssc,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.Subscribe(topics, kafkaParams));
> messages.foreachRDD(rdd -> {
>     List<ConsumerRecord<String, List<Bytes>>> input = rdd.collect();
>     System.out.println("count is"+input.size());
> });
> ssc.start();
> ssc.awaitTermination();
>
> Thanks
> Jagadish
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org