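When deploying a Flink job to production, it fails at startup with the error below, even though it starts normally in the test environment. The job depends on Flink 1.10, and the Flink cluster version is 1.12-SNAPSHOT, the same as in production. I'm a bit stuck on this; could someone help me analyze it?
Stack trace: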
java.lang.IllegalArgumentException: key group from 44 to 45 does not contain 4
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:187)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:182)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:176)
        at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:112)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:217)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:884)
        at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:42)
        at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:898)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:399)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:567)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
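The check that fails here (`Preconditions.checkArgument` inside `KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex`) is a range test: each parallel subtask of a keyed operator owns a contiguous range of key groups, and a key that hashes to a group outside that range is rejected. Below is a minimal plain-Java sketch of that arithmetic, written from our understanding of Flink's `KeyGroupRangeAssignment` rather than copied from it. The class name `KeyGroupRanges` and the parallelism values in `main` are illustrative assumptions, not taken from this job.

```java
/**
 * Sketch of how key groups are split across the subtasks of a keyed operator.
 * Assumption: this mirrors the arithmetic of Flink's
 * KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex; it is not that class.
 */
final class KeyGroupRanges {

    private KeyGroupRanges() {}

    /** First key group (inclusive) owned by subtask {@code operatorIndex}. */
    static int start(int operatorIndex, int parallelism, int maxParallelism) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by subtask {@code operatorIndex}. */
    static int end(int operatorIndex, int parallelism, int maxParallelism) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** The condition whose failure yields "key group from X to Y does not contain Z". */
    static boolean contains(int operatorIndex, int parallelism, int maxParallelism, int keyGroup) {
        return keyGroup >= start(operatorIndex, parallelism, maxParallelism)
                && keyGroup <= end(operatorIndex, parallelism, maxParallelism);
    }

    public static void main(String[] args) {
        // Illustrative values only: maxParallelism 128, parallelism 64.
        int parallelism = 64;
        int maxParallelism = 128;
        int subtask = 22;
        System.out.printf("subtask %d owns key groups %d..%d%n",
                subtask,
                start(subtask, parallelism, maxParallelism),
                end(subtask, parallelism, maxParallelism));
        System.out.println("contains key group 4? "
                + contains(subtask, parallelism, maxParallelism, 4));
    }
}
```

With these example numbers, subtask 22 owns key groups 44 to 45, so a record whose key maps to key group 4 would trip exactly this kind of check. The point of the sketch is only that the error means "this subtask received a key it does not own", not that the job uses these particular settings.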

Rough outline of the code:
        // Stage 1: salted key + event-time tumbling window (pre-aggregation)
        // Stage 2: re-key by metricsId and window end, merge in a processing-time session window
        DataStreamSink<Metrics> stream = dataStream
                .keyBy(keyBy(globalParallelism))
                .window(window(downsampling))
                .reduce(reduce(trackerType), processWindow(trackerType),
                        TypeInformation.of(Metrics.class))
                .keyBy(secondKeyBy())
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .reduce(reduce(trackerType), processSecondWindow(trackerType),
                        TypeInformation.of(Metrics.class))
                .rebalance()
                .addSink(sink())
                .setParallelism(globalParallelism / 2);

    public KeySelector<Metrics, String> keyBy(int parallelism) {
        return value -> Joiner.on(SeparatorConstant.BAR)
                .join(value.getMetricsId(), ThreadLocalRandom.current().nextInt(parallelism));
    }

    public KeySelector<Metrics, String> secondKeyBy() {
        return value -> Joiner.on(SeparatorConstant.BAR)
                .join(value.getMetricsId(), value.getWindowEnd());
    }
Note: the reason for keying twice is to deal with data skew. The first keyBy feeds an event-time tumbling window, and the second keyBy uses a processing-time session window.
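One thing worth checking: the first `KeySelector` appends `ThreadLocalRandom.current().nextInt(parallelism)`, so it can return a different key each time it is called for the same record. Flink expects key selectors to be deterministic, because the key is computed once to route the record to a downstream subtask and again inside the receiving window operator to set the key context. A random salt can therefore produce, on the second call, a key in a key group that the receiving subtask does not own, which is consistent with the exception above. Below is a minimal sketch of a deterministic salt, assuming `Metrics` carries some field that is fixed for a given record (the `eventId` field and the `SaltedKeys` class are hypothetical) and that `SeparatorConstant.BAR` is `"|"`. It is written in plain Java so it runs without Flink.

```java
import java.util.Objects;

/** Hypothetical minimal stand-in for the post's Metrics class. */
final class Metrics {
    private final String metricsId;
    // Hypothetical: any field whose value never changes for a given record.
    private final String eventId;

    Metrics(String metricsId, String eventId) {
        this.metricsId = metricsId;
        this.eventId = eventId;
    }

    String getMetricsId() {
        return metricsId;
    }

    String getEventId() {
        return eventId;
    }
}

final class SaltedKeys {
    /** Assumed value of the post's SeparatorConstant.BAR. */
    static final String BAR = "|";

    private SaltedKeys() {}

    /**
     * Builds "metricsId|salt" where the salt is derived only from the record's own
     * contents, so every call for the same record yields the same key.
     */
    static String saltedKey(Metrics value, int saltBuckets) {
        // floorMod keeps the salt in [0, saltBuckets) even for negative hash codes.
        int salt = Math.floorMod(Objects.hashCode(value.getEventId()), saltBuckets);
        return value.getMetricsId() + BAR + salt;
    }
}
```

In the job, `keyBy(int parallelism)` could then return `value -> SaltedKeys.saltedKey(value, parallelism)`, which still spreads one hot `metricsId` over up to `parallelism` keys but gives Flink the same answer on every evaluation. The second `keyBy` (`metricsId` plus `getWindowEnd()`) is already deterministic. This hypothesis might also explain why a test environment does not fail: with parallelism 1, a single subtask owns every key group, so a changed key can never land outside its range.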



--
Sent from: http://apache-flink.147419.n8.nabble.com/