Dian Fu created FLINK-27223:
-------------------------------

             Summary: State access doesn't work as expected when cache size is set to 0
                 Key: FLINK-27223
                 URL: https://issues.apache.org/jira/browse/FLINK-27223
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.15.0
            Reporter: Dian Fu
            Assignee: Dian Fu
             Fix For: 1.15.1

For the following job:
{code}
import json
import logging
import sys

from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.util.java_utils import get_j_env_configuration

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    config = Configuration(
        j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
    config.set_integer("python.state.cache-size", 0)
    env.set_parallelism(1)

    # define the source
    ds = env.from_collection(
        collection=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()])
    )

    # key by
    ds = ds.map(lambda data: (json.loads(data.info)['addr']['country'],
                              json.loads(data.info)['tel'])) \
           .key_by(lambda data: data[0]).sum(1)

    ds.print()
    env.execute()
{code}

The expected result should be:
{code}
('Germany', 123)
('China', 135)
('USA', 124)
('China', 167)
{code}

However, the actual result is:
{code}
('Germany', 123)
('China', 135)
('USA', 124)
('China', 32)
{code}
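To make the expected numbers easy to check, here is a minimal plain-Python sketch of what a keyed running sum should emit for this input. It is only a model of the semantics, not PyFlink code and not how the state backend is implemented; the helper name `keyed_running_sum` is made up for illustration.

```python
import json
from collections import defaultdict

# Same four records as in the job above.
records = [
    (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
    (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
    (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
    (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}'),
]


def keyed_running_sum(pairs):
    """Hypothetical helper: emit (key, running_total) for each input pair.

    One total is kept per key, which models the per-key state that
    key_by(...).sum(1) is expected to maintain across records.
    """
    totals = defaultdict(int)
    emitted = []
    for key, value in pairs:
        totals[key] += value  # the previous total for this key must be retained
        emitted.append((key, totals[key]))
    return emitted


# Mirror the map step of the job: (country, tel).
pairs = [
    (json.loads(info)["addr"]["country"], json.loads(info)["tel"])
    for _, info in records
]

result = keyed_running_sum(pairs)
for row in result:
    print(row)
```

The second 'China' record should yield 135 + 32 = 167. The reported output of 32 is what would appear if the previously stored total for the key 'China' were not seen when the fourth record was processed.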