[ https://issues.apache.org/jira/browse/FLINK-38023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17987239#comment-17987239 ]

Keshav Kansal commented on FLINK-38023:
---------------------------------------

Hi [~gaborgsomogyi],
This can be reproduced using the following topology:

{code:java}
    source
        .keyBy(record -> "global") // use a global key to process all records together
        .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
        .reduce(
            (a, b) -> {
              int countA = Integer.parseInt(a.get("count").toString());
              int countB = Integer.parseInt(b.get("count").toString());
              LOGGER.info("Processing values: {} and {}", countA, countB);
              if (countA > countB) {
                return a;
              }
              return b;
            })
        .returns(new GenericRecordAvroTypeInfo(schema))
        .print();
{code}

More details are available here:
https://github.com/kansalk/flink-init/blob/main/src/main/java/com/example/App.java#L62
I don't have details on when this last worked, but I tried Flink 1.17 and hit the same issue; we can attempt to go back even further. As you mentioned, I also think it never worked before.
I did find [this Flink article|https://flink.apache.org/2020/01/29/state-unlocked-interacting-with-state-in-apache-flink/#implementing-apache-avro-serialization-in-flink], which shows deserialization from a previous schema being handled, but the current code differs from what it describes.
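For reference, Avro's schema-resolution rules say that a reader-schema field absent from the writer schema is populated from its default (which is what makes the strTest change above backward compatible). Below is a minimal, stdlib-only Python sketch of that rule; it is not the Avro library itself, just a model of the default-filling behavior, using the SimpleEvent schemas from this issue:

```python
# Simplified illustration of Avro schema resolution: a reader-schema
# field that the writer schema lacks is populated from its default.
# This is NOT the Avro library; it only models the default-filling rule.

writer_schema = {"name": "SimpleEvent",
                 "fields": [{"name": "count", "type": "int"}]}

reader_schema = {"name": "SimpleEvent",
                 "fields": [{"name": "count", "type": "int"},
                            {"name": "strTest",
                             "type": ["null", "string"],
                             "default": None}]}

def resolve(record, writer, reader):
    """Project a record written with `writer` onto `reader`,
    filling missing fields from their defaults. A missing field
    with no default is an incompatible change and raises."""
    written = {f["name"] for f in writer["fields"]}
    out = {}
    for field in reader["fields"]:
        name = field["name"]
        if name in written:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]
        else:
            raise ValueError(f"no default for new field {name!r}")
    return out

old_record = {"count": 7}  # state written with the old schema
print(resolve(old_record, writer_schema, reader_schema))
# {'count': 7, 'strTest': None}
```

The failure in the stack trace suggests the old serialized value is instead being re-serialized against the new two-field schema directly, so the writer indexes a second field that the old GenericData.Record does not have.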

> State Restoration fails when using GenericRecord with new Avro schema
> ---------------------------------------------------------------------
>
>                 Key: FLINK-38023
>                 URL: https://issues.apache.org/jira/browse/FLINK-38023
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Keshav Kansal
>            Priority: Major
>
> Hi Team,
> I have a Flink job which is storing a 
> [GenericRecord|https://github.com/apache/avro/blob/main/lang/java/avro/src/main/java/org/apache/avro/generic/GenericRecord.java]
>  in the ValueState and we use 
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java]
> I am observing an issue while upgrading the Avro schema.
> I reproduced this locally.
> For example, initially the schema is:
> {code:java}
> {
>   "type" : "record",
>   "name" : "SimpleEvent",
>   "fields" : [ {
>     "name" : "count",
>     "type" : "int"
>   } ]
> }
> {code}
> A savepoint is taken with some entities in the state. 
> The app is restored from the savepoint with the below new schema (note: this 
> is backward compatible):
> {code:java}
> {
>   "type" : "record",
>   "name" : "SimpleEvent",
>   "fields" : [ {
>     "name" : "count",
>     "type" : "int"
>   }, {
>     "name" : "strTest",
>     "type" : [ "null", "string" ],
>     "default" : null
>   } ]
> }
> {code}
> While restoring the data from the savepoint we observe the following exception:
> {code:java}
> Caused by: org.apache.flink.util.StateMigrationException: Error while trying 
> to migrate RocksDB state.
>       at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:194)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:848)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:772)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:684)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:904)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:891)
>       at 
> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
>       at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
>       at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:361)
>       at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:361)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:517)
>       at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:238)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>       at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds 
> for length 1
>       at org.apache.avro.generic.GenericData$Record.get(GenericData.java:289)
>       at org.apache.avro.generic.GenericData.getField(GenericData.java:860)
>       at org.apache.avro.generic.GenericData.getField(GenericData.java:879)
>       at 
> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:243)
>       at 
> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
>       at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
>       at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
>       at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>       at 
> org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:188)
>       at 
> org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:192)
>       ... 22 more
> {code}
> Tested with Flink version 1.20.1



--
This message was sent by Atlassian Jira
(v8.20.10#820010)