Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-31 Thread João Boto
If there are no more questions, I will start the voting thread tomorrow.

On 2023/01/13 14:15:04 Joao Boto wrote:
> Hi flink devs,
> 
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
> especially regarding the sink[3].
> 
> The idea is basically to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> These changes are necessary to correctly migrate the current sinks to SinkV2,
> such as JdbcSink, KafkaTableSink and so on, which rely on RuntimeContext.
> 
> Comments are welcome!
> Thanks,
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421
> 


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-30 Thread Tzu-Li (Gordon) Tai
Hi,

> I think we can get the serializer directly in InitContextImpl through
`getOperatorConfig().getTypeSerializerIn(0,
getUserCodeClassloader()).duplicate()`.

This should work, yes.

+1 to the updated FLIP so far. Thank you, Joao, for being on top of this!

Thanks,
Gordon



Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-30 Thread João Boto
Hi Lijie,

I left both options so you can use whichever you prefer, but I can clean up the
FLIP to keep only one.

Updated the FLIP

Regards



Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-23 Thread Lijie Wang
Hi Joao,

I noticed the FLIP currently contains the following 2 methods related to the type
serializer:

(1)  TypeSerializer createInputSerializer();
(2)  TypeSerializer createSerializer(TypeInformation inType);

Is the method (2) still needed now?

Best,
Lijie



Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-19 Thread João Boto
Updated the FLIP to use this option.


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-17 Thread Lijie Wang
Hi,

+1 for `InitContext#createInputSerializer()`.

I think we can get the serializer directly in InitContextImpl through
`getOperatorConfig().getTypeSerializerIn(0,
getUserCodeClassloader()).duplicate()`.
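The `.duplicate()` call matters because, as I understand the contract of Flink's `TypeSerializer#duplicate()`, it must return a deep copy for stateful (possibly non-thread-safe) serializers and may return the same instance for stateless ones, so each consumer gets a serializer it can use safely. The sketch below models only that contract with a hypothetical `MiniSerializer` interface; it is not the Flink class, and the two implementations are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateSketch {

    /** Hypothetical stand-in for the copy()/duplicate() part of Flink's TypeSerializer. */
    interface MiniSerializer<T> {
        T copy(T from);

        /** Stateless serializers may return this; stateful ones must return a new instance. */
        MiniSerializer<T> duplicate();
    }

    /** Stateless: holds no mutable fields, so sharing one instance is safe. */
    static final class StringSerializer implements MiniSerializer<String> {
        @Override public String copy(String from) { return from; } // Strings are immutable
        @Override public MiniSerializer<String> duplicate() { return this; }
    }

    /** Stateful: reuses a scratch list internally, so concurrent users would interfere. */
    static final class ListSerializer implements MiniSerializer<List<String>> {
        private final List<String> scratch = new ArrayList<>();

        @Override
        public List<String> copy(List<String> from) {
            scratch.clear();
            scratch.addAll(from); // mutates per-instance state
            return new ArrayList<>(scratch);
        }

        @Override public MiniSerializer<List<String>> duplicate() { return new ListSerializer(); }
    }

    public static void main(String[] args) {
        MiniSerializer<String> strings = new StringSerializer();
        MiniSerializer<List<String>> lists = new ListSerializer();
        System.out.println(strings.duplicate() == strings); // true: stateless, shared
        System.out.println(lists.duplicate() == lists);     // false: stateful, copied
    }
}
```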

Best,
Lijie.


RE: Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-01 Thread Raman Verma
Hello Joao Boto,

Can you please add Gordon’s suggested approach as a third option? Also, it would
be great to list the pros and cons of each approach in the FLIP.

Thanks,
Raman Verma


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-04-24 Thread João Boto
Hi @Gordon,

`InitContext#createInputSerializer()` is a great option and would solve more
than one problem, but I can't find a way to get the TypeInformation in
InitContextImpl (I may be missing something).

In the current (legacy) implementations we rely on the `InputTypeConfigurable`
interface to get the TypeInformation, but this will not work for Sink2, as it is
not implemented there (DataStream.sinkTo vs DataStream.addSink).
As a side note, the ExecutionConfig provided by this interface cannot be relied
upon, as it can be changed after the call is made, for example by Table planning
in DefaultExecutor.configureBatchSpecificProperties().

In the end, what we need to do is something like:
record = isObjectReuseEnabled() ? serializer.copy(record) : record;

So, to answer your question: yes, the last option works for this, but I don't see
how to implement it, as I'm missing the TypeInformation in InitContextImpl.
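To illustrate why the copy is needed: with object reuse enabled, the runtime may hand a sink the same mutable instance for successive records, so a sink that buffers references (as a batching JDBC sink does) can end up holding the last value several times. The self-contained sketch below simulates that with a hypothetical `Row` class and a plain `UnaryOperator` in place of `TypeSerializer#copy`; it uses no Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class ObjectReuseSketch {

    /** Hypothetical mutable record type. */
    static final class Row {
        int id;
        Row(int id) { this.id = id; }
        @Override public String toString() { return String.valueOf(id); }
    }

    /** The pattern from the thread: copy only if the runtime may reuse the instance. */
    static <T> T maybeCopy(T record, boolean objectReuseEnabled, UnaryOperator<T> copier) {
        return objectReuseEnabled ? copier.apply(record) : record;
    }

    /**
     * Simulates an upstream that reuses ONE Row instance for three records.
     * sinkCopies plays the role of isObjectReuseEnabled() as seen by the sink.
     */
    static List<Row> bufferThreeRecords(boolean sinkCopies) {
        List<Row> buffer = new ArrayList<>();
        Row reused = new Row(0);
        for (int i = 1; i <= 3; i++) {
            reused.id = i; // upstream overwrites the shared instance in place
            buffer.add(maybeCopy(reused, sinkCopies, r -> new Row(r.id)));
        }
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println(bufferThreeRecords(false)); // [3, 3, 3]: every entry aliases one object
        System.out.println(bufferThreeRecords(true));  // [1, 2, 3]: each entry is an independent copy
    }
}
```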

Best regards,


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-04-21 Thread Tzu-Li (Gordon) Tai
Do we have to introduce `InitContext#createSerializer(TypeInformation)`,
which returns a `TypeSerializer`, or is it sufficient to only provide
`InitContext#createInputSerializer()`, which returns a `TypeSerializer`?

I had the impression that buffering sinks like JDBC only need the
latter. @Joao, could you confirm?

If that's the case, +1 to adding the following method signatures to
InitContext:
* TypeSerializer createInputSerializer()
* boolean isObjectReuseEnabled()
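If only these two methods are added, a buffering sink such as JDBC could presumably be written against them without ever touching `TypeInformation` or `ExecutionConfig`. The sketch below is a hypothetical, Flink-free model: `MinimalInitContext`, `Copier` and `BatchingWriter` are illustrative names, not Flink classes, and `Copier` stands in for `TypeSerializer#copy`. It also shows that a test stub for such a narrow context takes only a few lines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BufferingWriterSketch {

    /** Hypothetical stand-in for TypeSerializer#copy. */
    interface Copier<T> { T copy(T from); }

    /** Hypothetical model of the two methods proposed for InitContext. */
    interface MinimalInitContext<IN> {
        Copier<IN> createInputSerializer();
        boolean isObjectReuseEnabled();
    }

    /** JDBC-style writer: buffers records and hands them over in batches. */
    static final class BatchingWriter<IN> {
        private final List<IN> buffer = new ArrayList<>();
        private final Copier<IN> serializer;
        private final boolean objectReuse;
        private final int batchSize;
        private final Consumer<List<IN>> flushTarget; // would execute the batch in a real sink

        BatchingWriter(MinimalInitContext<IN> ctx, int batchSize, Consumer<List<IN>> flushTarget) {
            this.serializer = ctx.createInputSerializer();
            this.objectReuse = ctx.isObjectReuseEnabled();
            this.batchSize = batchSize;
            this.flushTarget = flushTarget;
        }

        void write(IN record) {
            // Copy only when the runtime may hand us the same instance again.
            buffer.add(objectReuse ? serializer.copy(record) : record);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        void flush() {
            if (!buffer.isEmpty()) {
                flushTarget.accept(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    /** Writes "a", "b", "c" through one reused StringBuilder, as a reusing runtime might. */
    static List<List<StringBuilder>> run(boolean objectReuse) {
        // A test stub for the narrow context needs only two methods.
        MinimalInitContext<StringBuilder> ctx = new MinimalInitContext<>() {
            @Override public Copier<StringBuilder> createInputSerializer() { return sb -> new StringBuilder(sb); }
            @Override public boolean isObjectReuseEnabled() { return objectReuse; }
        };
        List<List<StringBuilder>> batches = new ArrayList<>();
        BatchingWriter<StringBuilder> writer = new BatchingWriter<StringBuilder>(ctx, 2, batches::add);
        StringBuilder reused = new StringBuilder();
        for (String value : new String[] {"a", "b", "c"}) {
            reused.setLength(0);
            reused.append(value);
            writer.write(reused);
        }
        writer.flush();
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [[a, b], [c]]
        System.out.println(run(false)); // [[c, c], [c]]: buffered references were overwritten
    }
}
```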

Thanks,
Gordon


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-04-21 Thread Zhu Zhu
Good point! @Gordon
Introducing an `InitContext#createSerializer(TypeInformation)` looks like a
better option to me, so we do not need to introduce an unmodifiable
`ExecutionConfig` at this moment.

I hope that we can make `ExecutionConfig` a read-only interface in
Flink 2.0. It is already exposed to user functions in `RuntimeContext`,
while mutating its values at runtime is actually undefined behavior.
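One way a read-only configuration could look is sketched below with hypothetical names (`ReadableJobConfig`, `MutableJobConfig`) rather than Flink's actual classes: a getter-only interface that user functions and sinks receive, and a mutable implementation that only job-setup code holds. Compared with a subclass that throws on every setter, this turns writes into a compile-time error for code typed against the interface.

```java
public class ReadOnlyConfigSketch {

    /** Getter-only view: what user functions and sinks would receive. */
    interface ReadableJobConfig {
        boolean isObjectReuseEnabled();
        int getParallelism();
    }

    /** Mutable implementation: only job-setup code would hold this type. */
    static final class MutableJobConfig implements ReadableJobConfig {
        private boolean objectReuse;
        private int parallelism = 1;

        @Override public boolean isObjectReuseEnabled() { return objectReuse; }
        @Override public int getParallelism() { return parallelism; }

        MutableJobConfig enableObjectReuse() { objectReuse = true; return this; }
        MutableJobConfig setParallelism(int parallelism) { this.parallelism = parallelism; return this; }
    }

    /** Runtime-side code is typed against the interface, so no setter is reachable here. */
    static String describe(ReadableJobConfig config) {
        return "reuse=" + config.isObjectReuseEnabled() + ", parallelism=" + config.getParallelism();
    }

    public static void main(String[] args) {
        MutableJobConfig config = new MutableJobConfig().enableObjectReuse().setParallelism(4);
        System.out.println(describe(config)); // reuse=true, parallelism=4
    }
}
```

A downcast to the mutable type would still get around this, so the guarantee comes from how the types are handed out rather than from a runtime check.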

Thanks,
Zhu



Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-04-17 Thread Tzu-Li (Gordon) Tai
Hi,

Sorry for chiming in late.

I'm not so sure that exposing ExecutionConfig / ReadableExecutionConfig
directly through Sink#InitContext is the right thing to do.

1. A lot of the read-only getter methods on ExecutionConfig are irrelevant
for sinks. Expanding the scope of the InitContext interface with so many
irrelevant methods is probably going to make writing unit tests a pain.

2. There are actually a few getter methods on `InitContext` that duplicate
information ExecutionConfig exposes. For example,
InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber
currently exist, and it can be confusing if users find 2 sources of that
information (either via the `InitContext` or via the wrapped
`ExecutionConfig`).

All in all, it feels like `Sink#InitContext` was initially introduced as a
means to selectively expose only certain information to sinks.

It looks like right now, the only requirement is that some sinks require 1)
isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it
make sense to follow the original intent and only selectively expose these?
For 1), we can just add a new method to `InitContext` and forward the
information from `ExecutionConfig` accessible at the operator level.
For 2), would it make sense to create the serializer at the operator level
and then provide it through `InitContext`?
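A minimal sketch of that idea, assuming the operator already has the object-reuse flag and an input serializer at hand: the operator resolves both values once and wraps them in a narrow context, so the sink never sees the rest of the configuration. `WideConfig`, `SinkInitContext`, `Copier` and `contextFor` are hypothetical names for illustration; none of them are Flink classes.

```java
import java.util.Arrays;

public class OperatorForwardingSketch {

    /** Hypothetical stand-in for TypeSerializer#copy. */
    interface Copier<T> { T copy(T from); }

    /** Hypothetical stand-in for the broad config the operator can read. */
    static final class WideConfig {
        final boolean objectReuse;
        final long autoWatermarkInterval; // an example of a setting sinks do not need

        WideConfig(boolean objectReuse, long autoWatermarkInterval) {
            this.objectReuse = objectReuse;
            this.autoWatermarkInterval = autoWatermarkInterval;
        }
    }

    /** The narrow view handed to sinks: only the two things they need. */
    interface SinkInitContext<IN> {
        boolean isObjectReuseEnabled();
        Copier<IN> createInputSerializer();
    }

    /** Operator-side factory: reads what it needs once and forwards nothing else. */
    static <IN> SinkInitContext<IN> contextFor(WideConfig config, Copier<IN> operatorInputSerializer) {
        final boolean reuse = config.objectReuse;
        return new SinkInitContext<IN>() {
            @Override public boolean isObjectReuseEnabled() { return reuse; }

            // A real implementation would probably hand out a duplicate per call
            // if the serializer is stateful; this sketch shares one instance.
            @Override public Copier<IN> createInputSerializer() { return operatorInputSerializer; }
        };
    }

    public static void main(String[] args) {
        Copier<int[]> arrayCopier = a -> a.clone();
        SinkInitContext<int[]> ctx = contextFor(new WideConfig(true, 200L), arrayCopier);
        int[] original = {1, 2, 3};
        int[] copy = ctx.createInputSerializer().copy(original);
        System.out.println(ctx.isObjectReuseEnabled());                          // true
        System.out.println(copy != original && Arrays.equals(copy, original)); // true
    }
}
```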

Thanks,
Gordon

On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu  wrote:

> We can let the `InitContext` return `ExecutionConfig` in the interface.
> However, a `ReadableExecutionConfig` implementation should be returned
> so that exceptions will be thrown if users try to modify the
> `ExecutionConfig`.
>
> We can rework all the setters of `ExecutionConfig` to internally invoke a
> `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can
> just override that method. But pay attention to a few exceptional
> setters, i.e. those for globalJobParameters and serializers.
>
> We should also explicitly state in the documentation of
> `InitContext#getExecutionConfig()`, that the returned `ExecutionConfig`
> is unmodifiable.
>
> Thanks,
> Zhu
>
> João Boto wrote on Mon, Apr 17, 2023 at 16:51:
> >
> > Hi Zhu,
> >
> > Thanks for your time reviewing this.
> >
> > Extending `ExecutionConfig` would allow the values in the config to be
> > modified (this is what we want to prevent with Option 2).
> >
> > If we extend ExecutionConfig, isn't it simpler to just do Option 1 (expose
> > ExecutionConfig directly)?
> >
> > Regards
> >
> >
> >
> > On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > > Hi João,
> > >
> > > Thanks for creating this FLIP!
> > > I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> > >
> > > Yet I think it's better to let the `ReadableExecutionConfig` extend
> > > `ExecutionConfig`, because otherwise we have to introduce a new method
> > > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
> > > method may require every `TypeInformation` to implement it, including
> > > Flink built-in ones and custom ones, otherwise exceptions will happen.
> > > That goal, however, is pretty hard to achieve.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > João Boto wrote on Tue, Feb 28, 2023 at 23:34:
> > > >
> > > > I have updated the FLIP with the 2 options that we have discussed.
> > > >
> > > > Option 1: Expose ExecutionConfig directly on InitContext
> > > > this has minimal impact, as we only have to expose the new methods
> > > >
> > > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > > with this option we have more impact, as we need to add a new method
> > > > to TypeInformation and change all implementations (there are currently
> > > > 72 implementations)
> > > >
> > > > Waiting for feedback or concerns about the two options
> > >
>


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-04-17 Thread Zhu Zhu
We can let the `InitContext` return `ExecutionConfig` in the interface.
However, a `ReadableExecutionConfig` implementation should be returned
so that exceptions will be thrown if users try to modify the
`ExecutionConfig`.

We can rework all the setters of `ExecutionConfig` to internally invoke a
`setConfiguration(...)` method. Then the `ReadableExecutionConfig` can
just override that method. But pay attention to a few exceptional
setters, i.e. those for globalJobParameters and serializers.

We should also explicitly state in the documentation of
`InitContext#getExecutionConfig()` that the returned `ExecutionConfig`
is unmodifiable.
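Zhu's suggestion could look roughly like the following minimal sketch. It assumes a heavily simplified, hypothetical `SketchExecutionConfig` (not Flink's `ExecutionConfig`, whose setters and fields differ). Plain setters funnel through one `setConfiguration(...)` hook, the read-only subclass overrides only that hook, and an "exceptional" setter such as the one for global job parameters needs its own override. Serializer-registration setters would need the same individual treatment and are omitted here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for ExecutionConfig (not Flink's class).
class SketchExecutionConfig {
    private final Map<String, Object> options = new HashMap<>();
    private Map<String, String> globalJobParameters = new HashMap<>();

    // Single hook that every "plain" setter goes through.
    protected void setConfiguration(String key, Object value) {
        options.put(key, value);
    }

    public void enableObjectReuse() { setConfiguration("object-reuse", true); }
    public void disableObjectReuse() { setConfiguration("object-reuse", false); }
    public boolean isObjectReuseEnabled() { return Boolean.TRUE.equals(options.get("object-reuse")); }

    public void setAutoWatermarkInterval(long millis) { setConfiguration("auto-watermark-interval", millis); }
    public long getAutoWatermarkInterval() {
        Object value = options.get("auto-watermark-interval");
        return value == null ? 0L : (Long) value;
    }

    // "Exceptional" setter: it replaces a whole object instead of going through the hook.
    public void setGlobalJobParameters(Map<String, String> params) { this.globalJobParameters = params; }
    public Map<String, String> getGlobalJobParameters() { return globalJobParameters; }
}

// Read-only variant: overriding the hook blocks every plain setter at once.
class ReadableSketchExecutionConfig extends SketchExecutionConfig {
    private boolean sealed = false;

    ReadableSketchExecutionConfig(SketchExecutionConfig source) {
        // Copy the source state while the instance is still unsealed.
        if (source.isObjectReuseEnabled()) {
            enableObjectReuse();
        }
        setAutoWatermarkInterval(source.getAutoWatermarkInterval());
        // Bypass our own throwing override, and wrap the map so it cannot be mutated in place.
        super.setGlobalJobParameters(
                Collections.unmodifiableMap(new HashMap<>(source.getGlobalJobParameters())));
        sealed = true;
    }

    @Override
    protected void setConfiguration(String key, Object value) {
        if (sealed) {
            throw new UnsupportedOperationException("ExecutionConfig is read-only here: " + key);
        }
        super.setConfiguration(key, value);
    }

    // Exceptional setters do not use the hook, so each one must be overridden individually.
    @Override
    public void setGlobalJobParameters(Map<String, String> params) {
        throw new UnsupportedOperationException("ExecutionConfig is read-only here: globalJobParameters");
    }
}

class ReadableConfigDemo {
    public static void main(String[] args) {
        SketchExecutionConfig base = new SketchExecutionConfig();
        base.enableObjectReuse();
        ReadableSketchExecutionConfig readOnly = new ReadableSketchExecutionConfig(base);
        System.out.println(readOnly.isObjectReuseEnabled()); // prints true
        try {
            readOnly.disableObjectReuse();
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The appeal of this design is that a newly added plain setter is covered automatically as long as it routes through the hook. Only setters that bypass the hook need explicit attention.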

Thanks,
Zhu

On Mon, Apr 17, 2023 at 16:51, João Boto wrote:
>
> Hi Zhu,
>
> Thanks for your time reviewing this.
>
> Extending `ExecutionConfig` would allow the values in the config to be
> modified (which is what we want to prevent with Option 2).
>
> If we extend ExecutionConfig, isn't it simpler to just do Option 1
> (expose ExecutionConfig directly)?
>
> Regards
>
>
>
> On 2023/04/03 09:42:28 Zhu Zhu wrote:
> > Hi João,
> >
> > Thanks for creating this FLIP!
> > I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> >
> > Yet I think it's better to let the `ReadableExecutionConfig` extend
> > `ExecutionConfig`, because otherwise we have to introduce a new method
> > `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
> > method may require every `TypeInformation` to implement it, including
> > Flink built-in ones and custom ones, otherwise exceptions will happen.
> > That goal, however, is pretty hard to achieve.
> >
> > Thanks,
> > Zhu
> >
> > On Tue, Feb 28, 2023 at 23:34, João Boto wrote:
> > >
> > > I have updated the FLIP with the two options we have discussed.
> > >
> > > Option 1: Expose ExecutionConfig directly on InitContext
> > > This has minimal impact, as we only have to expose the new methods.
> > >
> > > Option 2: Expose ReadableExecutionConfig on InitContext
> > > This option has more impact, as we need to add a new method to
> > > TypeInformation and change all of its implementations (there are
> > > currently 72).
> > >
> > > Waiting for feedback or concerns about the two options.
> >


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-04-17 Thread João Boto
Hi Zhu,

Thanks for your time reviewing this.

Extending `ExecutionConfig` would allow the values in the config to be
modified (which is what we want to prevent with Option 2).

If we extend ExecutionConfig, isn't it simpler to just do Option 1
(expose ExecutionConfig directly)?

Regards



On 2023/04/03 09:42:28 Zhu Zhu wrote:
> Hi João,
> 
> Thanks for creating this FLIP!
> I'm overall +1 for it to unblock the migration of sinks to SinkV2.
> 
> Yet I think it's better to let the `ReadableExecutionConfig` extend
> `ExecutionConfig`, because otherwise we have to introduce a new method
> `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
> method may require every `TypeInformation` to implement it, including
> Flink built-in ones and custom ones, otherwise exceptions will happen.
> That goal, however, is pretty hard to achieve.
> 
> Thanks,
> Zhu
> 
> On Tue, Feb 28, 2023 at 23:34, João Boto wrote:
> >
> > I have updated the FLIP with the two options we have discussed.
> >
> > Option 1: Expose ExecutionConfig directly on InitContext
> > This has minimal impact, as we only have to expose the new methods.
> >
> > Option 2: Expose ReadableExecutionConfig on InitContext
> > This option has more impact, as we need to add a new method to
> > TypeInformation and change all of its implementations (there are
> > currently 72).
> >
> > Waiting for feedback or concerns about the two options.
> 


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-04-03 Thread Zhu Zhu
Hi João,

Thanks for creating this FLIP!
I'm overall +1 for it to unblock the migration of sinks to SinkV2.

Yet I think it's better to let the `ReadableExecutionConfig` extend
`ExecutionConfig`, because otherwise we have to introduce a new method
`TypeInformation#createSerializer(ReadableExecutionConfig)`. The new
method may require every `TypeInformation` to implement it, including
Flink built-in ones and custom ones, otherwise exceptions will happen.
That goal, however, is pretty hard to achieve.
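The concern above can be made concrete with a minimal sketch. It uses hypothetical, simplified types: `SketchTypeInfo`, `SketchSerializer`, `FullConfig` and `ReadOnlyConfigView` are illustrations, not Flink's `TypeInformation` API. Suppose a `createSerializer(ReadOnlyConfigView)` overload is added with a throwing default. Any implementation that is not updated, including user-defined ones outside Flink, then fails only at runtime, once the new overload is called.

```java
// Hypothetical read-only view and full (mutable) config.
interface ReadOnlyConfigView {
    boolean isObjectReuseEnabled();
}

class FullConfig implements ReadOnlyConfigView {
    private boolean objectReuse;
    void enableObjectReuse() { objectReuse = true; }
    @Override public boolean isObjectReuseEnabled() { return objectReuse; }
}

interface SketchSerializer<T> {
    T copy(T from);
}

abstract class SketchTypeInfo<T> {
    // Existing entry point that every implementation already provides.
    abstract SketchSerializer<T> createSerializer(FullConfig config);

    // Newly added overload: implementations that are never updated inherit this throwing default.
    SketchSerializer<T> createSerializer(ReadOnlyConfigView config) {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not implement createSerializer(ReadOnlyConfigView)");
    }
}

// A "built-in" type info that was migrated to the new overload.
class StringTypeInfo extends SketchTypeInfo<String> {
    @Override SketchSerializer<String> createSerializer(FullConfig config) { return s -> s; }
    @Override SketchSerializer<String> createSerializer(ReadOnlyConfigView config) { return s -> s; }
}

// A user-defined type info written before the overload existed.
class LegacyUserTypeInfo extends SketchTypeInfo<int[]> {
    @Override SketchSerializer<int[]> createSerializer(FullConfig config) { return a -> a.clone(); }
}

class OverloadDemo {
    public static void main(String[] args) {
        ReadOnlyConfigView view = new FullConfig();
        System.out.println(new StringTypeInfo().createSerializer(view).copy("ok")); // prints ok
        try {
            new LegacyUserTypeInfo().createSerializer(view);
        } catch (UnsupportedOperationException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Nothing fails at compile time here. The legacy type info compiles fine and only breaks when a caller passes the read-only view, which is why updating every implementation is hard to enforce.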

Thanks,
Zhu

On Tue, Feb 28, 2023 at 23:34, João Boto wrote:
>
> I have updated the FLIP with the two options we have discussed.
>
> Option 1: Expose ExecutionConfig directly on InitContext
> This has minimal impact, as we only have to expose the new methods.
>
> Option 2: Expose ReadableExecutionConfig on InitContext
> This option has more impact, as we need to add a new method to
> TypeInformation and change all of its implementations (there are
> currently 72).
>
> Waiting for feedback or concerns about the two options.


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-02-28 Thread João Boto
I have updated the FLIP with the two options we have discussed.

Option 1: Expose ExecutionConfig directly on InitContext
This has minimal impact, as we only have to expose the new methods.

Option 2: Expose ReadableExecutionConfig on InitContext
This option has more impact, as we need to add a new method to
TypeInformation and change all of its implementations (there are
currently 72).

Waiting for feedback or concerns about the two options.


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-02-12 Thread Lijie Wang
Hi Konstantin,

I checked the usage of ExecutionConfig in the Kinesis and Kafka sinks:
- Kinesis sink: ExecutionConfig is not used by the Kinesis sink. The one that
uses getAutoWatermarkInterval is the Kinesis source, so Joao may have made a
mistake.
- Kafka sink: isObjectReuseEnabled and ExecutionConfig are used in the upsert
Kafka table sink. The upsert Kafka table sink obtains the ExecutionConfig
through DataStreamSinkProvider; however, this approach cannot be used for
the DataStream sink and other SinkRuntimeProviders.

Besides, I know that all the JDBC sinks (DataStream/Table) need
isObjectReuseEnabled and the ExecutionConfig. The JDBC sink buffers the
records it receives and only flushes them when the buffer is full, a
periodic timer fires, or a checkpoint happens. The JDBC sinks decide whether
to buffer copies or the original records based on isObjectReuseEnabled:
when object reuse is enabled, we should buffer copies (because the content
of the objects may change before the flush); otherwise we should buffer the
original records. The ExecutionConfig is also needed to create the
TypeSerializer that copies the records.
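The buffering behaviour described above can be illustrated with a minimal sketch. `BufferingWriterSketch` is hypothetical and is not the actual JDBC sink code. A `UnaryOperator<T>` stands in for the copy that a `TypeSerializer` created from the `ExecutionConfig` would perform, and the `objectReuseEnabled` flag stands in for `isObjectReuseEnabled()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical mutable record type that the "runtime" reuses across calls.
class MutableRow {
    int id;
    MutableRow(int id) { this.id = id; }
}

// Hypothetical buffering writer; not the actual JDBC sink implementation.
class BufferingWriterSketch<T> {
    private final int maxBufferSize;
    private final boolean objectReuseEnabled;
    private final UnaryOperator<T> copier; // stands in for a serializer's copy method
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> flushedBatches = new ArrayList<>(); // what reached the external system

    BufferingWriterSketch(int maxBufferSize, boolean objectReuseEnabled, UnaryOperator<T> copier) {
        this.maxBufferSize = maxBufferSize;
        this.objectReuseEnabled = objectReuseEnabled;
        this.copier = copier;
    }

    void write(T record) {
        // With object reuse the caller may mutate `record` after write() returns,
        // so a defensive copy is buffered; otherwise the original reference is kept.
        buffer.add(objectReuseEnabled ? copier.apply(record) : record);
        if (buffer.size() >= maxBufferSize) {
            flush();
        }
    }

    // In the sinks described above, a flush is also triggered by a timer or a checkpoint.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    List<List<T>> flushedBatches() { return flushedBatches; }
}

class BufferingDemo {
    public static void main(String[] args) {
        MutableRow reused = new MutableRow(0);
        BufferingWriterSketch<MutableRow> writer =
                new BufferingWriterSketch<MutableRow>(3, true, row -> new MutableRow(row.id));
        for (int i = 0; i < 3; i++) {
            reused.id = i; // the "runtime" mutates the same instance each time
            writer.write(reused);
        }
        for (MutableRow row : writer.flushedBatches().get(0)) {
            System.out.println(row.id); // prints 0, 1 and 2 on separate lines
        }
    }
}
```

Simulating a runtime that reuses one `MutableRow` instance shows why the copy matters. With the copy, the flushed batch holds ids 0, 1 and 2. If the sink skipped the copy while the object was being reused, all three buffered entries would be the same instance and would all read id 2.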

Actually, the upsert Kafka table sink is similar to the JDBC sink. I think
all sinks with this "buffer records" behavior need isObjectReuseEnabled and
the ExecutionConfig.

Best,
Lijie

On Sat, Feb 4, 2023 at 01:41, Konstantin Knauf wrote:

> Hi everyone,
>
> If I am not mistaken, of the sinks mentioned by Joao, Kafka, Kinesis &
> Kinesis already use the Sink2 API. How were those implemented without
> exposing the ExecutionConfig?
>
> Best,
>
> Konstantin
>
>
> On Wed, Feb 1, 2023 at 12:28, Lijie Wang <wangdachui9...@gmail.com> wrote:
>
> > +1 for Option 2, if we can abstract a "ReadableExecutionConfig"
> > interface (containing all is/get methods) and let ExecutionConfig
> > implement ReadableExecutionConfig.
> >
> > Best,
> > Lijie
> >
> > On Tue, Jan 17, 2023 at 20:39, João Boto wrote:
> >
> > > Hi all,
> > >
> > > As establishing a read-only contract seems to be the consensus approach,
> > > after talking to Lijie we saw two ways of doing this:
> > >
> > > Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just
> > > like UnmodifiableConfiguration)
> > > Pros:
> > > - we have all the get methods
> > > - no need to change TypeInformation.createSerializer(ExecutionConfig
> > > config)
> > > Cons:
> > > - we have to override the 34 methods that modify the config
> > > - new methods added to ExecutionConfig will need to be overridden in
> > > UnmodifiableExecutionConfig
> > >
> > >
> > > Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig
> > > Pros:
> > > - it is a new class, so we don't need to override anything
> > > - modifications to ExecutionConfig don't affect this class
> > > Cons:
> > > - we need to change TypeInformation, adding
> > > createSerializer(UnmodifiableExecutionConfig config)
> > > - we need to add all get methods, or only the ones needed (this could be
> > > a pro)
> > >
> > >
> > > Which option do you think is better?
> > >
> > >
> > >
> > > On 2023/01/13 14:15:04 Joao Boto wrote:
> > > > Hi Flink devs,
> > > >
> > > > I'd like to start a discussion thread for FLIP-287[1].
> > > > This comes from an offline discussion with @Lijie Wang about
> > > > FLIP-239[2], specifically about the sink[3].
> > > >
> > > > The idea is to expose the ExecutionConfig and JobId on
> > > > SinkV2#InitContext. These changes are necessary to correctly migrate
> > > > current sinks that rely on RuntimeContext, such as JdbcSink,
> > > > KafkaTableSink and so on, to SinkV2.
> > > >
> > > > Comments are welcome!
> > > > Thanks,
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > > [2]
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> > > >
> > >
> >
>
>
> --
> https://twitter.com/snntrable
> https://github.com/knaufk
>


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-02-03 Thread Konstantin Knauf
Hi everyone,

If I am not mistaken, of the sinks mentioned by Joao, Kafka, Kinesis &
Kinesis already use the Sink2 API. How were those implemented without
exposing the ExecutionConfig?

Best,

Konstantin


On Wed, Feb 1, 2023 at 12:28, Lijie Wang <wangdachui9...@gmail.com> wrote:

> +1 for Option 2, if we can abstract a "ReadableExecutionConfig"
> interface (containing all is/get methods) and let ExecutionConfig
> implement ReadableExecutionConfig.
>
> Best,
> Lijie
>
> On Tue, Jan 17, 2023 at 20:39, João Boto wrote:
>
> > Hi all,
> >
> > As establishing a read-only contract seems to be the consensus approach,
> > after talking to Lijie we saw two ways of doing this:
> >
> > Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just
> > like UnmodifiableConfiguration)
> > Pros:
> > - we have all the get methods
> > - no need to change TypeInformation.createSerializer(ExecutionConfig
> > config)
> > Cons:
> > - we have to override the 34 methods that modify the config
> > - new methods added to ExecutionConfig will need to be overridden in
> > UnmodifiableExecutionConfig
> >
> >
> > Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig
> > Pros:
> > - it is a new class, so we don't need to override anything
> > - modifications to ExecutionConfig don't affect this class
> > Cons:
> > - we need to change TypeInformation, adding
> > createSerializer(UnmodifiableExecutionConfig config)
> > - we need to add all get methods, or only the ones needed (this could be
> > a pro)
> >
> >
> > Which option do you think is better?
> >
> >
> >
> > On 2023/01/13 14:15:04 Joao Boto wrote:
> > > Hi Flink devs,
> > >
> > > I'd like to start a discussion thread for FLIP-287[1].
> > > This comes from an offline discussion with @Lijie Wang about
> > > FLIP-239[2], specifically about the sink[3].
> > >
> > > The idea is to expose the ExecutionConfig and JobId on
> > > SinkV2#InitContext. These changes are necessary to correctly migrate
> > > current sinks that rely on RuntimeContext, such as JdbcSink,
> > > KafkaTableSink and so on, to SinkV2.
> > >
> > > Comments are welcome!
> > > Thanks,
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > [2]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> > >
> >
>


-- 
https://twitter.com/snntrable
https://github.com/knaufk


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-02-01 Thread Lijie Wang
+1 for Option 2, if we can abstract a "ReadableExecutionConfig"
interface (containing all is/get methods) and let ExecutionConfig
implement ReadableExecutionConfig.
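The interface approach suggested above might look like the following minimal sketch. The names are hypothetical simplifications (`ReadableExecutionConfigSketch`, `ExecutionConfigSketch`, `InitContextSketch`), not Flink classes. The mutable config implements a getter-only interface, and the context's return type promises only that interface.

```java
// Getter-only view ("all is/get methods"); hypothetical and reduced to two options.
interface ReadableExecutionConfigSketch {
    boolean isObjectReuseEnabled();
    long getAutoWatermarkInterval();
}

// The existing mutable config simply implements the view.
class ExecutionConfigSketch implements ReadableExecutionConfigSketch {
    private boolean objectReuse;
    private long autoWatermarkInterval;

    void enableObjectReuse() { objectReuse = true; }
    void setAutoWatermarkInterval(long millis) { autoWatermarkInterval = millis; }

    @Override public boolean isObjectReuseEnabled() { return objectReuse; }
    @Override public long getAutoWatermarkInterval() { return autoWatermarkInterval; }
}

// The sink context only promises the read-only view.
interface InitContextSketch {
    ReadableExecutionConfigSketch getExecutionConfig();
}

class InterfaceViewDemo {
    public static void main(String[] args) {
        ExecutionConfigSketch config = new ExecutionConfigSketch();
        config.enableObjectReuse();
        InitContextSketch context = () -> config;
        // Sink code can read settings, but no setter is visible through the view's type.
        System.out.println(context.getExecutionConfig().isObjectReuseEnabled()); // prints true
    }
}
```

In this sketch the read-only guarantee holds at compile time only. Sink code could still downcast the returned object to the mutable class, which a separate wrapper class would rule out.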

Best,
Lijie

On Tue, Jan 17, 2023 at 20:39, João Boto wrote:

> Hi all,
>
> As establishing a read-only contract seems to be the consensus approach,
> after talking to Lijie we saw two ways of doing this:
>
> Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just
> like UnmodifiableConfiguration)
> Pros:
> - we have all the get methods
> - no need to change TypeInformation.createSerializer(ExecutionConfig
> config)
> Cons:
> - we have to override the 34 methods that modify the config
> - new methods added to ExecutionConfig will need to be overridden in
> UnmodifiableExecutionConfig
>
>
> Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig
> Pros:
> - it is a new class, so we don't need to override anything
> - modifications to ExecutionConfig don't affect this class
> Cons:
> - we need to change TypeInformation, adding
> createSerializer(UnmodifiableExecutionConfig config)
> - we need to add all get methods, or only the ones needed (this could be a pro)
>
>
> Which option do you think is better?
>
>
>
> On 2023/01/13 14:15:04 Joao Boto wrote:
> > Hi Flink devs,
> >
> > I'd like to start a discussion thread for FLIP-287[1].
> > This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
> > specifically about the sink[3].
> >
> > The idea is to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> > These changes are necessary to correctly migrate current sinks that rely
> > on RuntimeContext, such as JdbcSink, KafkaTableSink and so on, to SinkV2.
> >
> > Comments are welcome!
> > Thanks,
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > [2]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > [3] https://issues.apache.org/jira/browse/FLINK-25421
> >
>


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-17 Thread João Boto
Hi all,

As establishing a read-only contract seems to be the consensus approach,
after talking to Lijie we saw two ways of doing this:

Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just like
UnmodifiableConfiguration)
Pros:
- we have all the get methods
- no need to change TypeInformation.createSerializer(ExecutionConfig config)
Cons:
- we have to override the 34 methods that modify the config
- new methods added to ExecutionConfig will need to be overridden in
UnmodifiableExecutionConfig


Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig
Pros:
- it is a new class, so we don't need to override anything
- modifications to ExecutionConfig don't affect this class
Cons:
- we need to change TypeInformation, adding
createSerializer(UnmodifiableExecutionConfig config)
- we need to add all get methods, or only the ones needed (this could be a pro)


Which option do you think is better?
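Option 2 from the list above could be sketched as a separate wrapper class that does not extend the mutable config and forwards only selected getters. All names here are hypothetical simplifications (`MutableConfigSketch`, `UnmodifiableConfigSketch`), not Flink classes.

```java
// Hypothetical mutable config, reduced to a few options (not Flink's ExecutionConfig).
class MutableConfigSketch {
    private boolean objectReuse;
    private long autoWatermarkInterval;

    void enableObjectReuse() { objectReuse = true; }
    void setAutoWatermarkInterval(long millis) { autoWatermarkInterval = millis; }
    boolean isObjectReuseEnabled() { return objectReuse; }
    long getAutoWatermarkInterval() { return autoWatermarkInterval; }
}

// Option 2 as a sketch: a separate final class that does not extend the mutable config.
// It forwards only the getters that sinks need, so there are no setters to override
// and no mutable supertype to downcast to.
final class UnmodifiableConfigSketch {
    private final MutableConfigSketch delegate;

    UnmodifiableConfigSketch(MutableConfigSketch delegate) {
        this.delegate = delegate;
    }

    boolean isObjectReuseEnabled() { return delegate.isObjectReuseEnabled(); }
    long getAutoWatermarkInterval() { return delegate.getAutoWatermarkInterval(); }
}

class UnmodifiableConfigDemo {
    public static void main(String[] args) {
        MutableConfigSketch config = new MutableConfigSketch();
        config.setAutoWatermarkInterval(200L);
        UnmodifiableConfigSketch view = new UnmodifiableConfigSketch(config);
        System.out.println(view.getAutoWatermarkInterval()); // prints 200
    }
}
```

Because the wrapper delegates instead of copying, it reflects the current state of the underlying config. The Option 2 trade-off also shows up here. Code that today takes the full config, such as `TypeInformation.createSerializer(ExecutionConfig)`, cannot accept this class without a new overload.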



On 2023/01/13 14:15:04 Joao Boto wrote:
> Hi Flink devs,
> 
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
> specifically about the sink[3].
> 
> The idea is to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> These changes are necessary to correctly migrate current sinks that rely on
> RuntimeContext, such as JdbcSink, KafkaTableSink and so on, to SinkV2.
> 
> Comments are welcome!
> Thanks,
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421
> 


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-16 Thread Lijie Wang
Hi Joao,

Thanks for driving this FLIP. +1 for exposing a read-only ExecutionConfig;
users/developers are not expected to modify the ExecutionConfig in a Sink.

Can we directly introduce the read-only ExecutionConfig in this FLIP? There
is no need for a separate FLIP, because currently it will only be used in
InitContext and will not affect other user interfaces. WDYT? cc @Gunnar

Best,
Lijie


On Mon, Jan 16, 2023 at 19:22, João Boto wrote:

> Hi Jing Ge,
> Thanks for your response.
>
> Reviewing all the connectors, as described above, I realised that we need
> the full ExecutionConfig, because it is needed to create the serializer
> correctly when objectReuse is enabled, since we call
> TypeInformation.createSerializer(ExecutionConfig config).
>
> In the PoC we used the ExecutionConfig, so I didn't notice this... :(
>
> Regards
>
> On 2023/01/14 00:01:52 Jing Ge wrote:
> > Hi Joao,
> >
> > Thanks for bringing this up. Whether to expose internal domain instances
> > depends on your requirements. Technically, it is even possible to expose
> > the RuntimeContext [1] (which must be considered very carefully). Since
> > you mentioned that you only need to know if objectReuse is enabled, how
> > about exposing just isObjectReuseEnabled instead of the whole
> > ExecutionConfig? The idea is to keep the scope as small as possible while
> > satisfying the requirement. If more information from ExecutionConfig is
> > needed later, we can still refactor the code properly, given a strong
> > motivation.
> >
> > Best regards,
> > Jing
> >
> > [1]
> > https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L279
> >
> > On Fri, Jan 13, 2023 at 6:19 PM João Boto wrote:
> >
> > > Hi Gunnar,
> > > Thanks for your time and response.
> > >
> > > I think the problem you want to solve is the exposure of the
> > > ExecutionConfig (which can be mutated), right?
> > > The configuration is not mutated; we only need to know if objectReuse
> > > is enabled.
> > > This is already exposed on RuntimeContext, and we thought to keep it
> > > similar to simplify any migrations. But as said, for this migration we
> > > only need isObjectReuseEnabled from the ExecutionConfig, and we could
> > > expose only this setting.
> > >
> > > Best regards,
> > >
> > >
> > > On 2023/01/13 15:50:09 Gunnar Morling wrote:
> > > > Hey Joao,
> > > >
> > > > Thanks for this FLIP! One question on the proposed interface changes:
> > > > is it expected that the configuration is *mutated* via the
> InitContext
> > > > passed to Sink::createWriter()? If that's not the case, how about
> > > > establishing a read-only contract representing the current
> > > > configuration and passing in that one instead? That would probably
> > > > deserve its own FLIP upon which yours here then would depend. Later
> > > > on, other contracts which effectively shouldn't modify a config could
> > > > use that one, too.
> > > >
> > > > Note I don't mean to stall your efforts here, but I thought it'd be a
> > > > good idea to bring it up and gauge the general interest in this.
> > > >
> > > > Best,
> > > >
> > > > --Gunnar
> > > >
> > > > On Fri, Jan 13, 2023 at 15:17, Joao Boto wrote:
> > > > >
> > > > > Hi Flink devs,
> > > > >
> > > > > I'd like to start a discussion thread for FLIP-287[1].
> > > > > This comes from an offline discussion with @Lijie Wang about
> > > > > FLIP-239[2], specifically about the sink[3].
> > > > >
> > > > > The idea is to expose the ExecutionConfig and JobId on
> > > > > SinkV2#InitContext. These changes are necessary to correctly migrate
> > > > > current sinks that rely on RuntimeContext, such as JdbcSink,
> > > > > KafkaTableSink and so on, to SinkV2.
> > > > >
> > > > > Comments are welcome!
> > > > > Thanks,
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > > > [2]
> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-16 Thread João Boto
Hi Jing Ge,
Thanks for your response.

Reviewing all the connectors, as described above, I realised that we need
the full ExecutionConfig, because it is needed to create the serializer
correctly when objectReuse is enabled, since we call
TypeInformation.createSerializer(ExecutionConfig config).

In the PoC we used the ExecutionConfig, so I didn't notice this... :(

Regards

On 2023/01/14 00:01:52 Jing Ge wrote:
> Hi Joao,
> 
> Thanks for bringing this up. Whether to expose internal domain instances
> depends on your requirements. Technically, it is even possible to expose the
> RuntimeContext [1] (which must be considered very carefully). Since you
> mentioned that you only need to know if objectReuse is enabled, how about
> exposing just isObjectReuseEnabled instead of the whole ExecutionConfig? The
> idea is to keep the scope as small as possible while satisfying the
> requirement. If more information from ExecutionConfig is needed later, we
> can still refactor the code properly, given a strong motivation.
> 
> Best regards,
> Jing
> 
> [1]
> https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L279
> 
> On Fri, Jan 13, 2023 at 6:19 PM João Boto wrote:
>
> > Hi Gunnar,
> > Thanks for your time and response.
> >
> > I think the problem you want to solve is the exposure of the
> > ExecutionConfig (which can be mutated), right?
> > The configuration is not mutated; we only need to know if objectReuse is
> > enabled.
> > This is already exposed on RuntimeContext, and we thought to keep it
> > similar to simplify any migrations. But as said, for this migration we
> > only need isObjectReuseEnabled from the ExecutionConfig, and we could
> > expose only this setting.
> >
> > Best regards,
> >
> >
> > On 2023/01/13 15:50:09 Gunnar Morling wrote:
> > > Hey Joao,
> > >
> > > Thanks for this FLIP! One question on the proposed interface changes:
> > > is it expected that the configuration is *mutated* via the InitContext
> > > passed to Sink::createWriter()? If that's not the case, how about
> > > establishing a read-only contract representing the current
> > > configuration and passing in that one instead? That would probably
> > > deserve its own FLIP upon which yours here then would depend. Later
> > > on, other contracts which effectively shouldn't modify a config could
> > > use that one, too.
> > >
> > > Note I don't mean to stall your efforts here, but I thought it'd be a
> > > good idea to bring it up and gauge the general interest in this.
> > >
> > > Best,
> > >
> > > --Gunnar
> > >
> > > On Fri, Jan 13, 2023 at 15:17, Joao Boto wrote:
> > > >
> > > > Hi Flink devs,
> > > >
> > > > I'd like to start a discussion thread for FLIP-287[1].
> > > > This comes from an offline discussion with @Lijie Wang about
> > > > FLIP-239[2], specifically about the sink[3].
> > > >
> > > > The idea is to expose the ExecutionConfig and JobId on
> > > > SinkV2#InitContext. These changes are necessary to correctly migrate
> > > > current sinks that rely on RuntimeContext, such as JdbcSink,
> > > > KafkaTableSink and so on, to SinkV2.
> > > >
> > > > Comments are welcome!
> > > > Thanks,
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > > [2]
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> > >
> >
> 


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-16 Thread João Boto
Hi all,
After an offline talk with Gunnar, I reviewed all connectors on branch
release_1.16 (which still contains virtually all connectors), and we can see
some of them using ExecutionConfig.

The main use is creating serializers via
TypeInformation.createSerializer(ExecutionConfig config), and since it is a
flink-core class, I think the easiest way is to expose the full
ExecutionConfig. Note that in some of them the config is not actually used
by the implementations.

For a quick overview (ignoring the use of ExecutionConfig.ClosureCleanerLevel,
as it's an enum):

- Cassandra connector:
  - uses TypeInformation.createSerializer (but builds a new ExecutionConfig
    for it)

- Files connector:
  - uses TypeInformation.createSerializer
  - creates a KryoSerializer, which uses getDefaultKryoSerializers,
    getDefaultKryoSerializerClasses, getRegisteredKryoTypes,
    getRegisteredTypesWithKryoSerializerClasses,
    getRegisteredTypesWithKryoSerializers

- Hive connector:
  - uses TypeInformation.createSerializer (but builds a new ExecutionConfig
    for it)

- Jdbc connector:
  - uses TypeInformation.createSerializer
  - uses isObjectReuseEnabled

- Kafka connector:
  - uses TypeInformation.createSerializer
  - uses getAutoWatermarkInterval
  - creates a KryoSerializer

- Kinesis connector:
  - uses getAutoWatermarkInterval

- Pulsar connector:
  - uses TypeInformation.createSerializer


We could create a FLIP to discuss a new ReadOnlyConfig, but this would lead
us to discuss a lot of new things.

Best regards


On 2023/01/13 14:15:04 Joao Boto wrote:
> Hi Flink devs,
> 
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
> specifically about the sink[3].
> 
> The idea is to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> These changes are necessary to correctly migrate current sinks that rely on
> RuntimeContext, such as JdbcSink, KafkaTableSink and so on, to SinkV2.
> 
> Comments are welcome!
> Thanks,
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421
> 


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-13 Thread Jing Ge
Hi Joao,

Thanks for bringing this up. Whether to expose internal domain instances
depends on your requirements. Technically, it is even possible to expose the
RuntimeContext [1] (which must be considered very carefully). Since you
mentioned that you only need to know if objectReuse is enabled, how about
exposing just isObjectReuseEnabled instead of the whole ExecutionConfig? The
idea is to keep the scope as small as possible while satisfying the
requirement. If more information from ExecutionConfig is needed later, we
can still refactor the code properly, given a strong motivation.
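Jing's narrower alternative might look like this minimal sketch, with hypothetical names (`MinimalInitContextSketch`, `CopyPolicySketch`). The context exposes a single flag, and a writer derives its copy policy from it.

```java
// Narrowest contract: the context exposes one flag instead of the whole config (hypothetical).
interface MinimalInitContextSketch {
    boolean isObjectReuseEnabled();
}

// A writer decides once, at creation time, whether it must copy records before buffering them.
class CopyPolicySketch {
    private final boolean copyBeforeBuffering;

    CopyPolicySketch(MinimalInitContextSketch context) {
        this.copyBeforeBuffering = context.isObjectReuseEnabled();
    }

    boolean copyBeforeBuffering() { return copyBeforeBuffering; }
}

class MinimalContextDemo {
    public static void main(String[] args) {
        MinimalInitContextSketch context = () -> true;
        System.out.println(new CopyPolicySketch(context).copyBeforeBuffering()); // prints true
    }
}
```

As João notes in a later reply, the flag alone does not cover the copy itself. The serializer that performs the copy is created through `TypeInformation.createSerializer(ExecutionConfig config)`, so that step still needs the config.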

Best regards,
Jing

[1]
https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L279

On Fri, Jan 13, 2023 at 6:19 PM João Boto wrote:

> Hi Gunnar,
> Thanks for your time and response.
>
> I think the problem you want to solve is the exposure of the
> ExecutionConfig (which can be mutated), right?
> The configuration is not mutated; we only need to know if objectReuse is
> enabled.
> This is already exposed on RuntimeContext, and we thought to keep it
> similar to simplify any migrations. But as said, for this migration we
> only need isObjectReuseEnabled from the ExecutionConfig, and we could
> expose only this setting.
>
> Best regards,
>
>
> On 2023/01/13 15:50:09 Gunnar Morling wrote:
> > Hey Joao,
> >
> > Thanks for this FLIP! One question on the proposed interface changes:
> > is it expected that the configuration is *mutated* via the InitContext
> > passed to Sink::createWriter()? If that's not the case, how about
> > establishing a read-only contract representing the current
> > configuration and passing in that one instead? That would probably
> > deserve its own FLIP upon which yours here then would depend. Later
> > on, other contracts which effectively shouldn't modify a config could
> > use that one, too.
> >
> > Note I don't mean to stall your efforts here, but I thought it'd be a
> > good idea to bring it up and gauge the general interest in this.
> >
> > Best,
> >
> > --Gunnar
> >
> > On Fri, Jan 13, 2023 at 15:17, Joao Boto wrote:
> > >
> > > Hi Flink devs,
> > >
> > > I'd like to start a discussion thread for FLIP-287[1].
> > > This comes from an offline discussion with @Lijie Wang about
> > > FLIP-239[2], specifically about the sink[3].
> > >
> > > The idea is to expose the ExecutionConfig and JobId on
> > > SinkV2#InitContext. These changes are necessary to correctly migrate
> > > current sinks that rely on RuntimeContext, such as JdbcSink,
> > > KafkaTableSink and so on, to SinkV2.
> > >
> > > Comments are welcome!
> > > Thanks,
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > [2]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > > [3] https://issues.apache.org/jira/browse/FLINK-25421
> >
>


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-13 Thread João Boto
Hi Gunnar,
Thanks for your time and response.

I think the problem you want to solve is the exposure of the ExecutionConfig
(which can be mutated), right?
The configuration is not mutated; we only need to know if objectReuse is
enabled.
This is already exposed on RuntimeContext, and we thought to keep it similar
to simplify any migrations. But as said, for this migration we only need
isObjectReuseEnabled from the ExecutionConfig, and we could expose only this
setting.

Best regards,


On 2023/01/13 15:50:09 Gunnar Morling wrote:
> Hey Joao,
> 
> Thanks for this FLIP! One question on the proposed interface changes:
> is it expected that the configuration is *mutated* via the InitContext
> passed to Sink::createWriter()? If that's not the case, how about
> establishing a read-only contract representing the current
> configuration and passing in that one instead? That would probably
> deserve its own FLIP upon which yours here then would depend. Later
> on, other contracts which effectively shouldn't modify a config could
> use that one, too.
> 
> Note I don't mean to stall your efforts here, but I thought it'd be a
> good idea to bring it up and gauge the general interest in this.
> 
> Best,
> 
> --Gunnar
> 
> On Fri, Jan 13, 2023 at 15:17, Joao Boto wrote:
> >
> > Hi Flink devs,
> >
> > I'd like to start a discussion thread for FLIP-287[1].
> > This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
> > specifically about the sink[3].
> >
> > The idea is to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> > These changes are necessary to correctly migrate current sinks that rely
> > on RuntimeContext, such as JdbcSink, KafkaTableSink and so on, to SinkV2.
> >
> > Comments are welcome!
> > Thanks,
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > [2]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> > [3] https://issues.apache.org/jira/browse/FLINK-25421
> 


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-13 Thread Gunnar Morling
Hey Joao,

Thanks for this FLIP! One question on the proposed interface changes:
is it expected that the configuration is *mutated* via the InitContext
passed to Sink::createWriter()? If that's not the case, how about
establishing a read-only contract representing the current
configuration and passing in that one instead? That would probably
deserve its own FLIP upon which yours here then would depend. Later
on, other contracts which effectively shouldn't modify a config could
use that one, too.

Note I don't mean to stall your efforts here, but I thought it'd be a
good idea to bring it up and gauge the general interest in this.

Best,

--Gunnar

On Fri, Jan 13, 2023 at 15:17, Joao Boto wrote:
>
> Hi Flink devs,
>
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
> specifically about the sink[3].
>
> The idea is to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> These changes are necessary to correctly migrate current sinks that rely on
> RuntimeContext, such as JdbcSink, KafkaTableSink and so on, to SinkV2.
>
> Comments are welcome!
> Thanks,
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421


[DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-01-13 Thread Joao Boto
Hi Flink devs,

I'd like to start a discussion thread for FLIP-287[1].
This comes from an offline discussion with @Lijie Wang about FLIP-239[2],
specifically about the sink[3].

The idea is to expose the ExecutionConfig and JobId on SinkV2#InitContext.
These changes are necessary to correctly migrate current sinks that rely on
RuntimeContext, such as JdbcSink, KafkaTableSink and so on, to SinkV2.
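The proposed change can be pictured with a minimal sketch. It uses hypothetical stand-ins rather than the real Sink V2 interfaces: `SinkInitContextSketch` and `ConfigViewSketch`, plus `java.util.UUID` in place of Flink's `JobID`. The writer reads both values from its init context instead of from a `RuntimeContext`. Using the job id to prefix external resource names is purely illustrative.

```java
import java.util.UUID;

// Hypothetical stand-ins; Flink's real InitContext, ExecutionConfig and JobID differ.
interface ConfigViewSketch {
    boolean isObjectReuseEnabled();
}

interface SinkInitContextSketch {
    // The two additions discussed in this thread: the execution config and the job id.
    ConfigViewSketch getExecutionConfig();
    UUID getJobId(); // UUID is only a stand-in for a job identifier type
}

// A writer that would otherwise read both values from RuntimeContext takes them from the init context.
class WriterSketch {
    private final boolean objectReuse;
    private final String resourcePrefix;

    WriterSketch(SinkInitContextSketch context) {
        this.objectReuse = context.getExecutionConfig().isObjectReuseEnabled();
        // Hypothetical use of the job id: namespacing resources in an external system per job.
        this.resourcePrefix = "job-" + context.getJobId();
    }

    boolean objectReuse() { return objectReuse; }
    String resourcePrefix() { return resourcePrefix; }
}

class InitContextDemo {
    public static void main(String[] args) {
        UUID jobId = UUID.fromString("00000000-0000-0000-0000-000000000001");
        SinkInitContextSketch context = new SinkInitContextSketch() {
            @Override public ConfigViewSketch getExecutionConfig() { return () -> false; }
            @Override public UUID getJobId() { return jobId; }
        };
        // prints job-00000000-0000-0000-0000-000000000001
        System.out.println(new WriterSketch(context).resourcePrefix());
    }
}
```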

Comments are welcome!
Thanks,

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
[3] https://issues.apache.org/jira/browse/FLINK-25421