Hi,

+1 for `InitContext#createInputSerializer()`.
I think we can get the serializer directly in InitContextImpl through `getOperatorConfig().getTypeSerializerIn(0, getUserCodeClassloader()).duplicate()`.

Best,
Lijie

On Mon, Apr 24, 2023 at 19:04 João Boto <eskabe...@apache.org> wrote:

> Hi @Gordon,
>
> `InitContext#createInputSerializer()` is a great option and will solve more than one problem, but I can't find a way to get the TypeInformation<T> in InitContextImpl (I may be missing something).
>
> In the current (legacy) implementations we rely on the interface `InputTypeConfigurable` to get the TypeInformation, but this will not work for Sink2 as it is not implemented there (DataStream.sinkTo vs DataStream.addSink).
> As a side note, the ExecutionConfig provided by this interface could not be used, as it can be changed after the call is made, for example during Table Planning in DefaultExecutor.configureBatchSpecificProperties().
>
> In the end, what we need to do is something like:
> if (isObjectReuseEnabled()) serializer.copy(record) else record;
>
> So, responding to your question: yes, the last option is OK for this, but I don't see how to implement it, as I'm missing the TypeInformation in InitContextImpl.
>
> Best regards,
>
> On 2023/04/21 15:04:24 "Tzu-Li (Gordon) Tai" wrote:
> > Do we have to introduce `InitContext#createSerializer(TypeInformation<T>)` which returns TypeSerializer<T>, or is it sufficient to only provide `InitContext#createInputSerializer()` which returns TypeSerializer<IN>?
> >
> > I had the impression that buffering sinks like JDBC only need the latter. @Joao, could you confirm?
> >
> > If that's the case, +1 to adding the following method signatures to InitContext:
> > * TypeSerializer<IN> createInputSerializer()
> > * boolean isObjectReuseEnabled()
> >
> > Thanks,
> > Gordon
> >
> > On Fri, Apr 21, 2023 at 3:04 AM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > Good point! @Gordon
> > > Introducing an `InitContext#createSerializer(TypeInformation)` looks like a better option to me, so we do not need to introduce an unmodifiable `ExecutionConfig` at this moment.
> > >
> > > Hope that we can make `ExecutionConfig` a read-only interface in Flink 2.0. It is already exposed in `RuntimeContext` to user functions, while mutating the values at runtime is actually undefined behavior.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Tue, Apr 18, 2023 at 01:02 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > Sorry for chiming in late.
> > > >
> > > > I'm not so sure that exposing ExecutionConfig / ReadExecutionConfig directly through Sink#InitContext is the right thing to do.
> > > >
> > > > 1. A lot of the read-only getter methods on ExecutionConfig are irrelevant for sinks. Expanding the scope of the InitContext interface with so many irrelevant methods is probably going to make writing unit tests a pain.
> > > >
> > > > 2. There are actually a few getter methods on `InitContext` that duplicate what ExecutionConfig exposes. For example, InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber currently exist, and it can be confusing if users find 2 sources of that information (via the `InitContext` and via the wrapped `ExecutionConfig`).
> > > >
> > > > All in all, it feels like `Sink#InitContext` was introduced initially as a means to selectively expose only certain information to sinks.
> > > >
> > > > It looks like right now, the only requirement is that some sinks require 1) isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it make sense to follow the original intent and only selectively expose these?
> > > > For 1), we can just add a new method to `InitContext` and forward the information from the `ExecutionConfig` accessible at the operator level.
> > > > For 2), would it make sense to create the serializer at the operator level and then provide it through `InitContext`?
> > > >
> > > > Thanks,
> > > > Gordon
> > > >
> > > > On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu <reed...@gmail.com> wrote:
> > > >
> > > > > We can let the `InitContext` return `ExecutionConfig` in the interface. However, a `ReadableExecutionConfig` implementation should be returned so that exceptions will be thrown if users try to modify the `ExecutionConfig`.
> > > > >
> > > > > We can rework all the setters of `ExecutionConfig` to internally invoke a `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can just override that method. But pay attention to a few exceptional setters, i.e. those for globalJobParameters and serializers.
> > > > >
> > > > > We should also explicitly state in the documentation of `InitContext#getExecutionConfig()` that the returned `ExecutionConfig` is unmodifiable.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > On Mon, Apr 17, 2023 at 16:51 João Boto <eskabe...@apache.org> wrote:
> > > > >
> > > > > > Hi Zhu,
> > > > > >
> > > > > > Thanks for your time reviewing this.
> > > > > >
> > > > > > Extending `ExecutionConfig` will allow modifying the values in the config (this is what we want to prevent with Option 2).
> > > > > >
> > > > > > Extending the ExecutionConfig is no simpler than doing Option 1 (exposing ExecutionConfig directly).
> > > > > >
> > > > > > Regards
> > > > > >
> > > > > > On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > > > > > > Hi João,
> > > > > > >
> > > > > > > Thanks for creating this FLIP!
> > > > > > > I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> > > > > > >
> > > > > > > Yet I think it's better to let the `ReadableExecutionConfig` extend `ExecutionConfig`, because otherwise we have to introduce a new method `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new method may require every `TypeInformation` to implement it, including Flink built-in ones and custom ones, otherwise exceptions will happen. That goal, however, is pretty hard to achieve.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Zhu
> > > > > > >
> > > > > > > On Tue, Feb 28, 2023 at 23:34 João Boto <eskabe...@apache.org> wrote:
> > > > > > >
> > > > > > > > I have updated the FLIP with the 2 options that we have discussed.
> > > > > > > >
> > > > > > > > Option 1: Expose ExecutionConfig directly on InitContext
> > > > > > > > This has minimal impact, as we only have to expose the new methods.
> > > > > > > >
> > > > > > > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > > > > > > With this option we have more impact, as we need to add a new method to TypeInformation and change all implementations (there are currently 72 implementations).
> > > > > > > >
> > > > > > > > Waiting for feedback or concerns about the two options.
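
For readers skimming the thread, here is a rough sketch of the direction the latest replies converge on: `InitContext` exposes only `createInputSerializer()` and `isObjectReuseEnabled()`, and a buffering sink copies a record before buffering it when object reuse is enabled. This is only an illustration under stated assumptions, not Flink code: `Serializer`, `InitContext`, `BufferingWriter`, and `context(...)` below are hypothetical stand-ins (the real types would be Flink's `TypeSerializer<IN>` and `Sink.InitContext`), and only `copy()` is modeled.

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer<T>; only copy() is modeled. */
    interface Serializer<T> {
        T copy(T record);
    }

    /** Hypothetical stand-in for Sink.InitContext with the two methods proposed in the thread. */
    interface InitContext<IN> {
        Serializer<IN> createInputSerializer();

        boolean isObjectReuseEnabled();
    }

    /** Buffering writer that copies incoming records only when object reuse is enabled. */
    static final class BufferingWriter<IN> {
        private final List<IN> buffer = new ArrayList<>();
        private final Serializer<IN> serializer;
        private final boolean objectReuse;

        BufferingWriter(InitContext<IN> context) {
            this.serializer = context.createInputSerializer();
            this.objectReuse = context.isObjectReuseEnabled();
        }

        void write(IN record) {
            // Same logic as the one-liner quoted in the thread:
            // if (isObjectReuseEnabled()) serializer.copy(record) else record
            buffer.add(objectReuse ? serializer.copy(record) : record);
        }

        List<IN> buffered() {
            return buffer;
        }
    }

    /** Test helper (assumption): a context for int[] records whose serializer clones the array. */
    static InitContext<int[]> context(boolean objectReuse) {
        return new InitContext<int[]>() {
            @Override
            public Serializer<int[]> createInputSerializer() {
                return record -> record.clone();
            }

            @Override
            public boolean isObjectReuseEnabled() {
                return objectReuse;
            }
        };
    }

    public static void main(String[] args) {
        BufferingWriter<int[]> writer = new BufferingWriter<>(context(true));
        int[] reused = {1};
        writer.write(reused);
        reused[0] = 2; // simulates the runtime overwriting the same instance
        writer.write(reused);
        // Prints "1,2": the first buffered record was copied, so it kept its value.
        System.out.println(writer.buffered().get(0)[0] + "," + writer.buffered().get(1)[0]);
    }
}
```

Without the copy, both buffered entries would point at the same mutated instance, which is the failure mode a buffering sink has to guard against when object reuse is on.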