Hi,

Are you using a version earlier than 1.9.2? From the exception stack trace,
this looks like it is caused by FLINK-13702, which is already fixed in 1.9.2
and 1.10.0. Could you try it with 1.9.2?

Best,
Jark

On Mon, 20 Apr 2020 at 21:00, Kurt Young <ykt...@gmail.com> wrote:

> Can you reproduce this in a local program with a mini-cluster?
>
> Best,
> Kurt
>
>
> On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <zahidr1...@gmail.com> wrote:
>
>> You can read this about this type of error.
>>
>>
>> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>>
>> I would suggest setting breakpoints in your code and stepping through it.
>> This should show you which array variable is being passed a null argument
>> when the array variable is not nullable.
>>
>>
>>
>>
>> On Mon, 20 Apr 2020, 10:07 刘建刚, <liujiangangp...@gmail.com> wrote:
>>
>>>        I am using Roaring64NavigableMap to compute UV. It works with the
>>> Flink planner but fails with the Blink planner. The SQL is as follows:
>>>
>>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp,
>>>         A, B, C, D, E, uv(bitmap(id)) as bmp
>>> FROM person
>>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>>>
>>>
>>>       The UDFs are as follows:
>>>
>>> public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
>>>    @Override
>>>    public Roaring64NavigableMap createAccumulator() {
>>>       return new Roaring64NavigableMap();
>>>    }
>>>
>>>    @Override
>>>    public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
>>>       return accumulator;
>>>    }
>>>
>>>    public void accumulate(Roaring64NavigableMap bitmap, long id) {
>>>       bitmap.add(id);
>>>    }
>>> }
>>>
>>> public static class UV extends ScalarFunction {
>>>    public long eval(Roaring64NavigableMap bitmap) {
>>>       return bitmap.getLongCardinality();
>>>    }
>>> }
>>>
>>>       The error is as follows:
>>>
>>> 2020-04-20 16:37:13,868 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>  [flink-akka.actor.default-dispatcher-40]  -
>>> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
>>> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 60000)],
>>> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
>>> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
>>> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
>>> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
>>> curTimestamp, brand, platform, channel, versionName, appMajorVersion,
>>> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink:
>>> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING
>>> to FAILED.
>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>>   at org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
>>>   at org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
>>>   at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
>>>   at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
>>>   at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
>>>   at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
>>>   at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
>>>   at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
>>>   at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>>>   at org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
>>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>>       Do I need to register Roaring64NavigableMap somewhere? Can anyone
>>> help me? Thank you.
>>>
>>>
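
For readers following the thread, here is a rough sketch of what the two UDFs
in the original question compute together: the aggregate collects ids per
group and window, and the scalar function returns the distinct count. To keep
it runnable on the JDK alone, a HashSet<Long> stands in for
Roaring64NavigableMap, and no Flink classes are involved. The names UvSketch,
BitmapAgg and countUv are made up for illustration. It does not reproduce the
Kryo failure above and only shows the intended accumulate/getValue/uv flow.

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in sketch: HashSet<Long> replaces Roaring64NavigableMap so the
// example runs on the JDK alone. No Flink classes are used here.
final class UvSketch {

    // Mirrors the Bitmap aggregate: one accumulator per group and window.
    static final class BitmapAgg {
        Set<Long> createAccumulator() {
            return new HashSet<>();
        }

        // Called once per input row; duplicate ids are absorbed by the set.
        void accumulate(Set<Long> acc, long id) {
            acc.add(id);
        }

        Set<Long> getValue(Set<Long> acc) {
            return acc;
        }
    }

    // Mirrors the UV scalar function: the distinct count of the accumulator.
    static long uv(Set<Long> bitmap) {
        return bitmap.size();
    }

    // Hypothetical helper: runs the aggregate over the ids of one window.
    static long countUv(long[] ids) {
        BitmapAgg agg = new BitmapAgg();
        Set<Long> acc = agg.createAccumulator();
        for (long id : ids) {
            agg.accumulate(acc, id);
        }
        return uv(agg.getValue(acc));
    }

    public static void main(String[] args) {
        long[] idsInWindow = {7L, 7L, 42L, 1L << 40, 42L};
        System.out.println("uv = " + countUv(idsInWindow)); // prints "uv = 3"
    }
}
```

The set is only a convenient substitute here. A compressed bitmap such as
Roaring64NavigableMap is normally chosen because it stores large id sets far
more compactly, which matters once the accumulator is kept in state.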
