[ https://issues.apache.org/jira/browse/FLINK-27068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17517815#comment-17517815 ]
CaoYu commented on FLINK-27068:
-------------------------------

This may be because the order in which records are passed to the downstream operator is not fixed. For example, the 2nd operator returns:

('a', 3, 0), ('a', 3, 0), ('b', 5, 1), ('b', 5, 1)

These records are not necessarily passed to the downstream operator in the order {a a b b}; they may arrive in the order {b b a a} instead. In that case, the result of the 3rd operator is not {('a', 1), ('a', 1), ('b', 1), ('b', 1)} but {('b', 1), ('b', 1), ('b', 1), ('b', 1)}, so the end result is {b b b b} instead of {a a b b}.

The test cases need to be redesigned so that they are not affected by the data order.

> test_keyed_min_and_max and test_keyed_min_by_and_max_by failed in py36,37
> -------------------------------------------------------------------------
>
>                 Key: FLINK-27068
>                 URL: https://issues.apache.org/jira/browse/FLINK-27068
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Priority: Critical
>              Labels: test-stability
>
> {code:java}
> 2022-04-05T06:34:52.3441929Z Apr 05 06:34:52 =================================== FAILURES ===================================
> 2022-04-05T06:34:52.3442672Z Apr 05 06:34:52 _____________ StreamingModeDataStreamTests.test_keyed_min_and_max ______________
> 2022-04-05T06:34:52.3447439Z Apr 05 06:34:52
> 2022-04-05T06:34:52.3448183Z Apr 05 06:34:52 self = <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests testMethod=test_keyed_min_and_max>
> 2022-04-05T06:34:52.3448809Z Apr 05 06:34:52
> 2022-04-05T06:34:52.3449290Z Apr 05 06:34:52     def test_keyed_min_and_max(self):
> 2022-04-05T06:34:52.3472285Z Apr 05 06:34:52         ds = self.env.from_collection([('a', 3, 0), ('a', 1, 1), ('b', 5, 1), ('b', 3, 1)],
> 2022-04-05T06:34:52.3473027Z Apr 05 06:34:52                                       type_info=Types.ROW_NAMED(
> 2022-04-05T06:34:52.3473569Z Apr 05 06:34:52                                           ["v1", "v2", "v3"],
> 2022-04-05T06:34:52.3474130Z Apr 05 06:34:52                                           [Types.STRING(), Types.INT(), Types.INT()])
> 2022-04-05T06:34:52.3474677Z Apr 05 06:34:52                                       )
> 2022-04-05T06:34:52.3475478Z Apr 05 06:34:52         # 1th operator min: ('a', 3, 0), ('a', 1, 0), ('b', 5, 1), ('b', 3, 1)
> 2022-04-05T06:34:52.3476325Z Apr 05 06:34:52         # 2th operator max: ('a', 3, 0), ('a', 3, 0), ('b', 5, 1), ('b', 5, 1)
> 2022-04-05T06:34:52.3477137Z Apr 05 06:34:52         # 3th operator max: ('a', 1), ('a', 1), ('b', 1), ('b', 1)
> 2022-04-05T06:34:52.3477893Z Apr 05 06:34:52         # 4th operator min: ('a', 'a', 'b', 'b')
> 2022-04-05T06:34:52.3478429Z Apr 05 06:34:52         ds.key_by(lambda x: x[0]) \
> 2022-04-05T06:34:52.3478915Z Apr 05 06:34:52             .min("v2") \
> 2022-04-05T06:34:52.3479403Z Apr 05 06:34:52             .map(lambda x: (x[0], x[1], x[2]),
> 2022-04-05T06:34:52.3480001Z Apr 05 06:34:52                  output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.INT()])) \
> 2022-04-05T06:34:52.3481663Z Apr 05 06:34:52             .key_by(lambda x: x[2]) \
> 2022-04-05T06:34:52.3482134Z Apr 05 06:34:52             .max(1) \
> 2022-04-05T06:34:52.3482587Z Apr 05 06:34:52             .map(lambda x: (x[0], 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
> 2022-04-05T06:34:52.3483270Z Apr 05 06:34:52             .key_by(lambda x: x[1]) \
> 2022-04-05T06:34:52.3483606Z Apr 05 06:34:52             .max() \
> 2022-04-05T06:34:52.3484001Z Apr 05 06:34:52             .map(lambda x: x[0], output_type=Types.STRING()) \
> 2022-04-05T06:34:52.3484425Z Apr 05 06:34:52             .key_by(lambda x: x) \
> 2022-04-05T06:34:52.3484770Z Apr 05 06:34:52             .min() \
> 2022-04-05T06:34:52.3485123Z Apr 05 06:34:52             .add_sink(self.test_sink)
> 2022-04-05T06:34:52.3485446Z Apr 05 06:34:52
> 2022-04-05T06:34:52.3485822Z Apr 05 06:34:52         self.env.execute("key_by_min_max_test_stream")
> 2022-04-05T06:34:52.3486275Z Apr 05 06:34:52         results = self.test_sink.get_results(False)
> 2022-04-05T06:34:52.3486953Z Apr 05 06:34:52         expected = ['a', 'a', 'b', 'b']
> 2022-04-05T06:34:52.3487388Z Apr 05 06:34:52 >       self.assert_equals_sorted(expected, results)
> 2022-04-05T06:34:52.3487878Z Apr 05 06:34:52
> 2022-04-05T06:34:52.3488255Z Apr 05 06:34:52 pyflink/datastream/tests/test_data_stream.py:1123:
> 2022-04-05T06:34:52.3488735Z Apr 05 06:34:52 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 2022-04-05T06:34:52.3489250Z Apr 05 06:34:52 pyflink/datastream/tests/test_data_stream.py:56: in assert_equals_sorted
> 2022-04-05T06:34:52.3489736Z Apr 05 06:34:52     self.assertEqual(expected, actual)
> 2022-04-05T06:34:52.3490406Z Apr 05 06:34:52 E       AssertionError: Lists differ: ['a', 'a', 'b', 'b'] != ['b', 'b', 'b', 'b']
> 2022-04-05T06:34:52.3490899Z Apr 05 06:34:52 E
> 2022-04-05T06:34:52.3491242Z Apr 05 06:34:52 E       First differing element 0:
> 2022-04-05T06:34:52.3491718Z Apr 05 06:34:52 E       'a'
> 2022-04-05T06:34:52.3492148Z Apr 05 06:34:52 E       'b'
> 2022-04-05T06:34:52.3492450Z Apr 05 06:34:52 E
> 2022-04-05T06:34:52.3492900Z Apr 05 06:34:52 E       - ['a', 'a', 'b', 'b']
> 2022-04-05T06:34:52.3493646Z Apr 05 06:34:52 E       + ['b', 'b', 'b', 'b']
> 2022-04-05T06:34:52.3494138Z Apr 05 06:34:52 __________ StreamingModeDataStreamTests.test_keyed_min_by_and_max_by ___________
> 2022-04-05T06:34:52.3494576Z Apr 05 06:34:52
> 2022-04-05T06:34:52.3495077Z Apr 05 06:34:52 self = <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests testMethod=test_keyed_min_by_and_max_by>
> 2022-04-05T06:34:52.3495573Z Apr 05 06:34:52
> 2022-04-05T06:34:52.3495919Z Apr 05 06:34:52     def test_keyed_min_by_and_max_by(self):
> 2022-04-05T06:34:52.3496607Z Apr 05 06:34:52         ds = self.env.from_collection([('a', 3, 0), ('a', 1, 1), ('b', 5, 0), ('b', 3, 1)],
> 2022-04-05T06:34:52.3497104Z Apr 05 06:34:52                                       type_info=Types.ROW_NAMED(
> 2022-04-05T06:34:52.3497500Z Apr 05 06:34:52                                           ["v1", "v2", "v3"],
> 2022-04-05T06:34:52.3497928Z Apr 05 06:34:52                                           [Types.STRING(), Types.INT(), Types.INT()])
> 2022-04-05T06:34:52.3498330Z Apr 05 06:34:52                                       )
> 2022-04-05T06:34:52.3498937Z Apr 05 06:34:52         # 1th operator min_by: ('a', 3, 0), ('a', 1, 1), ('b', 5, 0), ('b', 3, 1)
> 2022-04-05T06:34:52.3499641Z Apr 05 06:34:52         # 2th operator max_by: ('a', 3, 0), ('a', 3, 0), ('b', 5, 0), ('b', 5, 0)
> 2022-04-05T06:34:52.3500343Z Apr 05 06:34:52         # 3th operator min_by: ('a', 3, 0), ('a', 3, 0), ('a', 3, 0), ('a', 3, 0)
> 2022-04-05T06:34:52.3500981Z Apr 05 06:34:52         # 4th operator max_by: ('a', 'a', 'a', 'a')
> 2022-04-05T06:34:52.3501400Z Apr 05 06:34:52         ds.key_by(lambda x: x[0]) \
> 2022-04-05T06:34:52.3501765Z Apr 05 06:34:52             .min_by("v2") \
> 2022-04-05T06:34:52.3502126Z Apr 05 06:34:52             .map(lambda x: (x[0], x[1], x[2]),
> 2022-04-05T06:34:52.3502598Z Apr 05 06:34:52                  output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.INT()])) \
> 2022-04-05T06:34:52.3503064Z Apr 05 06:34:52             .key_by(lambda x: x[2]) \
> 2022-04-05T06:34:52.3503421Z Apr 05 06:34:52             .max_by(1) \
> 2022-04-05T06:34:52.3503782Z Apr 05 06:34:52             .key_by(lambda x: x[2]) \
> 2022-04-05T06:34:52.3504234Z Apr 05 06:34:52             .min_by() \
> 2022-04-05T06:34:52.3504633Z Apr 05 06:34:52             .map(lambda x: x[0], output_type=Types.STRING()) \
> 2022-04-05T06:34:52.3505055Z Apr 05 06:34:52             .key_by(lambda x: x) \
> 2022-04-05T06:34:52.3505406Z Apr 05 06:34:52             .max_by() \
> 2022-04-05T06:34:52.3505766Z Apr 05 06:34:52             .add_sink(self.test_sink)
> 2022-04-05T06:34:52.3506103Z Apr 05 06:34:52
> 2022-04-05T06:34:52.3506477Z Apr 05 06:34:52         self.env.execute("key_by_min_by_max_by_test_stream")
> 2022-04-05T06:34:52.3506946Z Apr 05 06:34:52         results = self.test_sink.get_results(False)
> 2022-04-05T06:34:52.3507533Z Apr 05 06:34:52         expected = ['a', 'a', 'a', 'a']
> 2022-04-05T06:34:52.3507964Z Apr 05 06:34:52 >       self.assert_equals_sorted(expected, results)
> 2022-04-05T06:34:52.3508332Z Apr 05 06:34:52
> 2022-04-05T06:34:52.3508702Z Apr 05 06:34:52 pyflink/datastream/tests/test_data_stream.py:1151:
> 2022-04-05T06:34:52.3509277Z Apr 05 06:34:52 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 2022-04-05T06:34:52.3509791Z Apr 05 06:34:52 pyflink/datastream/tests/test_data_stream.py:56: in assert_equals_sorted
> 2022-04-05T06:34:52.3510278Z Apr 05 06:34:52     self.assertEqual(expected, actual)
> 2022-04-05T06:34:52.3510943Z Apr 05 06:34:52 E       AssertionError: Lists differ: ['a', 'a', 'a', 'a'] != ['b', 'b', 'b', 'b']
> 2022-04-05T06:34:52.3511358Z Apr 05 06:34:52 E
> 2022-04-05T06:34:52.3511685Z Apr 05 06:34:52 E       First differing element 0:
> 2022-04-05T06:34:52.3512150Z Apr 05 06:34:52 E       'a'
> 2022-04-05T06:34:52.3512578Z Apr 05 06:34:52 E       'b'
> 2022-04-05T06:34:52.3512880Z Apr 05 06:34:52 E
> 2022-04-05T06:34:52.3513342Z Apr 05 06:34:52 E       - ['a', 'a', 'a', 'a']
> 2022-04-05T06:34:52.3513670Z Apr 05 06:34:52 E       ?   ^    ^    ^    ^
> 2022-04-05T06:34:52.3513970Z Apr 05 06:34:52 E
> 2022-04-05T06:34:52.3514428Z Apr 05 06:34:52 E       + ['b', 'b', 'b', 'b']
> 2022-04-05T06:34:52.3514777Z Apr 05 06:34:52 E       ?   ^    ^    ^    ^
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=34241&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=6bb545dd-772d-5d8c-f258-f5085fba3295&l=26123

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
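The ordering argument in the comment above can be checked with a small pure-Python model. This is a sketch under stated assumptions, not PyFlink code. It models a keyed rolling max as "update the per-key accumulator, then emit it", and it assumes that the argument-less max() in test_keyed_min_and_max aggregates field 0, which is consistent with the operator comments in the test. The helpers rolling_max and possible_outcomes are hypothetical. The 4th operator keys by the value itself, so each key only ever holds identical strings and its rolling min passes values through unchanged. That means the output of the key_by(lambda x: x[1]).max() stage is what the sink receives. That stage keys every record by the constant 1, so the 'a' and 'b' records share one accumulator. Flink preserves order only between a single pair of upstream and downstream subtasks, so records produced by different upstream subtasks (here, presumably the key 0 and key 1 partitions of the 2nd operator) may interleave in any order.

```python
from itertools import permutations


def rolling_max(records, key_fn):
    """Hypothetical model of a keyed rolling max on field 0 (not a PyFlink API).

    For each incoming record, the per-key accumulator is replaced if the
    record's field 0 is larger, and the current maximum of field 0 is
    emitted, i.e. one output per input record.
    """
    state = {}
    emitted = []
    for record in records:
        key = key_fn(record)
        current = state.get(key)
        if current is None or record[0] > current[0]:
            state[key] = record
        emitted.append(state[key][0])
    return emitted


def possible_outcomes(records, key_fn):
    """Sorted outputs over every distinct arrival order of the input records."""
    return {tuple(sorted(rolling_max(order, key_fn)))
            for order in set(permutations(records))}


# Records entering the key_by(lambda x: x[1]).max() stage of test_keyed_min_and_max.
stage3_input = [('a', 1), ('a', 1), ('b', 1), ('b', 1)]

# Keyed by the constant x[1], as in the test: 'a' and 'b' share one accumulator.
by_constant = possible_outcomes(stage3_input, lambda x: x[1])

# Hypothetical alternative keyed by x[0]: each key only sees identical records.
by_name = possible_outcomes(stage3_input, lambda x: x[0])

print(sorted(by_constant))
print(sorted(by_name))
```

With the constant key, the model yields three possible sorted results: ('a', 'a', 'b', 'b'), ('a', 'b', 'b', 'b') and ('b', 'b', 'b', 'b'). The last one matches the CI failure and appears whenever a 'b' record arrives first. Keying by x[0] yields a single result regardless of order. It is shown only to illustrate what an order-insensitive stage looks like; it changes what the test exercises, so it is not necessarily the redesign the test should adopt.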