Hey David,

This is a good catch! I've filed a JIRA ticket to address this in the docs
more prominently: https://issues.apache.org/jira/browse/FLINK-21073

Thanks a lot for reporting this issue!

On Thu, Jan 21, 2021 at 9:24 AM David Haglund <david.hagl...@niradynamics.se>
wrote:

> A colleague of mine found a hint under “Avro type” [2] on the State
> schema evolution page:
>
>
>
> “Example: RocksDB state backend relies on binary objects identity,
> rather than hashCode method implementation. Any changes to the keys
> object structure could lead to non deterministic behaviour.”
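To illustrate the point the docs make, here is a minimal, self-contained Scala sketch. The key class and field names are hypothetical, and the one-line "serializer" is only a stand-in for Flink's real key serializers; the point it demonstrates is that two keys which are equal under hashCode/equals can still have different binary representations, which is what RocksDB compares:

```scala
// Hypothetical key class mirroring the pattern from this thread: `extra`
// is deliberately excluded from equals/hashCode.
case class SensorKey(id: String, extra: String) {
  override def equals(other: Any): Boolean = other match {
    case SensorKey(otherId, _) => id == otherId
    case _                     => false
  }
  override def hashCode(): Int = id.hashCode
}

// Toy stand-in for a key serializer: like Flink's case class serializer,
// it writes *every* field, regardless of equals/hashCode.
def serialize(key: SensorKey): Array[Byte] =
  s"${key.id}|${key.extra}".getBytes("UTF-8")

val a = SensorKey("sensor-1", "foo")
val b = SensorKey("sensor-1", "bar")

// Heap-based state backends group by hashCode/equals: same logical key.
assert(a == b && a.hashCode == b.hashCode)

// RocksDB compares the serialized bytes, which differ because of `extra`,
// so the two records end up under different state entries.
assert(!java.util.Arrays.equals(serialize(a), serialize(b)))
```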
>
>
>
> I guess it is a known issue then, but it would be good to at least include
> that kind of fundamental information on the state backends page as well.
>
>
>
> Best regards,
>
> /David Haglund
>
>
>
> [2]
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#avro-types
>
>
>
>
>
> From: David Haglund <david.hagl...@niradynamics.se>
> Date: Wednesday, 20 January 2021 at 19:57
>
> I have an update. I have created a small project on GitHub,
> https://github.com/daha/flink-key-by-problem-with-rocksdb-state, which
> reproduces the issue.
>
>
>
> There seems to be a problem with RocksDB in all versions I have tested
> (1.7.1 and later). In Flink 1.9.x only one of the events is counted with
> RocksDB. In Flink 1.10.x and later all events are counted, but with
> separate keys, when all/both events should be counted under the same key.
>
>
>
> The main branch in my sample project uses Flink 1.11.3, and there are
> branches for Flink 1.9.1, 1.10.3 and 1.12.1.
>
>
>
> Best regards,
>
> /David Haglund
>
>
>
> From: David Haglund <david.hagl...@niradynamics.se>
> Date: Wednesday, 20 January 2021 at 09:38
>
> I have encountered a problem in Flink when trying to upgrade from Flink
> 1.9.1 to Flink 1.11.3.
>
>
>
> The problem is a combination of two components:
>
>
>
> * Keys implemented as case classes in Scala where we override the equals
>   and hashCode methods. The case class has additional fields which are not
>   used in the keyBy (hashCode/equals) but can have different values for a
>   specific key (the fields we care about).
> * Checkpointing with RocksDB
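For concreteness, a minimal sketch of the key pattern described above (the class and field names are invented for illustration): only `id` takes part in equals/hashCode, while `payload` varies from event to event.

```scala
// Hypothetical key: only `id` participates in equals/hashCode.
case class EventKey(id: Long, payload: String) {
  override def equals(other: Any): Boolean = other match {
    case EventKey(otherId, _) => id == otherId
    case _                    => false
  }
  override def hashCode(): Int = id.hashCode
}

val first  = EventKey(42L, "first payload")
val second = EventKey(42L, "second payload")

// With keyBy on such a class, one would expect both events to land in the
// same group, since they are equal and hash to the same value:
assert(first == second)
assert(first.hashCode == second.hashCode)
```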
>
>
>
> In Flink 1.9.1 everything worked fine, but in Flink 1.11.3 we got
> aggregations for each unique key including the fields which we did not
> want to include in the keyBy, and which we explicitly do not use in
> hashCode and equals. It looks like hashCode is ignored in the keyBy in
> our case when we use RocksDB for checkpoints.
>
>
>
> We do not see this problem if we disable checkpointing or when using
> FsStateBackend.
>
>
>
> I have seen this with "Incremental Window Aggregation with
> AggregateFunction" [1], but a colleague of mine reported he had seen the
> same issue with KeyedProcessFunction too.
>
>
>
> We are using Scala version 2.11.12 and Java 8.
>
>
>
> This looks like a bug to me. Is it a known issue or a new one?
>
>
>
> Best regards,
>
> /David Haglund
>
>
>
> [1] Incremental Window Aggregation with AggregateFunction
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
>
>
>
> *David Haglund*
> *Systems Engineer*
> Fleet Perception for Maintenance
>
> *NIRA Dynamics AB*
> Wallenbergs gata 4
> 58330 Linköping
> Sweden
>
> Mobile: +46 705 634 848
> david.hagl...@niradynamics.se
> www.niradynamics.se
>
> *Together for smarter safety*
>
>
>
