[ https://issues.apache.org/jira/browse/FLINK-30637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huang Xingbo resolved FLINK-30637.
----------------------------------
    Fix Version/s: 1.16.1
                   1.15.4
         Assignee: Xin Chen
       Resolution: Fixed

Merged into master via d053867fb5c0fc647ea9266aab35598d7f3fc5c4
Merged into release-1.16 via eca940c5bf9e17c90dbb6f35e4ba370027137368
Merged into release-1.15 via 4035d61a2756ec16046fb687f533be0501fbbd35

> In linux-aarch64 environment, using “is” judgment to match the window type of
> overwindow have returned incorrect matching results
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30637
>                 URL: https://issues.apache.org/jira/browse/FLINK-30637
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.13.6
>         Environment: Linux version 5.10.0-60.18.0.50.oe2203.aarch64
> (abuild@obs-worker-002) (gcc_old (GCC) 10.3.1, GNU ld (GNU Binutils) 2.37) #1
> SMP Wed Mar 30 02:43:08 UTC 2022
>
> pyflink-version:1.13.6
>            Reporter: Xin Chen
>            Assignee: Xin Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.1, 1.15.4
>
>
> In the linux-aarch64 environment, the check “window_type is
> OverWindow.ROW_UNBOUNDED_FOLLOWING” in the
> PandasBatchOverWindowAggregateFunctionOperation class of the pyflink source
> code returns the wrong result.
> For example, when window_type is 6, it represents the window type
> ‘ROW_UNBOUNDED_FOLLOWING’, but “window_type is
> OverWindow.ROW_UNBOUNDED_FOLLOWING” returns false because window_type refers
> to an object at a different memory address. As a result the wrong window type
> is selected (the row sliding window), so the input data for the Python UDF is
> assembled incorrectly and the UDF produces wrong results.
>
> Specifically, the pyflink unit test case is
> ‘test_over_window_aggregate_function’ in
> ‘pyflink\table\tests\test_pandas_udaf.py’. It fails when I run it with pytest
> on a linux-aarch64 system.
> I reduced this unit test case to the following code and ran it in Flink
> standalone mode on an aarch64 system, and got the same wrong result:
>
> {code:java}
> import unittest
> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
> from pyflink.table.udf import udaf, AggregateFunction
>
>
> class MaxAdd(AggregateFunction, unittest.TestCase):
>     def open(self, function_context):
>         mg = function_context.get_metric_group()
>         self.counter = mg.add_group("key", "value").counter("my_counter")
>         self.counter_sum = 0
>
>     def get_value(self, accumulator):
>         # counter
>         self.counter.inc(10)
>         self.counter_sum += 10
>         return accumulator[0]
>
>     def create_accumulator(self):
>         return []
>
>     def accumulate(self, accumulator, *args):
>         result = 0
>         for arg in args:
>             result += arg.max()
>         accumulator.append(result)
>
>
> @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
> def mean_udaf(v):
>     import logging
>     logging.error("debug")
>     logging.error(v)
>     return v.mean()
>
>
> t_env = TableEnvironment.create(
>     EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())
> t_env.get_config().get_configuration().set_string("parallelism.default", "2")
> t_env.get_config().get_configuration().set_string(
>     "python.fn-execution.bundle.size", "1")
>
> import datetime
>
> t = t_env.from_elements(
>     [
>         (1, 2, 3, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
>         (1, 3, 1, datetime.datetime(2018, 3, 11, 3, 10, 0, 0)),
>         (1, 8, 5, datetime.datetime(2018, 3, 11, 4, 20, 0, 0))
>     ],
>     DataTypes.ROW(
>         [DataTypes.FIELD("a", DataTypes.TINYINT()),
>          DataTypes.FIELD("b", DataTypes.SMALLINT()),
>          DataTypes.FIELD("c", DataTypes.INT()),
>          DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3))]))
> # sink
> t_env.execute_sql("""
>     CREATE TABLE mySink (
>         c INT,
>         d FLOAT
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
> t_env.create_temporary_system_function("mean_udaf", mean_udaf)
> t_env.register_function("max_add", udaf(MaxAdd(),
>                                         result_type=DataTypes.INT(),
>                                         func_type="pandas"))
> t_env.register_table("T", t)
> t_env.execute_sql("""
>     insert into mySink
>     select
>         max_add(b, c)
>         over (PARTITION BY a ORDER BY rowtime
>         ROWS BETWEEN UNBOUNDED preceding AND 0 FOLLOWING),
>         mean_udaf(b)
>         over (PARTITION BY a ORDER BY rowtime
>         ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING)
>     from T
> """).wait()
> '''
> assert_equals(actual,
>               ["5,4.3333335",
>                "13,5.5",
>                "6,4.3333335"])
> '''{code}
> The expected results are ["5,4.3333335", "13,5.5", "6,4.3333335"], but the
> actual results are List(5,2.0, 13,5.5, 4,2.5). For ‘mean_udaf’ with the
> ‘UNBOUNDED FOLLOWING’ window in the code above, I added error logging and
> found that even when window_type is 6 and
> 'OverWindow.ROW_UNBOUNDED_FOLLOWING' is also 6, the condition in the
> following pyflink source code evaluated to false.
> {code:java}
> # pyflink\fn_execution\operations.py (line 273)
> elif window_type is OverWindow.ROW_UNBOUNDED_FOLLOWING:
>     # row unbounded following window
>     window_start = window.lower_boundary
>     for j in range(input_cnt):
>         start = max(j + window_start, 0)
>         series_slices = [s.iloc[start: input_cnt] for s in input_series]
>         result.append(func(series_slices)){code}
> Instead, execution fell through to the row sliding window branch to assemble
> the input data of mean_udaf:
> {code:java}
> # pyflink\fn_execution\operations.py (line 280)
> else:
>     # row sliding window
>     window_start = window.lower_boundary
>     window_end = window.upper_boundary
>     for j in range(input_cnt):
>         start = max(j + window_start, 0)
>         end = min(j + window_end + 1, input_cnt)
>         series_slices = [s.iloc[start: end] for s in input_series]
>         result.append(func(series_slices)){code}
> That is clearly wrong; in the x86 environment the correct branch is chosen.
> The reason is that the memory address of window_type differs from that of
> ‘OverWindow.ROW_UNBOUNDED_FOLLOWING’ in the linux-aarch64 environment,
> whereas the two are the same in the linux-x86 environment. Why the memory
> addresses differ is not yet known.
> However, I observed that window_type comes from 'serialized_fn.windows':
> {code:java}
> def __init__(self, spec):
>     super(PandasBatchOverWindowAggregateFunctionOperation, self).__init__(spec)
>     self.windows = [window for window in self.spec.serialized_fn.windows]
> {code}
> Perhaps the grpc or protobuf dependencies, or the serialization operations,
> in the aarch64 environment have affected the memory address of the int
> variables. I'll explore the underlying reasons later.
>
> Solution and suggestion:
> Since window selection needs to compare the values of two integer variables
> (window_type and OverWindow.ROW_UNBOUNDED_FOLLOWING), I recommend replacing
> ‘is’ with ‘==’ in the window type matching. This also prevents erroneous
> results in cases where the Python small integer object pool does not apply,
> which may likewise affect the memory address. The change has been verified to
> behave correctly in both x86 and aarch64 environments, for both the unit test
> case and the reduced case above.
>
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
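
The identity-versus-equality pitfall described in the report can be sketched in plain Python without Flink. This is only an illustration under stated assumptions, not the PyFlink implementation: `WireInt`, `window_means` and `use_identity` are hypothetical names, and the int subclass is just a device to obtain a value equal to 6 that is guaranteed to be a different object (the report leaves the real cause on aarch64 open). The lower boundary -1 stands for "1 PRECEDING"; the upper boundary 0 is an assumption, chosen because it reproduces the wrong values quoted in the report.

```python
# Illustrative stand-ins; only the value 6 for ROW_UNBOUNDED_FOLLOWING
# is taken from the report, every name here is hypothetical.
ROW_UNBOUNDED_FOLLOWING = 6


class WireInt(int):
    """An int subclass: equal in value to a plain int, never the same object."""


def window_means(values, window_type, lower, upper, use_identity):
    """Mean over each row's window, mirroring the two branches quoted in the report."""
    if use_identity:
        matched = window_type is ROW_UNBOUNDED_FOLLOWING   # compares object identity
    else:
        matched = window_type == ROW_UNBOUNDED_FOLLOWING   # compares integer value
    n = len(values)
    means = []
    for j in range(n):
        start = max(j + lower, 0)
        # Unbounded following runs to the end of the partition;
        # otherwise the row sliding window branch is taken.
        end = n if matched else min(j + upper + 1, n)
        window = values[start:end]
        means.append(sum(window) / len(window))
    return means


b = [2, 3, 8]              # column b from the reproduction
window_type = WireInt(6)   # same value as the constant, different object

by_equality = window_means(b, window_type, -1, 0, use_identity=False)
by_identity = window_means(b, window_type, -1, 0, use_identity=True)
print(by_equality)
print(by_identity)
```

With `==` the three means are 4.33..., 4.33... and 5.5, the values the test expects; with `is` the sliding branch is taken and the means become 2.0, 2.5 and 5.5, the same three values as in the reported wrong output.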