[ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066927#comment-16066927
 ] 

Nishkam Ravi commented on KAFKA-5528:
-------------------------------------

Hi [~damianguy], IllegalStateException gets thrown if 
AbstractProcessorContext.recordContext is null, which is expected to not be the 
case inside the process() method. Hence the JIRA. This problem only shows up in 
the case of multiple consumers though. 

Here's the config settings for streams:
    val settings = new Properties
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaServers.asScala.mkString(","))
    settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.ByteArray.getClass.getName)
    settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArray.getClass.getName)
    settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
kafkaParallelism.toString)  // kafkaParallelism = 8

This is how we are launching the streams:

      val p: GenericProcessor[T] = new GenericProcessor[T](serDe, decrypt, 
config)
      p.setProcessFunc(process)
      topologyBuilder = new TopologyBuilder()
      topologyBuilder.addSource("SOURCE", byteDe, byteDe, sourceTopic)
        .addProcessor("PROCESS", () => p, "SOURCE")
      stream = new KafkaStreams(topologyBuilder, streamingConfig)
      stream.start()


> Error while reading topic, offset, partition info from process method
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-5528
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5528
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
>     super.init(processorContext) 
>     hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
>     val topic: String = this.context.topic() 
>     partition: Int = this.context.partition() 
>     val offset: Long = this.context.offset() 
>     val timestamp: Long = this.context.timestamp() 
>     // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to