[
https://issues.apache.org/jira/browse/FLINK-35748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18087876#comment-18087876
]
Qilong Wang commented on FLINK-35748:
-------------------------------------
I don’t think this should be fixed by suppressing updates on tied rowtime only
across bundles.
For `keepLastRow`, the current implementation still treats a tied rowtime as
“later input wins” inside a bundle. If the state comparison across bundles is
changed to a strict rowtime comparison (replace only when the incoming rowtime
is greater), the final materialized result can depend on where the mini-batch
boundary falls.
For example, with inputs:
`(book,10,1L), (book,11,5L), (book,12,3L), (book,13,5L), (book,14,5L)`
With `bundle size = 4`, the first bundle emits `(book,13,5L)`. The later
`(book,14,5L)` has the same rowtime and is ignored by the strict cross-bundle
comparison, so the final result stays `(book,13,5L)`.
With `bundle size = 5`, all records are compacted in one bundle, and the tied
rowtime handling inside the bundle selects the later `(book,14,5L)`.
So this change makes the final result depend on bundle size. Either a tied
rowtime should replace the existing row both inside and across bundles, or it
should replace it in neither place. The mixed behavior introduced by this
change seems worse than the original extra `-U/+U` changelog.
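To make the dependency on bundle size concrete, here is a rough standalone
model of the two comparison rules. It is only a sketch and not Flink's actual
operator code: `compact_bundle` and `final_row` are names made up for this
illustration, a single key is assumed, and rows are plain
`(key, value, rowtime)` tuples.

```python
from typing import List, Optional, Tuple

Row = Tuple[str, int, int]  # (key, value, rowtime); hypothetical layout


def compact_bundle(rows: List[Row]) -> Optional[Row]:
    """In-bundle keepLastRow: on tied rowtime the later input wins (>=)."""
    best: Optional[Row] = None
    for row in rows:
        if best is None or row[2] >= best[2]:
            best = row
    return best


def final_row(rows: List[Row], bundle_size: int, strict_across_bundles: bool) -> Optional[Row]:
    """Feed rows through fixed-size bundles and return the final state row.

    strict_across_bundles=True models the proposed change: the bundle result
    replaces state only if its rowtime is strictly greater.
    strict_across_bundles=False models "tied rowtime replaces" everywhere.
    """
    state: Optional[Row] = None
    for start in range(0, len(rows), bundle_size):
        candidate = compact_bundle(rows[start:start + bundle_size])
        if candidate is None:
            continue
        if state is None:
            state = candidate
        elif strict_across_bundles:
            if candidate[2] > state[2]:
                state = candidate
        elif candidate[2] >= state[2]:
            state = candidate
    return state


ROWS: List[Row] = [
    ("book", 10, 1), ("book", 11, 5), ("book", 12, 3),
    ("book", 13, 5), ("book", 14, 5),
]

if __name__ == "__main__":
    for size in (4, 5):
        print(size, final_row(ROWS, size, strict_across_bundles=True))
    # With the strict cross-bundle rule the two bundle sizes disagree:
    # 4 ('book', 13, 5)
    # 5 ('book', 14, 5)
```

In this model, using the same non-strict rule across bundles
(`strict_across_bundles=False`) yields `('book', 14, 5)` for both bundle sizes.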
> DeduplicateITCase.testLastRowWithoutAllChangelogOnRowtime with MiniBatch mode
> and RocksDB backend enabled
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35748
> URL: https://issues.apache.org/jira/browse/FLINK-35748
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0
> Reporter: Matthias Pohl
> Priority: Major
> Labels: test-stability
> Attachments: failure.miniBatchMode.RocksDB.log,
> success.miniBatchMode.InMemory.log
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60613&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=12259
> {code}
> Jul 02 14:44:36 14:44:36.737 [ERROR] Tests run: 40, Failures: 1, Errors: 0,
> Skipped: 4, Time elapsed: 18.45 s <<< FAILURE! -- in
> org.apache.flink.table.planner.runtime.stream.sql.DeduplicateITCase
> Jul 02 14:44:36 14:44:36.737 [ERROR]
> org.apache.flink.table.planner.runtime.stream.sql.DeduplicateITCase.testLastRowWithoutAllChangelogOnRowtime
> -- Time elapsed: 0.860 s <<< FAILURE!
> Jul 02 14:44:36 org.opentest4j.AssertionFailedError:
> Jul 02 14:44:36
> Jul 02 14:44:36 expected: List(+I(1,1,Hi,1970-01-01T00:00:00.001),
> +I(1,2,Hello world,1970-01-01T00:00:00.002), +I(2,3,I am
> fine.,1970-01-01T00:00:00.003), +I(2,6,Comment#1,1970-01-01T00:00:00.006),
> +I(3,5,Comment#2,1970-01-01T00:00:00.005),
> +I(4,4,Comment#3,1970-01-01T00:00:00.004))
> Jul 02 14:44:36 but was: ArrayBuffer(+I(1,1,Hi,1970-01-01T00:00:00.001),
> +I(1,2,Hello world,1970-01-01T00:00:00.002),
> +I(1,3,Hello,1970-01-01T00:00:00.003),
> +I(2,6,Comment#1,1970-01-01T00:00:00.006),
> +I(3,5,Comment#2,1970-01-01T00:00:00.005),
> +I(4,4,Comment#3,1970-01-01T00:00:00.004), +U(2,3,I am
> fine.,1970-01-01T00:00:00.003), -U(1,3,Hello,1970-01-01T00:00:00.003))
> Jul 02 14:44:36 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Jul 02 14:44:36 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Jul 02 14:44:36 at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Jul 02 14:44:36 at
> org.apache.flink.table.planner.runtime.stream.sql.DeduplicateITCase.testLastRowWithoutAllChangelogOnRowtime(DeduplicateITCase.scala:364)
> Jul 02 14:44:36 at java.lang.reflect.Method.invoke(Method.java:498)
> [...]
> {code}
> The test failure appeared in a CI run for FLINK-35553, which changes how
> rescaling is triggered. I checked the logs and couldn't find any evidence
> that the test run included job restarts, so it shouldn't have touched the
> FLINK-35553 changes (see attached logs).