Dian Fu created FLINK-31272:
-------------------------------

             Summary: Duplicate operators appear in the StreamGraph for Python DataStream API jobs
                 Key: FLINK-31272
                 URL: https://issues.apache.org/jira/browse/FLINK-31272
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.16.0
            Reporter: Dian Fu


For the following job:
{code}
import argparse
import json
import sys
import time
from typing import Iterable, cast

from pyflink.common import Types, Time, Encoder
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction, EmbeddedRocksDBStateBackend, \
    PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, ExternalizedCheckpointCleanup
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig
from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, TumblingProcessingTimeWindows, \
    ProcessingTimeTrigger


class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):
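    """ProcessingTimeTrigger variant that additionally fires and purges once
    the per-key element count reaches window_size."""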

    def __init__(self, window_size: int):
        self._window_size = window_size
        self._count_state_descriptor = ReducingStateDescriptor(
            "count", lambda a, b: a + b, Types.LONG())

    @staticmethod
    def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
        return CountWithProcessTimeoutTrigger(window_size)

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
        count_state.add(1)
        # print("element arrive:", element, "count_state:", count_state.get(), window.max_timestamp(),
        #       ctx.get_current_watermark())

        if count_state.get() >= self._window_size:  # must fire & purge
            print("fire element count", element, count_state.get(), window.max_timestamp(),
                  ctx.get_current_watermark())
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        if timestamp >= window.end:
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        else:
            return TriggerResult.CONTINUE

    def on_processing_time(self,
                           timestamp: int,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext) -> TriggerResult:
        if timestamp >= window.end:
            return TriggerResult.CONTINUE
        else:
            print("fire with process_time:", timestamp)
            count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE

    def on_event_time(self,
                      timestamp: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.CONTINUE

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        count_state = ctx.get_partitioned_state(self._count_state_descriptor)
        count_state.clear()


def to_dict_map(v):
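    # throttle: sleep one second per element before parsing it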
    time.sleep(1)
    dict_value = json.loads(v)
    return dict_value


def get_group_key(value, keys):
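    """Build a composite key by joining the first value of each requested
    field with '_', substituting 'null' for missing or empty fields."""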
    group_key_values = []
    for key in keys:
        one_key_value = 'null'
        if key in value:
            list_value = value[key]
            if list_value:
                one_key_value = str(list_value[0])
        group_key_values.append(one_key_value)
    group_key = '_'.join(group_key_values)
    # print("group_key=", group_key)
    return group_key


class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, TimeWindow]):
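    """Delegates each window's elements to a user-provided callable."""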

    def __init__(self, uf):
        self._user_function = uf

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[dict]) -> Iterable[dict]:
        # the sample passes a plain callable below, so invoke it directly
        return self._user_function(elements)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # process time
    env.get_config().set_auto_watermark_interval(0)
    state_backend = EmbeddedRocksDBStateBackend(True)
    state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    env.set_state_backend(state_backend)
    config = env.get_checkpoint_config()
    # config.set_checkpoint_storage(FileSystemCheckpointStorage("hdfs://ha-nn-uri/tmp/checkpoint/"))
    config.set_checkpoint_storage(FileSystemCheckpointStorage("file:///Users/10030122/Downloads/pyflink_checkpoint/"))
    config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
    config.set_checkpoint_interval(5000)
    config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    # define the source
    data_stream1 = env.from_collection(['{"user_id": ["0"], "goods_id": [0,0]}',
                                        '{"user_id": ["1"], "goods_id": [1,0]}',
                                        '{"user_id": ["2"], "goods_id": [2,0]}',
                                        '{"user_id": ["1"], "goods_id": [3,0]}',
                                        '{"user_id": ["2"], "goods_id": [4,0]}',
                                        '{"user_id": ["1"], "goods_id": [5,0]}',
                                        '{"user_id": ["2"], "goods_id": [6,0]}',
                                        '{"user_id": ["1"], "goods_id": [7,0]}',
                                        '{"user_id": ["2"], "goods_id": [8,0]}',
                                        '{"user_id": ["1"], "goods_id": [9,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[10,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[11,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[12,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[13,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[14,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[15,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[16,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[17,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[18,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[19,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[20,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[21,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[22,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[23,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[24,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[25,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[26,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[27,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[28,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[29,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[30,0]}'])

    data_stream2 = env.from_collection(['{"user_id": ["0"], "goods_id": [0,0]}',
                                        '{"user_id": ["1"], "goods_id": [1,0]}',
                                        '{"user_id": ["2"], "goods_id": [2,0]}',
                                        '{"user_id": ["1"], "goods_id": [3,0]}',
                                        '{"user_id": ["2"], "goods_id": [4,0]}',
                                        '{"user_id": ["1"], "goods_id": [5,0]}',
                                        '{"user_id": ["2"], "goods_id": [6,0]}',
                                        '{"user_id": ["1"], "goods_id": [7,0]}',
                                        '{"user_id": ["2"], "goods_id": [8,0]}',
                                        '{"user_id": ["1"], "goods_id": [9,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[10,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[11,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[12,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[13,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[14,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[15,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[16,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[17,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[18,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[19,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[20,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[21,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[22,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[23,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[24,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[25,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[26,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[27,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[28,0]}',
                                        '{"user_id": ["1"], "goods_id": 
[29,0]}',
                                        '{"user_id": ["2"], "goods_id": 
[30,0]}'])

    # group_keys = ['user_id', 'goods_id']
    group_keys = ['user_id']

    sink_to_file_flag = True

    data_stream = data_stream1.union(data_stream2)

    # user_function = __import__("UserFunction")
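    # note: the pipeline below defines exactly one window stage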

    ds = data_stream.map(lambda v: to_dict_map(v)) \
        .filter(lambda v: v) \
        .map(lambda v: v) \
        .key_by(lambda v: get_group_key(v, group_keys)) \
        .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
        .process(CountWindowProcessFunction(lambda v: v), Types.STRING())

    ds = ds.map(lambda v: v, Types.PRIMITIVE_ARRAY(Types.BYTE()))

    base_path = "/tmp/1.txt"
    encoder = Encoder.simple_string_encoder()
    file_sink_builder = FileSink.for_row_format(base_path, encoder)
    file_sink = file_sink_builder \
        .with_bucket_check_interval(1000) \
        .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
        .with_output_file_config(
            OutputFileConfig.builder().with_part_prefix("pre").with_part_suffix("suf").build()) \
        .build()
    ds.sink_to(file_sink)

    # submit for execution
    env.execute()
{code}
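
The plan below can be reproduced by dumping the StreamGraph as JSON before submission, e.g. (a minimal sketch, assuming the {{env}} built in the job above):
{code}
# print the stream graph as JSON instead of executing the job
print(env.get_execution_plan())
{code}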

The stream graph is as follows:
{code}
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Collection Source",
    "pact" : "Data Source",
    "contents" : "Source: Collection Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Source: Collection Source",
    "pact" : "Data Source",
    "contents" : "Source: Collection Source",
    "parallelism" : 1
  }, {
    "id" : 9,
    "type" : "TumblingProcessingTimeWindows",
    "pact" : "Operator",
    "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), 
ProcessingTimeTrigger, CountWindowProcessFunction)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 15,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 15,
    "type" : "Map, Filter, Map, _stream_key_by_map_operator",
    "pact" : "Operator",
    "contents" : "Map, Filter, Map, _stream_key_by_map_operator",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    }, {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 16,
    "type" : "TumblingProcessingTimeWindows, Map",
    "pact" : "Operator",
    "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), 
ProcessingTimeTrigger, CountWindowProcessFunction)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 15,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 18,
    "type" : "Sink: Writer",
    "pact" : "Operator",
    "contents" : "Sink: Writer",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 10,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 20,
    "type" : "Sink: Committer",
    "pact" : "Operator",
    "contents" : "Sink: Committer",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 18,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
{code}

The plan is incorrect: the TumblingProcessingTimeWindows operator appears twice (node 9 and node 16), even though the job defines only a single window operation. Node 9 feeds the sink chain via the Map at node 10, while node 16 has no downstream consumers.
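
A quick way to surface the duplication from the plan (a sketch; {{plan_json}} stands for the output of {{env.get_execution_plan()}} above):
{code}
import json

plan = json.loads(plan_json)
# collect all nodes whose contents describe the window operator
window_nodes = [node["id"] for node in plan["nodes"]
                if "TumblingProcessingTimeWindows" in node["contents"]]
print(window_nodes)  # prints [9, 16] here, although only one window is defined
{code}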



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
