Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112060500
--- Diff:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
fieldSerializers[i].serialize(o, target);
}
}
+ commandSerializer.serialize(record.command, target);
--- End diff --
I'm afraid we cannot change the serialization of `Row`. `Row` is a public
class in `flink-core`, not an internal `flink-table` class, so it is used in
other places and might also be part of user applications. If we change its
serialization, users might not be able to restore a job on 1.3 from a
savepoint taken with 1.2. This restriction rules out simply adding a field to
`Row`, which would otherwise have spared us major refactorings.
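To illustrate why appending data to the serialized form breaks restores, here
is a deliberately simplified sketch. It is not Flink's `RowSerializer`: the
class and method names are hypothetical, rows are modeled as `int[]`, and the
extra command is assumed to be a single boolean. Bytes written in the old
format simply end before the reader finds the command flag it now expects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical, simplified stand-in for a row serializer (NOT Flink's
// RowSerializer): it only shows what happens when a format grows a field.
public class SerializerCompatSketch {

    // "Old" format (think 1.2 savepoint): only the field values.
    static byte[] serializeOld(int[] fields) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int f : fields) {
            out.writeInt(f);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // "New" reader: expects the fields followed by a command flag, mirroring
    // the extra commandSerializer.serialize(...) call in the diff.
    static int[] deserializeNew(byte[] data, int arity) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int[] fields = new int[arity];
        for (int i = 0; i < arity; i++) {
            fields[i] = in.readInt();
        }
        in.readBoolean(); // assumed command flag; absent in old-format bytes
        return fields;
    }

    // True if old-format bytes can be read back by the new reader.
    static boolean canRestoreOldData(int[] fields) {
        try {
            deserializeNew(serializeOld(fields), fields.length);
            return true;
        } catch (EOFException e) {
            return false; // old data ends before the expected command flag
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canRestoreOldData(new int[] {1, 2, 3})); // false
    }
}
```

With a single record the reader runs off the end of the stream. With many
records stored back to back, the extra read would instead consume the first
byte of the next record and misalign everything after it.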
I see two options for adding the command field to the data streams in
`flink-table`:
1. Use a regular field in `Row`. This would mean that the physical layout
of the `Row` no longer matches the logical layout, i.e., the one expected by
Calcite. However, we will probably change this anyway for the upcoming changes
related to the time indicators. With those, the physical layout will have
fewer fields than the logical layout (we will remove time fields that live in
the metadata of Flink's records or are taken as processing time). Adding the
command field would introduce a field that is not in the logical layout. The
problem with this approach is that the command field would sit at a different
position depending on the arity of each `Row` (probably the last position). We
could build on the time indicator changes (or the other way round).
@twalthr is working on this. You can have a look at the current status here:
https://github.com/twalthr/flink/tree/FLINK-5884
2. The other option is to wrap the rows in a custom data type similar to a
`Tuple2[Row, Command]`. The data type could be named `Change` or `CRow` and
would have its own `TypeInformation`, `TypeSerializer`, and `TypeComparator`,
which forward most calls to the type info, serializer, and comparator of `Row`.
The problem with this approach is that we need to change the return types of
all functions. For some functions this might not be a big issue if we can
unwrap the `Row` object before passing it to the code-generated functions. The
command field could be set when the result `Row` is returned or in a wrapping
`Collector`.
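A minimal sketch of the wrapping idea in option 2, assuming a hypothetical
two-method `Serializer` interface instead of Flink's much larger
`TypeSerializer`, rows modeled as `int[]`, and the command modeled as a
boolean. `CRow` and `CRowSerializer` are illustrative names only. The point is
that the wrapper delegates the row to the existing serializer untouched and
only appends the command.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class CRowSketch {

    // Hypothetical minimal serializer interface; Flink's TypeSerializer has
    // many more methods (copy, duplicate, snapshotConfiguration, ...).
    interface Serializer<T> {
        void serialize(T value, DataOutput target) throws IOException;
        T deserialize(DataInput source) throws IOException;
    }

    // Stand-in for the unchanged Row serializer: fixed-arity int fields.
    static final class IntRowSerializer implements Serializer<int[]> {
        private final int arity;
        IntRowSerializer(int arity) { this.arity = arity; }

        public void serialize(int[] row, DataOutput target) throws IOException {
            for (int f : row) target.writeInt(f);
        }

        public int[] deserialize(DataInput source) throws IOException {
            int[] row = new int[arity];
            for (int i = 0; i < arity; i++) row[i] = source.readInt();
            return row;
        }
    }

    // Hypothetical CRow: the untouched row plus a command (assumed boolean).
    static final class CRow {
        final int[] row;
        final boolean change;
        CRow(int[] row, boolean change) { this.row = row; this.change = change; }
    }

    // Forwards the row to the existing serializer and only appends the
    // command, so the Row format itself is left unchanged.
    static final class CRowSerializer implements Serializer<CRow> {
        private final Serializer<int[]> rowSerializer;
        CRowSerializer(Serializer<int[]> rowSerializer) { this.rowSerializer = rowSerializer; }

        public void serialize(CRow value, DataOutput target) throws IOException {
            rowSerializer.serialize(value.row, target);
            target.writeBoolean(value.change);
        }

        public CRow deserialize(DataInput source) throws IOException {
            int[] row = rowSerializer.deserialize(source);
            return new CRow(row, source.readBoolean());
        }
    }

    // Serializes and deserializes one value through an in-memory buffer.
    static CRow roundTrip(CRow value, CRowSerializer serializer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serializer.serialize(value, new DataOutputStream(bytes));
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return serializer.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CRowSerializer serializer = new CRowSerializer(new IntRowSerializer(2));
        CRow copy = roundTrip(new CRow(new int[] {7, 8}, false), serializer);
        System.out.println(Arrays.toString(copy.row) + " " + copy.change); // [7, 8] false
    }
}
```

Because the row bytes still come from the unchanged row serializer, anything
that stores plain `Row`s keeps its format; only the new wrapper type carries
the extra flag.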
My gut feeling is that the second approach is easier to implement because
we (hopefully) do not need to touch the generated code and "just" need to wrap
all `Row` objects in `CRow` objects.
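The "just wrap all `Row` objects in `CRow` objects" idea could look roughly
like this: the operator unwraps the `Row`, hands it to the unchanged generated
function, and a wrapping `Collector` re-attaches the command. `Collector` here
is a local stand-in mirroring the `collect`/`close` methods of Flink's
`org.apache.flink.util.Collector`; `CRow`, `CRowWrappingCollector`, and
`processElement` are hypothetical names, and rows are modeled as `Object[]`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class WrappingCollectorSketch {

    // Local stand-in with the same two methods as Flink's Collector.
    interface Collector<T> {
        void collect(T record);
        void close();
    }

    // Hypothetical CRow: an untouched row plus a command (assumed boolean).
    static final class CRow {
        final Object[] row;
        final boolean command;
        CRow(Object[] row, boolean command) { this.row = row; this.command = command; }
    }

    // Generated code keeps emitting plain rows; this collector attaches the
    // current command before forwarding downstream.
    static final class CRowWrappingCollector implements Collector<Object[]> {
        private final Collector<CRow> out;
        private boolean command = true; // arbitrary default for the sketch

        CRowWrappingCollector(Collector<CRow> out) { this.out = out; }

        void setCommand(boolean command) { this.command = command; }

        @Override
        public void collect(Object[] row) { out.collect(new CRow(row, command)); }

        @Override
        public void close() { out.close(); }
    }

    // Simple sink that buffers everything it receives.
    static final class ListCollector<T> implements Collector<T> {
        final List<T> records = new ArrayList<>();

        @Override
        public void collect(T record) { records.add(record); }

        @Override
        public void close() { }
    }

    // Hypothetical operator step: unwrap the row, call the (unchanged)
    // generated function, and let the wrapper re-attach the input's command.
    static void processElement(CRow in, Function<Object[], Object[]> generated,
                               CRowWrappingCollector wrapper) {
        wrapper.setCommand(in.command);
        wrapper.collect(generated.apply(in.row));
    }

    public static void main(String[] args) {
        ListCollector<CRow> sink = new ListCollector<>();
        CRowWrappingCollector wrapper = new CRowWrappingCollector(sink);
        // Stand-in for a generated function that only sees plain rows.
        Function<Object[], Object[]> generated = row -> new Object[] {row[0], ((Integer) row[1]) + 1};
        processElement(new CRow(new Object[] {"a", 1}, true), generated, wrapper);
        processElement(new CRow(new Object[] {"b", 5}, false), generated, wrapper);
        CRow last = sink.records.get(1);
        System.out.println(sink.records.size() + " " + last.command + " " + last.row[1]); // 2 false 6
    }
}
```

The generated function never sees the command, which is what would let us
leave the code generation untouched in this approach.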