Hi Cody,

It worked, after moving the parameter to sparkConf. I don't see that error.
But, Now i'm seeing the count for each RDD returns 0. But, there are
records in the topic i'm reading.

Do you see anything wrong with how i'm creating the Direct Stream ?


On Wed, Nov 15, 2017 at 11:23 AM, Cody Koeninger <c...@koeninger.org> wrote:

> spark.streaming.kafka.consumer.poll.ms  is a spark configuration, not
> a kafka parameter.
> see http://spark.apache.org/docs/latest/configuration.html
> On Tue, Nov 14, 2017 at 8:56 PM, jkagitala <jka...@gmail.com> wrote:
> > Hi,
> >
> > I'm trying to add spark-streaming to our kafka topic. But, I keep getting
> > this error
> > java.lang.AssertionError: assertion failed: Failed to get record after
> > polling for 512 ms.
> >
> > I tried to add different params like max.poll.interval.ms,
> > spark.streaming.kafka.consumer.poll.ms to 10000ms in kafkaParams.
> > But, i still get failed to get records after 512ms. Not sure, even adding
> > the above params doesn't change the polling time.
> >
> > Without spark-streaming, i'm able to fetch the records. Only with
> > spark-streaming addon, i get this error.
> >
> > Any help is greatly appreciated. Below, is the code i'm using.
> >
> > SparkConf sparkConf = new
> > SparkConf().setAppName("JavaFlingerSparkApplication").
> setMaster("local[*]");
> > JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> > Durations.seconds(10));
> >
> > kafkaParams.put("bootstrap.servers", hosts);
> > kafkaParams.put("group.id", groupid);
> > kafkaParams.put("auto.commit.enable", false);
> > kafkaParams.put("key.deserializer", StringDeserializer.class);
> > kafkaParams.put("value.deserializer", BytesDeserializer.class);
> > kafkaParams.put("auto.offset.reset", "earliest");
> > //kafkaParams.put("max.poll.interval.ms", 12000);
> > //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
> > //kafkaParams.put("request.timeout.ms", 12000);
> >
> >
> > JavaInputDStream<ConsumerRecord&lt;String, List&lt;Bytes>>> messages =
> >                           KafkaUtils.createDirectStream(ssc,
> > LocationStrategies.PreferConsistent(),
> >
> > ConsumerStrategies.Subscribe(topics, kafkaParams));
> > messages.foreachRDD(rdd -> {
> >                 List<ConsumerRecord&lt;String, List&lt;Bytes>>> input =
> rdd.collect();
> >                 System.out.println("count is"+input.size());
> >         });
> > ssc.start();
> > ssc.awaitTermination();
> >
> > Thanks
> > Jagadish
> >
> >
> >
