[ 
https://issues.apache.org/jira/browse/FLINK-26642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509694#comment-17509694
 ] 

Fabian Paul commented on FLINK-26642:
-------------------------------------

Merged in master: 0e6f33b1ad9607faefd2f8fd7fdf4d62f612f6df

Merged in release-1.15: 3c963cc69368530f651b12b91c16cc157f76e1c7

> Pulsar sink fails with non-partitioned topic
> --------------------------------------------
>
>                 Key: FLINK-26642
>                 URL: https://issues.apache.org/jira/browse/FLINK-26642
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.15.0
>            Reporter: goldenyang
>            Assignee: goldenyang
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Flink support pulsar sink now in 
> [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I 
> encountered a problem when using pulsar sink in master branch, when I use 
> non-partitioned topic.
> The current test found that both partitioned topics and non-partitioned 
> topics ending with -partition-i can be supported, but ordinary 
> non-partitioned topics without -partition-i will have problems, such as 
> 'test_topic'. 
> Reproducing the problem requires writing to a non-partitioned topic. Below is 
> the stack information when the exception is encountered. I briefly 
> communicated with [~Jianyun Zhao] , this may be a bug. 
>  
> {code:java}
> 2022-03-08 21:39:13,622 - INFO - 
> [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar 
> Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) 
> (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on 
> 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 
> (dataPort=-1).java.lang.IllegalArgumentException: You should provide topics 
> for routing topic by message key hash.at 
> org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)at
>  
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)at
>  
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)at
>  
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)at
>  
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)at
>  
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)at
>  
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)at
>  
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)at
>  
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)at
>  
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at
>  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at 
> java.lang.Thread.run(Thread.java:748) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to