[ https://issues.apache.org/jira/browse/FLINK-22124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arvid Heise updated FLINK-22124:
--------------------------------
        Parent:     (was: FLINK-22123)
    Issue Type: Bug  (was: Sub-task)

> The job finished without any exception if error was thrown during state access
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-22124
>                 URL: https://issues.apache.org/jira/browse/FLINK-22124
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.13.0
>            Reporter: Dian Fu
>            Assignee: Huang Xingbo
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.3
>
>
> For the following job:
> {code}
> import logging
>
> from pyflink.common import WatermarkStrategy, Row
> from pyflink.common.serialization import Encoder
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
> from pyflink.datastream.execution_mode import RuntimeExecutionMode
> from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
> from pyflink.datastream.state import MapStateDescriptor
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(2)
> env.set_runtime_mode(RuntimeExecutionMode.BATCH)
> seq_num_source = NumberSequenceSource(1, 1000)
>
> file_sink = FileSink \
>     .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
>                     Encoder.simple_string_encoder()) \
>     .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
>     .build()
>
> ds = env.from_source(
>     source=seq_num_source,
>     watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>     source_name='file_source',
>     type_info=Types.LONG())
>
>
> class MyKeyedProcessFunction(KeyedProcessFunction):
>
>     def __init__(self):
>         self.state = None
>
>     def open(self, runtime_context: RuntimeContext):
>         logging.info("open")
>         state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
>         self.state = runtime_context.get_map_state(state_desc)
>
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         existing = self.state.get(value[0])
>         if existing is None:
>             result = value[1]
>             self.state.put(value[0], result)
>         elif existing <= 10:
>             result = value[1] + existing
>             self.state.put(value[0], result)
>         else:
>             result = existing
>         yield result
>
>
> ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
>     .key_by(lambda a: a[0]) \
>     .process(MyKeyedProcessFunction(), Types.LONG()) \
>     .sink_to(file_sink)
>
> env.execute('data_stream_batch_state')
> {code}
> As it encounters a KeyError in `self.state.get(value[0])` when value[0] doesn't exist in the state, the job finishes without any error message. This issue should be addressed. We should make sure the error message appears in the log file to help users figure out what happened.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
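Until a fix surfaces state-access errors in the logs, one user-side workaround is to guard the state lookup so a missing key cannot raise unnoticed inside `process_element`. The sketch below is illustrative only and runs without a Flink cluster: `DictMapState` is a hypothetical dict-backed stand-in for PyFlink's `MapState` (written to raise `KeyError` on a missing key, mimicking the reported behavior), and `safe_get` is a hypothetical helper, not part of the PyFlink API.

```python
import logging


class DictMapState:
    """Hypothetical stand-in for a MapState backend that raises
    KeyError when the key is absent (the failure reported above)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data[key]  # raises KeyError for a missing key

    def put(self, key, value):
        self._data[key] = value


def safe_get(state, key):
    """Return the stored value, or None if the key is absent,
    logging the miss instead of letting the error disappear."""
    try:
        return state.get(key)
    except KeyError:
        logging.info("no state yet for key %r", key)
        return None


state = DictMapState()
missing = safe_get(state, 1)   # first access: key absent, returns None
state.put(1, 5)
found = safe_get(state, 1)     # second access: key present, returns 5
```

With this pattern, the `existing is None` branch in `process_element` behaves as the job above intends, whether or not the underlying state raises on a missing key.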