Arindam Ray created KAFKA-12346:
-----------------------------------
Summary: punctuate is called at twice the duration passed as the
first argument to Processor.Schedule (with PunctuationType.WALL_CLOCK_TIME)
Key: KAFKA-12346
URL: https://issues.apache.org/jira/browse/KAFKA-12346
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.7.0
Reporter: Arindam Ray
With the stream transform idiom below, punctuate is invoked at twice the
interval passed as the first argument to context.schedule (here,
scanFrequency) with PunctuationType.WALL_CLOCK_TIME:
{code:java}
.transform(new TransformerSupplier[String, TimeStampedString, KeyValue[String, TimeStampedString]]() {
    override def get(): Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] =
      new Transformer[String, TimeStampedString, KeyValue[String, TimeStampedString]] {

        override def init(context: ProcessorContext): Unit = {
          val store = context
            .getStateStore(stateStoreName)
            .asInstanceOf[KeyValueStore[String, ValueAndTimestamp[TimeStampedString]]]

          context.schedule(
            scanFrequency,
            PunctuationType.WALL_CLOCK_TIME,
            new Punctuator {
              override def punctuate(timestamp: Long): Unit = {
                logger.info(s"Punctuate invoked with timestamp : ${Instant.ofEpochMilli(timestamp)}")
              }
            }
          )
        }

        override def transform(key: String, value: TimeStampedString): KeyValue[String, TimeStampedString] = {
          // no need to return anything here, the Punctuator will emit the records when necessary
          null
        }

        override def close(): Unit = {}
      }
  },
  /**
   * register that this Transformer needs to be connected to our state store.
   */
  stateStoreName
)
{code}
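To quantify the behaviour rather than read it off log timestamps, the gaps
between consecutive punctuate calls can be compared with the scheduled
interval. The following is only a minimal sketch: PunctuationIntervalTracker
and its methods are hypothetical names, not part of Kafka Streams, and the
tracker assumes its record method is called from inside punctuate with the
timestamp argument.

```scala
import java.time.Duration

// Hypothetical helper (not part of Kafka Streams): records the timestamps
// passed to punctuate and reports the gaps between consecutive calls.
final class PunctuationIntervalTracker(expected: Duration) {
  private var timestamps = Vector.empty[Long]

  // Intended to be called from Punctuator.punctuate(timestamp).
  def record(timestampMs: Long): Unit = synchronized {
    timestamps = timestamps :+ timestampMs
  }

  // Gaps in milliseconds between consecutive recorded timestamps.
  def intervalsMs: Vector[Long] = synchronized {
    timestamps.sliding(2).collect { case Vector(a, b) => b - a }.toVector
  }

  // Mean observed gap divided by the expected interval.
  // None until at least two calls have been recorded.
  def meanRatio: Option[Double] = {
    val gaps = intervalsMs
    if (gaps.isEmpty) None
    else Some(gaps.sum.toDouble / gaps.size / expected.toMillis)
  }
}
```

A ratio near 1.0 would mean punctuate fires at the scheduled interval; a
ratio near 2.0 would match the behaviour reported here.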