[ https://issues.apache.org/jira/browse/FLINK-27736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17540793#comment-17540793 ]
Martijn Visser commented on FLINK-27736: ---------------------------------------- [~ana4] I've downgraded this in accordance with https://cwiki.apache.org/confluence/display/FLINK/Flink+Jira+Process > Pulsar sink catch watermark error > --------------------------------- > > Key: FLINK-27736 > URL: https://issues.apache.org/jira/browse/FLINK-27736 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Connectors / Pulsar > Affects Versions: 1.15.0 > Reporter: LuNng Wang > Priority: Critical > > The following is my demo code. > {code:java} > public class WatermarkDemo { > private final static String SERVICE_URL = "pulsar://localhost:6650"; > private final static String ADMIN_URL = "http://localhost:8080"; > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > PulsarSource<String> source = PulsarSource.builder() > .setServiceUrl(SERVICE_URL) > .setAdminUrl(ADMIN_URL) > .setStartCursor(StartCursor.earliest()) > .setTopics("ada") > > .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new > SimpleStringSchema())) > .setSubscriptionName("my-subscription") > .setSubscriptionType(SubscriptionType.Exclusive) > .build(); > PulsarSink<String> sink = PulsarSink.builder() > .setServiceUrl(SERVICE_URL) > .setAdminUrl(ADMIN_URL) > .setTopics("beta") > > .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new > SimpleStringSchema())) > .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .build(); > DataStream stream = env.fromSource(source, > WatermarkStrategy.forMonotonousTimestamps(), "Pulsar Source"); > stream.sinkTo(sink); > env.execute(); > } > } {code} > It will throw the following error. > {code:java} > Caused by: java.lang.IllegalArgumentException: Invalid timestamp : '0' > at > org.apache.pulsar.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:203) > at > org.apache.pulsar.client.impl.TypedMessageBuilderImpl.eventTime(TypedMessageBuilderImpl.java:204) > at > org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.createMessageBuilder(PulsarWriter.java:216) > at > org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:141) > at > org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > at > org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) > at > org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) > at > org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41) > at > org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) > at > org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)