[ https://issues.apache.org/jira/browse/FLINK-14763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
richt richt updated FLINK-14763:
--------------------------------
    Description: 
When I submit a CEP SQL query through the SQL client with a parallelism larger than 1, it prints the error below:
{code:java}
// code placeholder
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=15} does not contain key group 16
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:216)
    at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
    at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:163)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:282)
    at org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:151)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:430)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that some keys are allocated to the wrong TaskManager.

The YAML is:
{code:java}
// code placeholder
- name: Ticker
  type: source-table
  update-mode: append
  connector:
    sink-partitioner: round-robin
    sink-partitioner-class: org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
    property-version: 1
    type: kafka
    version: universal
    topic: test_part
    startup-mode: earliest-offset
    properties:
      - key: zookeeper.connect
        value: 10.214.96.129:2181
      - key: bootstrap.servers
        value: 172.24.136.97:9092,172.24.136.98:9092,172.24.136.99:9092
      - key: group.id
        value: testGroup
  format:
    property-version: 1
    type: json
    derive-schema: true
  schema:
    - name: symbol
      type: VARCHAR
    - name: rtime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: from-field
          from: rowtime
        watermarks:
          type: periodic-bounded
          delay: 2000
    - name: price
      type: BIGINT
    - name: tax
      type: BIGINT
{code}

  was:
When I submit a CEP SQL query through the SQL client with a parallelism larger than 1, it prints the error below:
{code:java}
// code placeholder
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=15} does not contain key group 16
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:216)
    at org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
    at org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:163)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:282)
    at org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:151)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:430)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
    at java.lang.Thread.run(Thread.java:748)
{code}
It seems that some keys are allocated to the wrong TaskManager.

Here is the demo I use:

[link title|https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/match_recognize.html]


> cep parallelism error
> ---------------------
>
>                 Key: FLINK-14763
>                 URL: https://issues.apache.org/jira/browse/FLINK-14763
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.10.0
>         Environment: flink on yarn
> flink 1.10
> hadoop 3.0
> kafka 2.2.0
>            Reporter: richt richt
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
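
For readers unfamiliar with the exception message, here is a minimal sketch of how a key group range such as 0..15 can arise and which subtask would own key group 16. It is not Flink source code: the class and method names are hypothetical, the arithmetic follows my understanding of how Flink splits key groups across parallel subtasks, and the values maxParallelism = 128 and parallelism = 8 are assumptions chosen only because they reproduce the range in the report. It does not establish the cause of this bug.

```java
// Hypothetical illustration only; names and parameter values are assumptions,
// not taken from the report or copied from Flink.
final class KeyGroupRangeSketch {

    // Inclusive first key group owned by the subtask with the given index.
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Inclusive last key group owned by the subtask with the given index.
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Index of the subtask that owns the given key group.
    static int ownerOf(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed, not stated in the report
        int parallelism = 8;      // assumed, not stated in the report

        // Print the inclusive key group range of every subtask.
        for (int i = 0; i < parallelism; i++) {
            System.out.printf("subtask %d owns key groups %d..%d%n",
                    i,
                    rangeStart(maxParallelism, parallelism, i),
                    rangeEnd(maxParallelism, parallelism, i));
        }

        // Key group 16 falls outside subtask 0's range under these assumptions.
        System.out.println("key group 16 is owned by subtask "
                + ownerOf(maxParallelism, parallelism, 16));
    }
}
```

Under these assumed values, subtask 0 owns key groups 0..15 and key group 16 belongs to subtask 1, which is consistent with the reporter's guess that an element keyed for one subtask was processed by another.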