Hi,

+1 for `InitContext#createInputSerializer()`.

I think we can get the serializer directly in InitContextImpl through
`getOperatorConfig().getTypeSerializerIn(0,
getUserCodeClassloader()).duplicate()`.
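
For illustration, here is a minimal, self-contained sketch of how a buffering
sink might use the two proposed methods. `Serializer`, `InitContext`,
`BufferingWriter` and `context(...)` are simplified, hypothetical stand-ins so
that the example runs without Flink; they are not the actual Flink interfaces,
and the final signatures may differ.

```java
import java.util.ArrayList;
import java.util.List;

class BufferingSinkSketch {

    /** Simplified stand-in for Flink's TypeSerializer<T>; only copy() is needed here. */
    interface Serializer<T> {
        T copy(T from);
    }

    /** Stand-in for Sink.InitContext, reduced to the two methods proposed in this thread. */
    interface InitContext<IN> {
        Serializer<IN> createInputSerializer();

        boolean isObjectReuseEnabled();
    }

    /** Buffers records, copying each one only when the runtime may reuse the instance. */
    static final class BufferingWriter<IN> {
        private final List<IN> buffer = new ArrayList<>();
        private final Serializer<IN> serializer;
        private final boolean objectReuse;

        BufferingWriter(InitContext<IN> context) {
            this.serializer = context.createInputSerializer();
            this.objectReuse = context.isObjectReuseEnabled();
        }

        void write(IN record) {
            // Same decision as discussed in the thread: copy only under object reuse.
            buffer.add(objectReuse ? serializer.copy(record) : record);
        }

        List<IN> buffered() {
            return buffer;
        }
    }

    /** Hypothetical context for int[] records; copy() is a plain array clone. */
    static InitContext<int[]> context(boolean objectReuse) {
        return new InitContext<int[]>() {
            @Override
            public Serializer<int[]> createInputSerializer() {
                return from -> from.clone();
            }

            @Override
            public boolean isObjectReuseEnabled() {
                return objectReuse;
            }
        };
    }

    public static void main(String[] args) {
        BufferingWriter<int[]> writer = new BufferingWriter<>(context(true));
        int[] reused = {1};
        writer.write(reused);
        reused[0] = 2; // simulates the runtime overwriting the same instance
        writer.write(reused);
        // prints "1, 2": the first buffered record was not overwritten
        System.out.println(writer.buffered().get(0)[0] + ", " + writer.buffered().get(1)[0]);
    }
}
```

With object reuse disabled, the writer buffers the record instance itself, so
no copy cost is paid when the runtime already hands over a fresh object.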

Best,
Lijie.

On Mon, Apr 24, 2023 at 19:04, João Boto <eskabe...@apache.org> wrote:

> Hi @Gordon,
>
> `InitContext#createInputSerializer()` is a great option and will solve
> more than one problem, but I can't find a way to get the TypeInformation<T>
> in InitContextImpl (I may be missing something).
>
> The current (legacy) implementations rely on the `InputTypeConfigurable`
> interface to get the TypeInformation, but this will not work for Sink2, as
> it is not implemented there (DataStream.sinkTo vs DataStream.addSink).
> As a side note, the ExecutionConfig provided by this interface cannot be
> used, as it can be changed after the call is made, for example during
> Table planning in DefaultExecutor.configureBatchSpecificProperties().
>
> In the end, what we need to do is something like:
> isObjectReuseEnabled() ? serializer.copy(record) : record;
>
> So, to answer your question: yes, the last option is OK for this, but I
> don't see how to implement it, as I'm missing the TypeInformation in
> InitContextImpl.
>
> Best regards,
>
> On 2023/04/21 15:04:24 "Tzu-Li (Gordon) Tai" wrote:
> > Do we have to introduce `InitContext#createSerializer(TypeInformation<T>)`,
> > which returns TypeSerializer<T>, or is it sufficient to only provide
> > `InitContext#createInputSerializer()`, which returns TypeSerializer<IN>?
> >
> > I had the impression that buffering sinks like JDBC only need the
> > latter. @Joao, could you confirm?
> >
> > If that's the case, +1 to adding the following method signatures to
> > InitContext:
> > * TypeSerializer<IN> createInputSerializer()
> > * boolean isObjectReuseEnabled()
> >
> > Thanks,
> > Gordon
> >
> > On Fri, Apr 21, 2023 at 3:04 AM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > Good point! @Gordon
> > > Introducing an `InitContext#createSerializer(TypeInformation)` looks like
> > > a better option to me, so we do not need to introduce an unmodifiable
> > > `ExecutionConfig` at this moment.
> > >
> > > I hope that we can make `ExecutionConfig` a read-only interface in
> > > Flink 2.0. It is already exposed in `RuntimeContext` to user functions,
> > > while mutating its values at runtime is actually undefined behavior.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Tue, Apr 18, 2023 at 01:02, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > > >
> > > > Hi,
> > > >
> > > > Sorry for chiming in late.
> > > >
> > > > I'm not so sure that exposing ExecutionConfig / ReadableExecutionConfig
> > > > directly through Sink#InitContext is the right thing to do.
> > > >
> > > > 1. A lot of the read-only getter methods on ExecutionConfig are
> > > > irrelevant for sinks. Expanding the scope of the InitContext interface
> > > > with so many irrelevant methods is probably going to make writing unit
> > > > tests a pain.
> > > >
> > > > 2. There are actually a few getter methods on `InitContext` that carry
> > > > information redundant with what ExecutionConfig exposes. For example,
> > > > InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber
> > > > currently exist, and it can be confusing if users find two sources of
> > > > that information (via the `InitContext` and via the wrapped
> > > > `ExecutionConfig`).
> > > >
> > > > All in all, it feels like `Sink#InitContext` was introduced initially
> > > > as a means to selectively expose only certain information to sinks.
> > > >
> > > > It looks like right now, the only requirement is that some sinks
> > > > require 1) isObjectReuseEnabled, and 2) a TypeSerializer for the input
> > > > type. Would it make sense to follow the original intent and only
> > > > selectively expose these?
> > > > For 1), we can just add a new method to `InitContext` and forward the
> > > > information from the `ExecutionConfig` accessible at the operator level.
> > > > For 2), would it make sense to create the serializer at the operator
> > > > level and then provide it through `InitContext`?
> > > >
> > > > Thanks,
> > > > Gordon
> > > >
> > > > On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu <reed...@gmail.com> wrote:
> > > >
> > > > > We can let the `InitContext` return `ExecutionConfig` in the
> > > > > interface. However, a `ReadableExecutionConfig` implementation should
> > > > > be returned so that exceptions will be thrown if users try to modify
> > > > > the `ExecutionConfig`.
> > > > >
> > > > > We can rework all the setters of `ExecutionConfig` to internally
> > > > > invoke a `setConfiguration(...)` method. Then the
> > > > > `ReadableExecutionConfig` can just override that method. But pay
> > > > > attention to a few exceptional setters, i.e. those for
> > > > > globalJobParameters and serializers.
> > > > >
> > > > > We should also explicitly state in the documentation of
> > > > > `InitContext#getExecutionConfig()` that the returned `ExecutionConfig`
> > > > > is unmodifiable.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > On Mon, Apr 17, 2023 at 16:51, João Boto <eskabe...@apache.org> wrote:
> > > > > >
> > > > > > Hi Zhu,
> > > > > >
> > > > > > Thanks for taking the time to review this.
> > > > > >
> > > > > > Extending `ExecutionConfig` would allow the values in the config to
> > > > > > be modified (this is what we want to prevent with Option 2).
> > > > > >
> > > > > > If we extend ExecutionConfig anyway, isn't it simpler to go with
> > > > > > Option 1 (expose ExecutionConfig directly)?
> > > > > >
> > > > > > Regards
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > > > > > > Hi João,
> > > > > > >
> > > > > > > Thanks for creating this FLIP!
> > > > > > > I'm overall +1 for it, to unblock the migration of sinks to SinkV2.
> > > > > > >
> > > > > > > Yet I think it's better to let the `ReadableExecutionConfig`
> > > > > > > extend `ExecutionConfig`, because otherwise we have to introduce a
> > > > > > > new method
> > > > > > > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The
> > > > > > > new method may require every `TypeInformation` to implement it,
> > > > > > > including Flink built-in ones and custom ones, otherwise exceptions
> > > > > > > will happen. That goal, however, is pretty hard to achieve.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Zhu
> > > > > > >
> > > > > > > On Tue, Feb 28, 2023 at 23:34, João Boto <eskabe...@apache.org> wrote:
> > > > > > > >
> > > > > > > > I have updated the FLIP with the two options that we have
> > > > > > > > discussed.
> > > > > > > >
> > > > > > > > Option 1: Expose ExecutionConfig directly on InitContext.
> > > > > > > > This has minimal impact, as we only have to expose the new
> > > > > > > > methods.
> > > > > > > >
> > > > > > > > Option 2: Expose ReadableExecutionConfig on InitContext.
> > > > > > > > This option has more impact, as we need to add a new method to
> > > > > > > > TypeInformation and change all implementations (there are
> > > > > > > > currently 72 implementations).
> > > > > > > >
> > > > > > > > Waiting for feedback or concerns about the two options.
> > > > > > >
> > > > >
> > >
> >
>
