Dian Fu created FLINK-31272:
-------------------------------
Summary: Duplicate operators appear in the StreamGraph for Python DataStream API jobs
Key: FLINK-31272
URL: https://issues.apache.org/jira/browse/FLINK-31272
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.16.0
Reporter: Dian Fu
For the following job:
{code}
import argparse
import json
import sys
import time
from typing import Iterable, cast

from pyflink.common import Types, Time, Encoder
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction, \
    EmbeddedRocksDBStateBackend, PredefinedOptions, FileSystemCheckpointStorage, \
    CheckpointingMode, ExternalizedCheckpointCleanup
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, \
    OutputFileConfig
from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, \
    TumblingProcessingTimeWindows, ProcessingTimeTrigger

class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):

    def __init__(self, window_size: int):
        self._window_size = window_size
        self._count_state_descriptor = ReducingStateDescriptor(
            "count", lambda a, b: a + b, Types.LONG())

    @staticmethod
    def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
        return CountWithProcessTimeoutTrigger(window_size)

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        count_state = cast(ReducingState,
                           ctx.get_partitioned_state(self._count_state_descriptor))
        count_state.add(1)
        # print("element arrive:", element, "count_state:", count_state.get(),
        #       window.max_timestamp(), ctx.get_current_watermark())
        if count_state.get() >= self._window_size:  # must fire & purge!
            print("fire element count", element, count_state.get(),
                  window.max_timestamp(), ctx.get_current_watermark())
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        if timestamp >= window.end:
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        else:
            return TriggerResult.CONTINUE

    def on_processing_time(self,
                           timestamp: int,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext) -> TriggerResult:
        if timestamp >= window.end:
            return TriggerResult.CONTINUE
        else:
            print("fire with process_time:", timestamp)
            count_state = cast(ReducingState,
                               ctx.get_partitioned_state(self._count_state_descriptor))
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE

    def on_event_time(self,
                      timestamp: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.CONTINUE

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        count_state = ctx.get_partitioned_state(self._count_state_descriptor)
        count_state.clear()

def to_dict_map(v):
    time.sleep(1)
    dict_value = json.loads(v)
    return dict_value


def get_group_key(value, keys):
    group_key_values = []
    for key in keys:
        one_key_value = 'null'
        if key in value:
            list_value = value[key]
            if list_value:
                one_key_value = str(list_value[0])
        group_key_values.append(one_key_value)
    group_key = '_'.join(group_key_values)
    # print("group_key=", group_key)
    return group_key

class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, TimeWindow]):

    def __init__(self, uf):
        self._user_function = uf

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[dict]) -> Iterable[dict]:
        result_list = self._user_function.process_after_group_by_function(elements)
        return result_list

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)
    # process time
    env.get_config().set_auto_watermark_interval(0)

    state_backend = EmbeddedRocksDBStateBackend(True)
    state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    env.set_state_backend(state_backend)

    config = env.get_checkpoint_config()
    # config.set_checkpoint_storage(FileSystemCheckpointStorage("hdfs://ha-nn-uri/tmp/checkpoint/"))
    config.set_checkpoint_storage(FileSystemCheckpointStorage("file:///Users/10030122/Downloads/pyflink_checkpoint/"))
    config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
    config.set_checkpoint_interval(5000)
    config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    # define the source
    data_stream1 = env.from_collection([
        '{"user_id": ["0"], "goods_id": [0,0]}',
        '{"user_id": ["1"], "goods_id": [1,0]}',
        '{"user_id": ["2"], "goods_id": [2,0]}',
        '{"user_id": ["1"], "goods_id": [3,0]}',
        '{"user_id": ["2"], "goods_id": [4,0]}',
        '{"user_id": ["1"], "goods_id": [5,0]}',
        '{"user_id": ["2"], "goods_id": [6,0]}',
        '{"user_id": ["1"], "goods_id": [7,0]}',
        '{"user_id": ["2"], "goods_id": [8,0]}',
        '{"user_id": ["1"], "goods_id": [9,0]}',
        '{"user_id": ["2"], "goods_id": [10,0]}',
        '{"user_id": ["1"], "goods_id": [11,0]}',
        '{"user_id": ["2"], "goods_id": [12,0]}',
        '{"user_id": ["1"], "goods_id": [13,0]}',
        '{"user_id": ["2"], "goods_id": [14,0]}',
        '{"user_id": ["1"], "goods_id": [15,0]}',
        '{"user_id": ["2"], "goods_id": [16,0]}',
        '{"user_id": ["1"], "goods_id": [17,0]}',
        '{"user_id": ["2"], "goods_id": [18,0]}',
        '{"user_id": ["1"], "goods_id": [19,0]}',
        '{"user_id": ["2"], "goods_id": [20,0]}',
        '{"user_id": ["1"], "goods_id": [21,0]}',
        '{"user_id": ["2"], "goods_id": [22,0]}',
        '{"user_id": ["1"], "goods_id": [23,0]}',
        '{"user_id": ["2"], "goods_id": [24,0]}',
        '{"user_id": ["1"], "goods_id": [25,0]}',
        '{"user_id": ["2"], "goods_id": [26,0]}',
        '{"user_id": ["1"], "goods_id": [27,0]}',
        '{"user_id": ["2"], "goods_id": [28,0]}',
        '{"user_id": ["1"], "goods_id": [29,0]}',
        '{"user_id": ["2"], "goods_id": [30,0]}'])
    data_stream2 = env.from_collection([
        '{"user_id": ["0"], "goods_id": [0,0]}',
        '{"user_id": ["1"], "goods_id": [1,0]}',
        '{"user_id": ["2"], "goods_id": [2,0]}',
        '{"user_id": ["1"], "goods_id": [3,0]}',
        '{"user_id": ["2"], "goods_id": [4,0]}',
        '{"user_id": ["1"], "goods_id": [5,0]}',
        '{"user_id": ["2"], "goods_id": [6,0]}',
        '{"user_id": ["1"], "goods_id": [7,0]}',
        '{"user_id": ["2"], "goods_id": [8,0]}',
        '{"user_id": ["1"], "goods_id": [9,0]}',
        '{"user_id": ["2"], "goods_id": [10,0]}',
        '{"user_id": ["1"], "goods_id": [11,0]}',
        '{"user_id": ["2"], "goods_id": [12,0]}',
        '{"user_id": ["1"], "goods_id": [13,0]}',
        '{"user_id": ["2"], "goods_id": [14,0]}',
        '{"user_id": ["1"], "goods_id": [15,0]}',
        '{"user_id": ["2"], "goods_id": [16,0]}',
        '{"user_id": ["1"], "goods_id": [17,0]}',
        '{"user_id": ["2"], "goods_id": [18,0]}',
        '{"user_id": ["1"], "goods_id": [19,0]}',
        '{"user_id": ["2"], "goods_id": [20,0]}',
        '{"user_id": ["1"], "goods_id": [21,0]}',
        '{"user_id": ["2"], "goods_id": [22,0]}',
        '{"user_id": ["1"], "goods_id": [23,0]}',
        '{"user_id": ["2"], "goods_id": [24,0]}',
        '{"user_id": ["1"], "goods_id": [25,0]}',
        '{"user_id": ["2"], "goods_id": [26,0]}',
        '{"user_id": ["1"], "goods_id": [27,0]}',
        '{"user_id": ["2"], "goods_id": [28,0]}',
        '{"user_id": ["1"], "goods_id": [29,0]}',
        '{"user_id": ["2"], "goods_id": [30,0]}'])
    # group_keys = ['user_id', 'goods_id']
    group_keys = ['user_id']
    sink_to_file_flag = True

    data_stream = data_stream1.union(data_stream2)
    # user_function = __import__("UserFunction")
    ds = data_stream.map(lambda v: to_dict_map(v)) \
        .filter(lambda v: v) \
        .map(lambda v: v) \
        .key_by(lambda v: get_group_key(v, group_keys)) \
        .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
        .process(CountWindowProcessFunction(lambda v: v), Types.STRING())

    ds = ds.map(lambda v: v, Types.PRIMITIVE_ARRAY(Types.BYTE()))

    base_path = "/tmp/1.txt"
    encoder = Encoder.simple_string_encoder()
    file_sink_builder = FileSink.for_row_format(base_path, encoder)
    file_sink = file_sink_builder \
        .with_bucket_check_interval(1000) \
        .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
        .with_output_file_config(
            OutputFileConfig.builder().with_part_prefix("pre").with_part_suffix("suf").build()) \
        .build()
    ds.sink_to(file_sink)

    # submit for execution
    env.execute()
{code}
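For reference, the plan shown below is the kind of output obtained by dumping the execution plan before submitting the job (a minimal sketch; the report does not state exactly how the plan was captured):
{code}
# Sketch: print the StreamGraph as JSON before env.execute() so the
# generated operators can be inspected.
print(env.get_execution_plan())
{code}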
The stream graph is as follows:
{code}
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Collection Source",
"pact" : "Data Source",
"contents" : "Source: Collection Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Source: Collection Source",
"pact" : "Data Source",
"contents" : "Source: Collection Source",
"parallelism" : 1
}, {
"id" : 9,
"type" : "TumblingProcessingTimeWindows",
"pact" : "Operator",
"contents" : "Window(TumblingProcessingTimeWindows(12000, 0),
ProcessingTimeTrigger, CountWindowProcessFunction)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 15,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 10,
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 1,
"predecessors" : [ {
"id" : 9,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 15,
"type" : "Map, Filter, Map, _stream_key_by_map_operator",
"pact" : "Operator",
"contents" : "Map, Filter, Map, _stream_key_by_map_operator",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
}, {
"id" : 2,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 16,
"type" : "TumblingProcessingTimeWindows, Map",
"pact" : "Operator",
"contents" : "Window(TumblingProcessingTimeWindows(12000, 0),
ProcessingTimeTrigger, CountWindowProcessFunction)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 15,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 18,
"type" : "Sink: Writer",
"pact" : "Operator",
"contents" : "Sink: Writer",
"parallelism" : 1,
"predecessors" : [ {
"id" : 10,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 20,
"type" : "Sink: Committer",
"pact" : "Operator",
"contents" : "Sink: Committer",
"parallelism" : 1,
"predecessors" : [ {
"id" : 18,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
{code}
The plan is incorrect: the TumblingProcessingTimeWindows operator appears twice (nodes 9 and 16), even though the job defines only a single window operation.
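A stripped-down variant of the pipeline can be used to check whether a single window operation is translated into exactly one StreamGraph node. The following is only a sketch (the class name PassThroughWindowFunction and the sample data are hypothetical, and it is not verified that this reduced form still triggers the duplication):
{code}
from pyflink.common import Time, Types
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows


class PassThroughWindowFunction(ProcessWindowFunction):
    """Hypothetical window function that simply forwards the buffered elements."""

    def process(self, key, context, elements):
        return list(elements)

    def clear(self, context):
        # No per-window state to clean up in this sketch.
        pass


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

ds = env.from_collection(['a', 'b', 'a', 'b']) \
    .key_by(lambda v: v) \
    .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
    .process(PassThroughWindowFunction(), Types.STRING()) \
    .map(lambda v: v, Types.STRING())

ds.print()

# For a job with one window operation, "TumblingProcessingTimeWindows" is
# expected to appear in exactly one node of the printed plan.
print(env.get_execution_plan())
{code}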