Re: Spark Streaming fails with unable to get records after polling for 512 ms
I don't see anything obvious; you'd need to do more troubleshooting. You could also try creating a single RDD for a known range of offsets:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-an-rdd

On Wed, Nov 15, 2017 at 9:33 PM, jagadish kagitala wrote:
> Hi Cody,
>
> It worked after moving the parameter to SparkConf; I don't see that error
> anymore. But now the count for each RDD returns 0, even though there are
> records in the topic I'm reading.
>
> Do you see anything wrong with how I'm creating the direct stream?
>
> Thanks,
> Jagadish
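The "known range of offsets" suggestion above can be sketched roughly as follows, using `KafkaUtils.createRDD` from the kafka-0-10 integration. This is a minimal, hedged sketch, not code from the thread: the broker address, topic name, group id, and offset range are all placeholders, and it assumes the spark-streaming-kafka-0-10 artifact is on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class KnownOffsetsExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("KnownOffsetsExample")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "known-offsets-test");      // placeholder

        // Fetch exactly offsets [0, 100) from partition 0 of a known topic.
        OffsetRange[] offsetRanges = {
                OffsetRange.create("my-topic", 0, 0, 100)       // placeholder
        };

        // Batch read: no streaming context involved, so a polling failure
        // here points at Kafka connectivity rather than the DStream setup.
        JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
                jsc, kafkaParams, offsetRanges,
                LocationStrategies.PreferConsistent());

        System.out.println("fetched " + rdd.count() + " records");
        jsc.stop();
    }
}
```

If this batch read also returns zero records, the problem is in the Kafka parameters or topic contents rather than in the streaming setup.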
Re: Spark Streaming fails with unable to get records after polling for 512 ms
Hi Cody,

It worked after moving the parameter to SparkConf; I don't see that error
anymore. But now the count for each RDD returns 0, even though there are
records in the topic I'm reading.

Do you see anything wrong with how I'm creating the direct stream?

Thanks,
Jagadish

On Wed, Nov 15, 2017 at 11:23 AM, Cody Koeninger wrote:
> spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not
> a Kafka parameter.
>
> See http://spark.apache.org/docs/latest/configuration.html
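The fix described above ("moving the parameter to sparkConf") presumably looks something like the following minimal sketch; the timeout value of 12000 ms is illustrative, taken from the commented-out lines in the original code, and the app name matches the original.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class PollMsExample {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaFlingerSparkApplication")
                .setMaster("local[*]")
                // Spark property, so it belongs on SparkConf,
                // not in the kafkaParams map passed to the consumer.
                .set("spark.streaming.kafka.consumer.poll.ms", "12000");

        JavaStreamingContext ssc =
                new JavaStreamingContext(sparkConf, Durations.seconds(10));

        // Sanity check that the property took effect.
        System.out.println(ssc.sparkContext().getConf()
                .get("spark.streaming.kafka.consumer.poll.ms"));
    }
}
```

Kafka consumer properties (like `max.poll.interval.ms`) still go in `kafkaParams`; only Spark's own `spark.*` properties go on the `SparkConf`.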
Re: Spark Streaming fails with unable to get records after polling for 512 ms
spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not a
Kafka parameter.

See http://spark.apache.org/docs/latest/configuration.html

On Tue, Nov 14, 2017 at 8:56 PM, jkagitala wrote:
> I'm trying to add spark-streaming to our kafka topic, but I keep getting
> this error:
> java.lang.AssertionError: assertion failed: Failed to get record after
> polling for 512 ms.
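Since it is a Spark property, it can also be supplied at submit time instead of in code; a sketch, where the class name, jar name, and timeout value are placeholders:

```shell
spark-submit \
  --class com.example.JavaFlingerSparkApplication \
  --conf spark.streaming.kafka.consumer.poll.ms=12000 \
  java-flinger-spark-app.jar
```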
Spark Streaming fails with unable to get records after polling for 512 ms
Hi,

I'm trying to add spark-streaming to our kafka topic, but I keep getting
this error:

java.lang.AssertionError: assertion failed: Failed to get record after
polling for 512 ms.

I tried adding different params like max.poll.interval.ms and
spark.streaming.kafka.consumer.poll.ms (set to 12000 ms) in kafkaParams,
but I still get "failed to get records after 512 ms" -- adding these
params doesn't change the polling time.

Without spark-streaming I'm able to fetch the records; I only get this
error with the spark-streaming addon.

Any help is greatly appreciated. Below is the code I'm using.

SparkConf sparkConf = new SparkConf()
        .setAppName("JavaFlingerSparkApplication")
        .setMaster("local[*]");
JavaStreamingContext ssc =
        new JavaStreamingContext(sparkConf, Durations.seconds(10));

kafkaParams.put("bootstrap.servers", hosts);
kafkaParams.put("group.id", groupid);
kafkaParams.put("auto.commit.enable", false);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", BytesDeserializer.class);
kafkaParams.put("auto.offset.reset", "earliest");
//kafkaParams.put("max.poll.interval.ms", 12000);
//kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
//kafkaParams.put("request.timeout.ms", 12000);

JavaInputDStream<ConsumerRecord<String, Bytes>> messages =
        KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));
messages.foreachRDD(rdd -> {
    List<ConsumerRecord<String, Bytes>> input = rdd.collect();
    System.out.println("count is " + input.size());
});
ssc.start();
ssc.awaitTermination();

Thanks,
Jagadish

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org