[jira] [Commented] (FLINK-26642) Pulsar sink fails with non-partitioned topic

2022-03-21 Thread Fabian Paul (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509694#comment-17509694
 ] 

Fabian Paul commented on FLINK-26642:
-

Merged in master: 0e6f33b1ad9607faefd2f8fd7fdf4d62f612f6df

Merged in release-1.15: 3c963cc69368530f651b12b91c16cc157f76e1c7

> Pulsar sink fails with non-partitioned topic
> 
>
> Key: FLINK-26642
> URL: https://issues.apache.org/jira/browse/FLINK-26642
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: goldenyang
>Assignee: goldenyang
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Flink support pulsar sink now in 
> [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I 
> encountered a problem when using pulsar sink in master branch, when I use 
> non-partitioned topic.
> The current test found that both partitioned topics and non-partitioned 
> topics ending with -partition-i can be supported, but ordinary 
> non-partitioned topics without -partition-i will have problems, such as 
> 'test_topic'. 
> Reproducing the problem requires writing to a non-partitioned topic. Below is 
> the stack information when the exception is encountered. I briefly 
> communicated with [~Jianyun Zhao] , this may be a bug. 
>  
> {code:java}
> 2022-03-08 21:39:13,622 - INFO - 
> [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar 
> Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) 
> (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on 
> 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 
> (dataPort=-1).java.lang.IllegalArgumentException: You should provide topics 
> for routing topic by message key hash.at 
> org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)at
>  
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)at
>  
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)at
>  
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)at
>  
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)at
>  
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)at
>  
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)at
>  
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)at
>  
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)at
>  
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at
>  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at 
> java.lang.Thread.run(Thread.java:748) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26642) Pulsar sink fails with non-partitioned topic

2022-03-17 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508116#comment-17508116
 ] 

Martijn Visser commented on FLINK-26642:


[~goldenyang] I've assigned it to you

> Pulsar sink fails with non-partitioned topic
> 
>
> Key: FLINK-26642
> URL: https://issues.apache.org/jira/browse/FLINK-26642
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: goldenyang
>Assignee: goldenyang
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Flink support pulsar sink now in 
> [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I 
> encountered a problem when using pulsar sink in master branch, when I use 
> non-partitioned topic.
> The current test found that both partitioned topics and non-partitioned 
> topics ending with -partition-i can be supported, but ordinary 
> non-partitioned topics without -partition-i will have problems, such as 
> 'test_topic'. 
> Reproducing the problem requires writing to a non-partitioned topic. Below is 
> the stack information when the exception is encountered. I briefly 
> communicated with [~Jianyun Zhao] , this may be a bug. 
>  
> {code:java}
> 2022-03-08 21:39:13,622 - INFO - 
> [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar 
> Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) 
> (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on 
> 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 
> (dataPort=-1).java.lang.IllegalArgumentException: You should provide topics 
> for routing topic by message key hash.at 
> org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)at
>  
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)at
>  
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)at
>  
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)at
>  
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)at
>  
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)at
>  
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)at
>  
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)at
>  
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)at
>  
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at
>  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at 
> java.lang.Thread.run(Thread.java:748) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26642) Pulsar sink fails with non-partitioned topic

2022-03-17 Thread goldenyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508115#comment-17508115
 ] 

goldenyang commented on FLINK-26642:


[~Jianyun Zhao] [~arvid]  [~martijnvisser]   
Hello, how are you doing?
I'm ready to fix this, could you assign this issue to me. Thank you very much~

> Pulsar sink fails with non-partitioned topic
> 
>
> Key: FLINK-26642
> URL: https://issues.apache.org/jira/browse/FLINK-26642
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: goldenyang
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Flink support pulsar sink now in 
> [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I 
> encountered a problem when using pulsar sink in master branch, when I use 
> non-partitioned topic.
> The current test found that both partitioned topics and non-partitioned 
> topics ending with -partition-i can be supported, but ordinary 
> non-partitioned topics without -partition-i will have problems, such as 
> 'test_topic'. 
> Reproducing the problem requires writing to a non-partitioned topic. Below is 
> the stack information when the exception is encountered. I briefly 
> communicated with [~Jianyun Zhao] , this may be a bug. 
>  
> {code:java}
> 2022-03-08 21:39:13,622 - INFO - 
> [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar 
> Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) 
> (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on 
> 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 
> (dataPort=-1).java.lang.IllegalArgumentException: You should provide topics 
> for routing topic by message key hash.at 
> org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)at
>  
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)at
>  
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)at
>  
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)at
>  
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)at
>  
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)at
>  
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)at
>  
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)at
>  
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)at
>  
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at
>  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at 
> java.lang.Thread.run(Thread.java:748) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26642) Pulsar sink fails with non-partitioned topic

2022-03-16 Thread Yufan Sheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17507956#comment-17507956
 ] 

Yufan Sheng commented on FLINK-26642:
-

[~goldenyang] Tks for your PR. It's awesome and fixes the non-partitioned topic 
support issue.

> Pulsar sink fails with non-partitioned topic
> 
>
> Key: FLINK-26642
> URL: https://issues.apache.org/jira/browse/FLINK-26642
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: goldenyang
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Flink support pulsar sink now in 
> [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I 
> encountered a problem when using pulsar sink in master branch, when I use 
> non-partitioned topic.
> The current test found that both partitioned topics and non-partitioned 
> topics ending with -partition-i can be supported, but ordinary 
> non-partitioned topics without -partition-i will have problems, such as 
> 'test_topic'. 
> Reproducing the problem requires writing to a non-partitioned topic. Below is 
> the stack information when the exception is encountered. I briefly 
> communicated with [~Jianyun Zhao] , this may be a bug. 
>  
> {code:java}
> 2022-03-08 21:39:13,622 - INFO - 
> [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar 
> Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) 
> (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on 
> 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 
> (dataPort=-1).java.lang.IllegalArgumentException: You should provide topics 
> for routing topic by message key hash.at 
> org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)at
>  
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)at
>  
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)at
>  
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)at
>  
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)at
>  
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)at
>  
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)at
>  
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)at
>  
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)at
>  
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at
>  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at 
> java.lang.Thread.run(Thread.java:748) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26642) Pulsar sink fails with non-partitioned topic

2022-03-15 Thread goldenyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17506782#comment-17506782
 ] 

goldenyang commented on FLINK-26642:


[~affe] Thanks. Please take a look, we tried to use this Pulsar Sink, and we 
fixed a version after encountering this problem. Please see if this 
modification is acceptable.

> Pulsar sink fails with non-partitioned topic
> 
>
> Key: FLINK-26642
> URL: https://issues.apache.org/jira/browse/FLINK-26642
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: goldenyang
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Flink support pulsar sink now in 
> [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I 
> encountered a problem when using pulsar sink in master branch, when I use 
> non-partitioned topic.
> The current test found that both partitioned topics and non-partitioned 
> topics ending with -partition-i can be supported, but ordinary 
> non-partitioned topics without -partition-i will have problems, such as 
> 'test_topic'. 
> Reproducing the problem requires writing to a non-partitioned topic. Below is 
> the stack information when the exception is encountered. I briefly 
> communicated with [~Jianyun Zhao] , this may be a bug. 
>  
> {code:java}
> 2022-03-08 21:39:13,622 - INFO - 
> [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar 
> Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) 
> (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on 
> 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 
> (dataPort=-1).java.lang.IllegalArgumentException: You should provide topics 
> for routing topic by message key hash.at 
> org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)at
>  
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)at
>  
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)at
>  
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)at
>  
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)at
>  
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)at
>  
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)at
>  
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)at
>  
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)at
>  
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at
>  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at 
> java.lang.Thread.run(Thread.java:748) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26642) Pulsar sink fails with non-partitioned topic

2022-03-15 Thread Yufei Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17506717#comment-17506717
 ] 

Yufei Zhang commented on FLINK-26642:
-

Roger. I'll try to reproduce it. 

> Pulsar sink fails with non-partitioned topic
> 
>
> Key: FLINK-26642
> URL: https://issues.apache.org/jira/browse/FLINK-26642
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: goldenyang
>Priority: Major
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Flink support pulsar sink now in 
> [FLINK-20732|https://issues.apache.org/jira/browse/FLINK-20732]. I 
> encountered a problem when using pulsar sink in master branch, when I use 
> non-partitioned topic.
> The current test found that both partitioned topics and non-partitioned 
> topics ending with -partition-i can be supported, but ordinary 
> non-partitioned topics without -partition-i will have problems, such as 
> 'test_topic'. 
> Reproducing the problem requires writing to a non-partitioned topic. Below is 
> the stack information when the exception is encountered. I briefly 
> communicated with [~Jianyun Zhao] , this may be a bug. 
>  
> {code:java}
> 2022-03-08 21:39:13,622 - INFO - 
> [flink-akka.actor.default-dispatcher-13:Execution@1419] - Source: Pulsar 
> Source -> (Sink: Writer -> Sink: Committer, Sink: Print to Std. Out) (1/6) 
> (44af5e8a2b9d553952c7ed3e5d40e672) switched from RUNNING to FAILED on 
> 54284e57-42a9-4e2e-9c41-54b0ad559832 @ 127.0.0.1 
> (dataPort=-1).java.lang.IllegalArgumentException: You should provide topics 
> for routing topic by message key hash.at 
> org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument(Preconditions.java:144)at
>  
> org.apache.flink.connector.pulsar.sink.writer.router.RoundRobinTopicRouter.route(RoundRobinTopicRouter.java:54)at
>  
> org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:138)at
>  
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:124)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)at
>  
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)at
>  
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)at
>  
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:205)at
>  
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:41)at
>  
> org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:33)at
>  
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)at
>  
> org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:106)at
>  
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:382)at
>  
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)at
>  
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)at
>  
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)at
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)at
>  
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)at
>  org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)at 
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)at 
> java.lang.Thread.run(Thread.java:748) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)