[ 
https://issues.apache.org/jira/browse/FLINK-22124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arvid Heise updated FLINK-22124:
--------------------------------
        Parent:     (was: FLINK-22123)
    Issue Type: Bug  (was: Sub-task)

> The job finished without any exception if error was thrown during state access
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-22124
>                 URL: https://issues.apache.org/jira/browse/FLINK-22124
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.13.0
>            Reporter: Dian Fu
>            Assignee: Huang Xingbo
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.3
>
>
> For the following job:
> {code}
> import logging
> from pyflink.common import WatermarkStrategy, Row
> from pyflink.common.serialization import Encoder
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
> from pyflink.datastream.execution_mode import RuntimeExecutionMode
> from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
> from pyflink.datastream.state import MapStateDescriptor
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(2)
> env.set_runtime_mode(RuntimeExecutionMode.BATCH)
> seq_num_source = NumberSequenceSource(1, 1000)
> file_sink = FileSink \
>     .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
>                     Encoder.simple_string_encoder()) \
>     .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
>     .build()
> ds = env.from_source(
>     source=seq_num_source,
>     watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>     source_name='file_source',
>     type_info=Types.LONG())
> class MyKeyedProcessFunction(KeyedProcessFunction):
>     def __init__(self):
>         self.state = None
>     def open(self, runtime_context: RuntimeContext):
>         logging.info("open")
>         state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
>         self.state = runtime_context.get_map_state(state_desc)
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         existing = self.state.get(value[0])
>         if existing is None:
>             result = value[1]
>             self.state.put(value[0], result)
>         elif existing <= 10:
>             result = value[1] + existing
>             self.state.put(value[0], result)
>         else:
>             result = existing
>         yield result
> ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
>     .key_by(lambda a: a[0]) \
>     .process(MyKeyedProcessFunction(), Types.LONG()) \
>     .sink_to(file_sink)
> env.execute('data_stream_batch_state')
> {code}
> Since `self.state.get(value[0])` raises a KeyError when value[0] does not yet
> exist in the state, the job finishes without any error message. This issue
> should be addressed: the error message must appear in the log file so that
> users can figure out what happened.
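As a minimal sketch of the behavior the last paragraph asks for (not the actual PyFlink fix; the wrapper class, the `DictState` stand-in, and the logger setup are all illustrative), a state handle could log any access failure before rethrowing it, so the error is visible in the log even if the exception is later swallowed by the runtime:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class LoggingMapState:
    """Hypothetical wrapper: logs state-access failures, then rethrows."""

    def __init__(self, delegate):
        self._delegate = delegate

    def get(self, key):
        try:
            return self._delegate.get(key)
        except Exception:
            # The traceback lands in the log file before the error propagates.
            logger.exception("state access failed for key %r", key)
            raise

    def put(self, key, value):
        try:
            self._delegate.put(key, value)
        except Exception:
            logger.exception("state update failed for key %r", key)
            raise


class DictState:
    """Plain dict standing in for MapState in this sketch."""

    def __init__(self):
        self._d = {}

    def get(self, key):
        # Raises KeyError for a missing key, like the behavior in the report.
        return self._d[key]

    def put(self, key, value):
        self._d[key] = value


state = LoggingMapState(DictState())
state.put(1, 10)
print(state.get(1))
```

The point of the sketch is only that the exception is recorded *and* rethrown; swallowing it after logging would reproduce the silent-finish behavior this ticket describes.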



--
This message was sent by Atlassian Jira
(v8.3.4#803005)