[ 
https://issues.apache.org/jira/browse/SPARK-40188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Madhav Madhu updated SPARK-40188:
---------------------------------
    Description: 
The Spark Kafka consumer does not honour the configured limits on how many messages (by byte size or record count) it reads per batch. I have tried a few approaches mentioned in the Kafka docs, but with no success. Here is a link to a Stack Overflow post where I asked the same question and received no response, which makes me think this is a bug. The same configuration works fine when the consumer is plain Java code.
https://stackoverflow.com/questions/73398533/spark-streaming-context-kafka-consumer-read-messages-of-a-certain-byte-size-in
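
The standalone consumer is not included in this report; the following is a hypothetical minimal sketch (in Scala, against the standard kafka-clients API) of a plain consumer using the same settings, which does respect max.poll.records on each poll():

{code:scala}
// Hypothetical sketch, not the original standalone consumer: a plain
// kafka-clients consumer with the same settings as the Spark job below.
// SASL settings are omitted for brevity.
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("group.id", "test")
props.put("fetch.max.bytes", "65536")
props.put("max.partition.fetch.bytes", "8192")
props.put("max.poll.records", "100")
props.put("auto.offset.reset", "latest")
props.put("enable.auto.commit", "false")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Collections.singletonList("test.topic"))

while (true) {
  // poll() returns at most max.poll.records (100) records per call.
  val records = consumer.poll(Duration.ofSeconds(1))
  println(s"polled ${records.count()} records")
  val it = records.iterator()
  while (it.hasNext) println(it.next().value())
  consumer.commitSync()
}
{code}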

Here is the consumer code that fetches data from Kafka:


{code:scala}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// sparkSession and columns (the output column names) are defined elsewhere in the application.
import sparkSession.implicits._

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test",
  "fetch.max.bytes" -> "65536",           // 64 KB per fetch request
  "max.partition.fetch.bytes" -> "8192",  // 8 KB per partition per fetch
  "max.poll.records" -> "100",            // at most 100 records per poll
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
  "sasl.mechanism" -> "PLAIN",
  "security.protocol" -> "SASL_PLAINTEXT"
)

val topics = Array("test.topic")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Log the offset range handled by this batch. The outer println prints "()"
  // because foreach returns Unit; the per-partition lines come from the inner println.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  println(offsetRanges.foreach(a => println(a.topic + ":" + a.partition + ":" +
    a.fromOffset + ":" + a.untilOffset + ":" + a.count())))

  val df = rdd.map(a => a.value().split(",")).toDF()
  val selectCols = columns.indices.map(i => $"value"(i))
  val newDF = df.select(selectCols: _*).toDF(columns: _*)

  // Some business operations here, then write back to Kafka.
  newDF.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topic.ouput")
    .option("kafka.sasl.jaas.config",
      "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .save()

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

  sparkSession.catalog.clearCache()
}

streamingContext.start()
streamingContext.awaitTermination()
{code}

Output:

{code:java}

test.topic:6:1345075:4163058:2817983
test.topic:0:1339456:4144190:2804734
test.topic:3:1354266:4189336:2835070
test.topic:7:1353542:4186148:2832606
test.topic:5:1355140:4189071:2833931
test.topic:2:1351162:4173375:2822213
test.topic:1:1352801:4184073:2831272
test.topic:4:1348558:4166749:2818191
()
test.topic:6:4163058:4163058:0
test.topic:0:4144190:4144190:0
test.topic:3:4189336:4189336:0
test.topic:7:4186148:4186148:0
test.topic:5:4189071:4189071:0
test.topic:2:4173375:4173375:0
test.topic:1:4184073:4184073:0
test.topic:4:4166749:4166749:0
{code}
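
Each output line is topic:partition:fromOffset:untilOffset:count for one partition of a batch (the stray "()" is the Unit returned by foreach, printed by the outer println). Even though max.poll.records is set to 100, the very first batch takes millions of records per partition, and the next batch is empty. A quick check of the numbers for partition 6:

{code:scala}
// Worked check for partition 6 of the first batch (numbers copied from the output above).
val fromOffset  = 1345075L
val untilOffset = 4163058L
println(untilOffset - fromOffset)  // 2817983 records in a single batch, vs. max.poll.records = 100
{code}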

I tried the following options:

Option 1:
Topic partitions: 8
Streaming context batch interval: 1 second
"fetch.max.bytes" -> "65536"           // 64 KB
"max.partition.fetch.bytes" -> "8192"  // 8 KB
"max.poll.records" -> "100"

DataFrame count read from Kafka in the very first batch: 1200000

Option 2:
Topic partitions: 1
Streaming context batch interval: 1 second
"fetch.max.bytes" -> "65536"
"max.partition.fetch.bytes" -> "8192"
"max.poll.records" -> "100"

Kafka lag: 126360469
DataFrame count read from Kafka in the very first batch: 126360469 (the entire lag)
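
Both trials used a 1 second batch interval rather than the Seconds(10) shown in the code above; a minimal sketch of that one change:

{code:scala}
// Batch interval used in the two trials above: 1 second instead of 10.
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(1))
{code}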
 


> Spark Direct Streaming: Reading messages of a certain byte size or count in batches
> from Kafka is not working.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-40188
>                 URL: https://issues.apache.org/jira/browse/SPARK-40188
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 3.2.1
>         Environment: Spark Version: 3.2.1
> Kafka version: 3.2.0
>  
>            Reporter: Madhav Madhu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
