Arindam Ray created KAFKA-12346: ----------------------------------- Summary: punctuate is called at twice the duration passed as the first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME) Key: KAFKA-12346 URL: https://issues.apache.org/jira/browse/KAFKA-12346 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.7.0 Reporter: Arindam Ray
A stream transform called with the idiom below causes punctuate to be called at twice the duration of the argument passed {code:java} .transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() { override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] = new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] { override def init(context: ProcessorContext): Unit = { val store = context.getStateStore(stateStoreName).asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]] context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, new Punctuator { override def punctuate(timestamp: Long): Unit = { logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}") } } ) } override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = { // no need to return anything here, the Punctuator will emit the records when necessary null } override def close(): Unit = {} } }, /** * register that this Transformer needs to be connected to our state store. */ stateStoreName ) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)