Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
If there are no more questions, I will start the voting thread tomorrow.

On 2023/01/13 14:15:04 Joao Boto wrote:
> Hi flink devs,
>
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang, from FLIP-239[2],
> specifically for the sink[3].
>
> Basically, the goal is to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> These changes are necessary to correctly migrate the current sinks to SinkV2,
> such as JdbcSink, KafkaTableSink and so on, that rely on RuntimeContext.
>
> Comments are welcome!
> Thanks,
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi,

> I think we can get the serializer directly in InitContextImpl through
> `getOperatorConfig().getTypeSerializerIn(0, getUserCodeClassloader()).duplicate()`.

This should work, yes.

+1 to the updated FLIP so far. Thank you, Joao, for being on top of this!

Thanks,
Gordon

On Tue, May 30, 2023 at 12:34 AM João Boto wrote:
> Hi Lijie,
>
> I left the two options so that either one could be used, but I can clean up
> the FLIP to have only one.
>
> Updated the FLIP.
>
> Regards
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Lijie,

I left the two options so that either one could be used, but I can clean up the FLIP to have only one.

Updated the FLIP.

Regards

On 2023/05/23 07:23:45 Lijie Wang wrote:
> Hi Joao,
>
> I noticed the FLIP currently contains the following 2 methods about type
> serializer:
>
> (1) TypeSerializer createInputSerializer();
> (2) TypeSerializer createSerializer(TypeInformation inType);
>
> Is the method (2) still needed now?
>
> Best,
> Lijie
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Joao,

I noticed the FLIP currently contains the following 2 methods about type serializer:

(1) TypeSerializer createInputSerializer();
(2) TypeSerializer createSerializer(TypeInformation inType);

Is the method (2) still needed now?

Best,
Lijie

On Fri, May 19, 2023 at 16:53, João Boto wrote:
> Updated the FLIP to use this option.
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Updated the FLIP to use this option.
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi,

+1 for `InitContext#createInputSerializer()`. I think we can get the serializer directly in InitContextImpl through `getOperatorConfig().getTypeSerializerIn(0, getUserCodeClassloader()).duplicate()`.

Best,
Lijie.

On Mon, Apr 24, 2023 at 19:04, João Boto wrote:
> Hi @Gordon,
>
> `InitContext#createInputSerializer()` is a great option and will solve
> more than one problem, but I can't find a way to get the TypeInformation
> in InitContextImpl (I may be missing something).
>
> In the current (legacy) implementations we rely on the
> `InputTypeConfigurable` interface to get the TypeInformation, but this will
> not work for Sink2 because it is not implemented there
> (DataStream.sinkTo vs DataStream.addSink).
> As a side note, the ExecutionConfig provided by this interface cannot be
> used, because it can be changed after the call is made, for example during
> Table planning in DefaultExecutor.configureBatchSpecificProperties().
>
> In the end, what we need to do is something like:
> if (isObjectReuseEnabled()) serializer.copy(record) else record;
>
> So, to answer your question: yes, the last option is OK for this, but I
> don't see how to implement it, as I'm missing the TypeInformation in
> InitContextImpl.
>
> Best regards,
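A minimal sketch of the `duplicate()` idea from this message, under stated assumptions: `SerializerSketch`, `StringSerializerSketch` and `InitContextImplSketch` are hypothetical stand-ins written for illustration, not Flink classes, and the operator-level serializer is simply passed in through the constructor instead of being read from the operator config. The point it tries to show is that each call to `createInputSerializer()` hands out an independent instance, so a stateful serializer is never shared between callers.

```java
// Hypothetical stand-in for Flink's TypeSerializer; only the two methods used here.
interface SerializerSketch<T> {
    T copy(T record);

    SerializerSketch<T> duplicate();
}

// A deliberately stateful serializer: it keeps a scratch buffer,
// so one instance should not be shared between writers.
final class StringSerializerSketch implements SerializerSketch<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public String copy(String record) {
        scratch.setLength(0);
        scratch.append(record);
        return scratch.toString();
    }

    @Override
    public SerializerSketch<String> duplicate() {
        // Fresh instance with its own scratch buffer.
        return new StringSerializerSketch();
    }
}

// Hypothetical stand-in for InitContextImpl (assumption: the operator already
// holds the input serializer and the object-reuse flag).
final class InitContextImplSketch<T> {
    private final SerializerSketch<T> operatorInputSerializer;
    private final boolean objectReuseEnabled;

    InitContextImplSketch(SerializerSketch<T> operatorInputSerializer, boolean objectReuseEnabled) {
        this.operatorInputSerializer = operatorInputSerializer;
        this.objectReuseEnabled = objectReuseEnabled;
    }

    // Never hands out the operator's own instance, only duplicates of it.
    SerializerSketch<T> createInputSerializer() {
        return operatorInputSerializer.duplicate();
    }

    // Forwards the flag that lives in the operator-level configuration.
    boolean isObjectReuseEnabled() {
        return objectReuseEnabled;
    }
}
```

With this shape the sink only sees two narrow methods, and the question of where the TypeInformation comes from stays inside the context implementation.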
RE: Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hello Joao Boto,

Can you please add Gordon’s suggested approach as a third option? Also, it would be great to list the pros and cons of each approach in the FLIP.

Thanks,
Raman Verma

On 2023/04/24 11:04:26 João Boto wrote:
> Hi @Gordon,
>
> `InitContext#createInputSerializer()` is a great option and will solve
> more than one problem, but I can't find a way to get the TypeInformation
> in InitContextImpl (I may be missing something).
>
> In the current (legacy) implementations we rely on the
> `InputTypeConfigurable` interface to get the TypeInformation, but this will
> not work for Sink2 because it is not implemented there
> (DataStream.sinkTo vs DataStream.addSink).
> As a side note, the ExecutionConfig provided by this interface cannot be
> used, because it can be changed after the call is made, for example during
> Table planning in DefaultExecutor.configureBatchSpecificProperties().
>
> In the end, what we need to do is something like:
> if (isObjectReuseEnabled()) serializer.copy(record) else record;
>
> So, to answer your question: yes, the last option is OK for this, but I
> don't see how to implement it, as I'm missing the TypeInformation in
> InitContextImpl.
>
> Best regards,
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi @Gordon,

`InitContext#createInputSerializer()` is a great option and will solve more than one problem, but I can't find a way to get the TypeInformation in InitContextImpl (I may be missing something).

In the current (legacy) implementations we rely on the `InputTypeConfigurable` interface to get the TypeInformation, but this will not work for Sink2 because it is not implemented there (DataStream.sinkTo vs DataStream.addSink).
As a side note, the ExecutionConfig provided by this interface cannot be used, because it can be changed after the call is made, for example during Table planning in DefaultExecutor.configureBatchSpecificProperties().

In the end, what we need to do is something like:
if (isObjectReuseEnabled()) serializer.copy(record) else record;

So, to answer your question: yes, the last option is OK for this, but I don't see how to implement it, as I'm missing the TypeInformation in InitContextImpl.

Best regards,

On 2023/04/21 15:04:24 "Tzu-Li (Gordon) Tai" wrote:
> Do we have to introduce `InitContext#createSerializer(TypeInformation)`
> which returns TypeSerializer, or is it sufficient to only provide
> `InitContext#createInputSerializer()` which returns TypeSerializer?
>
> I had the impression that buffering sinks like JDBC only need the
> latter. @Joao, could you confirm?
>
> If that's the case, +1 to adding the following method signatures to
> InitContext:
> * TypeSerializer createInputSerializer()
> * boolean isObjectReuseEnabled()
>
> Thanks,
> Gordon
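The copy-on-buffer check in this message (`if (isObjectReuseEnabled()) serializer.copy(record) else record`) could be sketched as below. This is an illustration only: `BufferingWriterSketch` is a hypothetical class, not part of Flink, and a plain `UnaryOperator<T>` stands in for `TypeSerializer#copy`. It assumes the usual reason for the check, namely that with object reuse enabled the caller may mutate the same record instance after `write` returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a buffering sink writer; not a Flink class.
final class BufferingWriterSketch<T> {
    private final boolean objectReuseEnabled;
    private final UnaryOperator<T> copier; // plays the role of TypeSerializer#copy
    private final List<T> buffer = new ArrayList<>();

    BufferingWriterSketch(boolean objectReuseEnabled, UnaryOperator<T> copier) {
        this.objectReuseEnabled = objectReuseEnabled;
        this.copier = copier;
    }

    void write(T record) {
        // With object reuse on, the caller may overwrite `record` later,
        // so the buffer keeps a private copy. Otherwise the record is stored as is.
        buffer.add(objectReuseEnabled ? copier.apply(record) : record);
    }

    List<T> buffered() {
        return buffer;
    }
}
```

For example, writing the same mutable `StringBuilder` twice with different contents leaves two distinct values in the buffer when the flag is on, whereas without the copy both buffered entries would point at the same, last-written instance.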
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Do we have to introduce `InitContext#createSerializer(TypeInformation)` which returns TypeSerializer, or is it sufficient to only provide `InitContext#createInputSerializer()` which returns TypeSerializer?

I had the impression that buffering sinks like JDBC only need the latter. @Joao, could you confirm?

If that's the case, +1 to adding the following method signatures to InitContext:
* TypeSerializer createInputSerializer()
* boolean isObjectReuseEnabled()

Thanks,
Gordon

On Fri, Apr 21, 2023 at 3:04 AM Zhu Zhu wrote:
> Good point! @Gordon
> Introducing an `InitContext#createSerializer(TypeInformation)` looks like a
> better option to me, so we do not need to introduce an unmodifiable
> `ExecutionConfig` at this moment.
>
> Hope that we can make `ExecutionConfig` a read-only interface in
> Flink 2.0. It is exposed in `RuntimeContext` to user functions already,
> while mutating the values at runtime is actually undefined behavior.
>
> Thanks,
> Zhu
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Good point! @Gordon
Introducing an `InitContext#createSerializer(TypeInformation)` looks like a better option to me, so we do not need to introduce an unmodifiable `ExecutionConfig` at this moment.

Hope that we can make `ExecutionConfig` a read-only interface in Flink 2.0. It is exposed in `RuntimeContext` to user functions already, while mutating the values at runtime is actually undefined behavior.

Thanks,
Zhu

On Tue, Apr 18, 2023 at 01:02, Tzu-Li (Gordon) Tai wrote:
> Hi,
>
> Sorry for chiming in late.
>
> I'm not so sure that exposing ExecutionConfig / ReadableExecutionConfig
> directly through Sink#InitContext is the right thing to do.
>
> 1. A lot of the read-only getter methods on ExecutionConfig are irrelevant
> for sinks. Expanding the scope of the InitContext interface with so many
> irrelevant methods is probably going to make writing unit tests a pain.
>
> 2. There are actually a few getter methods on `InitContext` that have
> duplicate/redundant info for what ExecutionConfig exposes. For example,
> InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber
> currently exist and it can be confusing if users find 2 sources of that
> information (either via the `InitContext` or via the wrapped
> `ExecutionConfig`).
>
> All in all, it feels like `Sink#InitContext` was introduced initially as a
> means to selectively only expose certain information to sinks.
>
> It looks like right now, the only requirement is that some sinks require 1)
> isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it
> make sense to follow the original intent and only selectively expose these?
> For 1), we can just add a new method to `InitContext` and forward the
> information from `ExecutionConfig` accessible at the operator level.
> For 2), would it make sense to create the serializer at the operator level
> and then provide it through `InitContext`?
>
> Thanks,
> Gordon
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi,

Sorry for chiming in late.

I'm not so sure that exposing ExecutionConfig / ReadableExecutionConfig directly through Sink#InitContext is the right thing to do.

1. A lot of the read-only getter methods on ExecutionConfig are irrelevant for sinks. Expanding the scope of the InitContext interface with so many irrelevant methods is probably going to make writing unit tests a pain.

2. There are actually a few getter methods on `InitContext` that have duplicate/redundant info for what ExecutionConfig exposes. For example, InitContext#getNumberOfParallelSubtasks and InitContext#getAttemptNumber currently exist and it can be confusing if users find 2 sources of that information (either via the `InitContext` or via the wrapped `ExecutionConfig`).

All in all, it feels like `Sink#InitContext` was introduced initially as a means to selectively only expose certain information to sinks.

It looks like right now, the only requirement is that some sinks require 1) isObjectReuseEnabled, and 2) TypeSerializer for the input type. Would it make sense to follow the original intent and only selectively expose these?
For 1), we can just add a new method to `InitContext` and forward the information from `ExecutionConfig` accessible at the operator level.
For 2), would it make sense to create the serializer at the operator level and then provide it through `InitContext`?

Thanks,
Gordon

On Mon, Apr 17, 2023 at 8:23 AM Zhu Zhu wrote:
> We can let the `InitContext` return `ExecutionConfig` in the interface.
> However, a `ReadableExecutionConfig` implementation should be returned
> so that exceptions will be thrown if users try to modify the
> `ExecutionConfig`.
>
> We can rework all the setters of `ExecutionConfig` to internally invoke a
> `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can
> just override that method. But pay attention to a few exceptional
> setters, i.e. those for globalJobParameters and serializers.
>
> We should also explicitly state in the documentation of
> `InitContext#getExecutionConfig()` that the returned `ExecutionConfig`
> is unmodifiable.
>
> Thanks,
> Zhu
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
We can let the `InitContext` return `ExecutionConfig` in the interface. However, a `ReadableExecutionConfig` implementation should be returned, so that exceptions will be thrown if users try to modify the `ExecutionConfig`.

We can rework all the setters of `ExecutionConfig` to internally invoke a `setConfiguration(...)` method. Then the `ReadableExecutionConfig` can just override that method. But pay attention to a few exceptional setters, i.e. those for globalJobParameters and serializers.

We should also explicitly state in the documentation of `InitContext#getExecutionConfig()` that the returned `ExecutionConfig` is unmodifiable.

Thanks,
Zhu

On Mon, Apr 17, 2023 at 16:51, João Boto wrote:
> Hi Zhu,
>
> Thanks for your time reviewing this.
>
> Extending `ExecutionConfig` will allow modifying the values in the config (this is what we want to prevent with Option 2).
>
> If we extend the ExecutionConfig, isn't it simpler to just do Option 1 (expose ExecutionConfig directly)?
>
> Regards
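The setter-funnelling idea in Zhu's message can be sketched in plain Java. This is only an illustration under stated assumptions: `SketchExecutionConfig` and `SketchReadOnlyExecutionConfig` are hypothetical stand-ins, not the real Flink classes, and the two settings shown are picked arbitrarily. The exceptional setters Zhu mentions (globalJobParameters and serializers) are not modeled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ExecutionConfig; NOT the real Flink class.
class SketchExecutionConfig {
    private final Map<String, Object> values = new HashMap<>();

    // Every ordinary setter funnels through this single hook.
    protected void setConfiguration(String key, Object value) {
        values.put(key, value);
    }

    public SketchExecutionConfig enableObjectReuse() {
        setConfiguration("objectReuse", true);
        return this;
    }

    public SketchExecutionConfig setAutoWatermarkInterval(long millis) {
        setConfiguration("autoWatermarkInterval", millis);
        return this;
    }

    public boolean isObjectReuseEnabled() {
        return (Boolean) values.getOrDefault("objectReuse", false);
    }

    public long getAutoWatermarkInterval() {
        // 0L is an arbitrary default chosen for this sketch.
        return (Long) values.getOrDefault("autoWatermarkInterval", 0L);
    }
}

// Hypothetical read-only subclass: overriding the one hook blocks all setters.
class SketchReadOnlyExecutionConfig extends SketchExecutionConfig {
    public SketchReadOnlyExecutionConfig(SketchExecutionConfig source) {
        // Copy values through the parent's hook, bypassing the override below.
        if (source.isObjectReuseEnabled()) {
            super.setConfiguration("objectReuse", true);
        }
        super.setConfiguration("autoWatermarkInterval", source.getAutoWatermarkInterval());
    }

    @Override
    protected void setConfiguration(String key, Object value) {
        throw new UnsupportedOperationException("This config is read-only: " + key);
    }
}
```

Because the read-only type is still a subclass, it could be passed to any method that expects the base config type, which is the property Zhu relies on to avoid adding a new `createSerializer` overload.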
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Zhu,

Thanks for your time reviewing this.

Extending `ExecutionConfig` will allow modifying the values in the config (this is what we want to prevent with Option 2).

If we extend the ExecutionConfig, isn't it simpler to just do Option 1 (expose ExecutionConfig directly)?

Regards

On 2023/04/03 09:42:28 Zhu Zhu wrote:
> Hi João,
>
> Thanks for creating this FLIP!
> I'm overall +1 for it to unblock the migration of sinks to SinkV2.
>
> Yet I think it's better to let the `ReadableExecutionConfig` extend `ExecutionConfig`, because otherwise we have to introduce a new method `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new method may require every `TypeInformation` to implement it, including Flink built-in ones and custom ones, otherwise exceptions will happen. That goal, however, is pretty hard to achieve.
>
> Thanks,
> Zhu
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi João,

Thanks for creating this FLIP!
I'm overall +1 for it to unblock the migration of sinks to SinkV2.

Yet I think it's better to let the `ReadableExecutionConfig` extend `ExecutionConfig`, because otherwise we have to introduce a new method `TypeInformation#createSerializer(ReadableExecutionConfig)`. The new method may require every `TypeInformation` to implement it, including Flink built-in ones and custom ones, otherwise exceptions will happen. That goal, however, is pretty hard to achieve.

Thanks,
Zhu

On Tue, Feb 28, 2023 at 23:34, João Boto wrote:
> I have updated the FLIP with the 2 options that we have discussed.
>
> Option 1: Expose ExecutionConfig directly on InitContext
> This has a minimal impact, as we only have to expose the new methods.
>
> Option 2: Expose ReadableExecutionConfig on InitContext
> This option has more impact, as we need to add a new method to TypeInformation and change all implementations (there are currently 72 implementations).
>
> Waiting for feedback or concerns about the two options.
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
I have updated the FLIP with the 2 options that we have discussed.

Option 1: Expose ExecutionConfig directly on InitContext
This has a minimal impact, as we only have to expose the new methods.

Option 2: Expose ReadableExecutionConfig on InitContext
This option has more impact, as we need to add a new method to TypeInformation and change all implementations (there are currently 72 implementations).

Waiting for feedback or concerns about the two options.
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Konstantin,

I checked the usage of ExecutionConfig in the Kinesis and Kafka sinks:

- Kinesis sink: ExecutionConfig is not used by the Kinesis sink. The one that uses getAutoWatermarkInterval is the Kinesis source; Joao may have made a mistake.
- Kafka sink: isObjectReuseEnabled and ExecutionConfig are used in the upsert Kafka table sink. The upsert Kafka table sink obtains the ExecutionConfig through DataStreamSinkProvider; however, this approach cannot be used for the DataStream sink and other SinkRuntimeProviders.

Besides, I know that all the JDBC sinks (DataStream/Table) need isObjectReuseEnabled and the ExecutionConfig. The JDBC sink buffers the records it receives and only flushes them when the buffer is full, a periodic timer is triggered, or a checkpoint happens. The JDBC sinks decide whether to buffer copies or the original records based on isObjectReuseEnabled: when object reuse is enabled, we should buffer copies (because the content of the objects may be changed before the flush), otherwise we should buffer the original records. And the sink needs the ExecutionConfig to create the TypeSerializer used to copy the records.

Actually, the upsert Kafka table sink is similar to the JDBC sink. I think all sinks with the "buffer records" behavior need isObjectReuseEnabled and the ExecutionConfig.

Best,
Lijie

On Sat, Feb 4, 2023 at 01:41, Konstantin Knauf wrote:
> Hi everyone,
>
> if I am not mistaken, of the sinks mentioned by Joao, Kafka, Kinesis & Kinesis already use the Sink2 API. How were those implemented without exposing the ExecutionConfig?
>
> Best,
>
> Konstantin
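The buffering behavior Lijie describes can be illustrated with a small self-contained sketch. Everything here is hypothetical: `BufferingWriterSketch` is not a Flink class, the `copier` function merely stands in for the copy that a `TypeSerializer` would perform, and the timer and checkpoint triggers are reduced to an explicit `flush()` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical buffering writer; `copier` stands in for a serializer-based deep copy.
class BufferingWriterSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final List<T> flushed = new ArrayList<>();
    private final boolean objectReuseEnabled;
    private final UnaryOperator<T> copier;
    private final int maxBufferSize;

    public BufferingWriterSketch(boolean objectReuseEnabled, UnaryOperator<T> copier, int maxBufferSize) {
        this.objectReuseEnabled = objectReuseEnabled;
        this.copier = copier;
        this.maxBufferSize = maxBufferSize;
    }

    public void write(T record) {
        // With object reuse, the caller may overwrite `record` before the flush,
        // so a copy is buffered; otherwise the original instance is safe to keep.
        buffer.add(objectReuseEnabled ? copier.apply(record) : record);
        if (buffer.size() >= maxBufferSize) {
            flush();
        }
    }

    // In a real sink this would also run on a timer or at a checkpoint.
    public void flush() {
        flushed.addAll(buffer);
        buffer.clear();
    }

    public List<T> flushedRecords() {
        return flushed;
    }
}
```

Without the copy, two writes of the same reused instance would leave the buffer holding two references to one object, so both buffered entries would show the last value written.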
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi everyone,

if I am not mistaken, of the sinks mentioned by Joao, Kafka, Kinesis & Kinesis already use the Sink2 API. How were those implemented without exposing the ExecutionConfig?

Best,

Konstantin

On Wed, Feb 1, 2023 at 12:28, Lijie Wang <wangdachui9...@gmail.com> wrote:
> +1 for Option 2, if we can abstract a "ReadableExecutionConfig" interface (containing all is/get methods) and let ExecutionConfig implement ReadableExecutionConfig.
>
> Best,
> Lijie

--
https://twitter.com/snntrable
https://github.com/knaufk
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
+1 for Option 2, if we can abstract a "ReadableExecutionConfig" interface (containing all is/get methods) and let ExecutionConfig implement ReadableExecutionConfig.

Best,
Lijie

On Tue, Jan 17, 2023 at 20:39, João Boto wrote:
> Hi all,
>
> As establishing a read-only contract seems to be the consensus approach, talking to Lijie we saw two ways of doing this.
>
> Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just like the UnmodifiableConfiguration)
> Pros:
> - we have all the get methods
> - don't need to change TypeInformation.createSerializer(ExecutionConfig config)
> Cons:
> - we have to override 34 methods that modify things
> - new methods added to ExecutionConfig will need to be overridden in UnmodifiableExecutionConfig
>
> Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig.
> Pros:
> - new class, so we don't need to override anything
> - modifications to ExecutionConfig don't affect this class
> Cons:
> - need to change TypeInformation, adding createSerializer(UnmodifiableExecutionConfig config)
> - need to add all get methods, or only those needed (this could be a pro)
>
> Which option do you think is better?
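Lijie's variant of Option 2, a read-only interface that the mutable config implements, might look roughly like the sketch below. All names (`ReadableConfigSketch`, `MutableConfigSketch`, `SinkInitSketch`) are hypothetical and the two accessors are chosen only for illustration; the real `ExecutionConfig` has many more.

```java
// Hypothetical read-only view: it declares only is/get methods.
interface ReadableConfigSketch {
    boolean isObjectReuseEnabled();

    long getAutoWatermarkInterval();
}

// Hypothetical mutable config that also implements the read-only view.
class MutableConfigSketch implements ReadableConfigSketch {
    private boolean objectReuse;
    private long autoWatermarkInterval;

    public void setObjectReuse(boolean objectReuse) {
        this.objectReuse = objectReuse;
    }

    public void setAutoWatermarkInterval(long millis) {
        this.autoWatermarkInterval = millis;
    }

    @Override
    public boolean isObjectReuseEnabled() {
        return objectReuse;
    }

    @Override
    public long getAutoWatermarkInterval() {
        return autoWatermarkInterval;
    }
}

// A sink-side consumer sees only the read-only type, so no setter is reachable
// through the declared parameter type.
class SinkInitSketch {
    static String describe(ReadableConfigSketch config) {
        return "objectReuse=" + config.isObjectReuseEnabled()
                + ", watermarkInterval=" + config.getAutoWatermarkInterval();
    }
}
```

One caveat of this shape is that the protection is at the type level only: code that downcasts the view back to the mutable class can still call setters. As the thread notes, it also means any API that takes the concrete config, such as `TypeInformation.createSerializer(ExecutionConfig config)`, would need an additional overload.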
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi all,

As establishing a read-only contract seems to be the consensus approach, talking to Lijie we saw two ways of doing this.

Option 1: UnmodifiableExecutionConfig that extends ExecutionConfig (just like the UnmodifiableConfiguration)
Pros:
- we have all the get methods
- don't need to change TypeInformation.createSerializer(ExecutionConfig config)
Cons:
- we have to override 34 methods that modify things
- new methods added to ExecutionConfig will need to be overridden in UnmodifiableExecutionConfig

Option 2: UnmodifiableExecutionConfig without extending ExecutionConfig.
Pros:
- new class, so we don't need to override anything
- modifications to ExecutionConfig don't affect this class
Cons:
- need to change TypeInformation, adding createSerializer(UnmodifiableExecutionConfig config)
- need to add all get methods, or only those needed (this could be a pro)

Which option do you think is better?

On 2023/01/13 14:15:04 Joao Boto wrote:
> Hi flink devs,
>
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang, from FLIP-239[2], specifically for the sink[3].
>
> Basically, to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> These changes are necessary to correctly migrate the current sinks to SinkV2, like JdbcSink, KafkaTableSink and so on, that rely on RuntimeContext.
>
> Comments are welcome!
> Thanks,
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Joao,

Thanks for driving this FLIP. +1 for exposing a read-only ExecutionConfig; users/developers are not expected to modify the ExecutionConfig in a Sink.

Can we directly introduce the read-only ExecutionConfig in this FLIP? There is no need for a separate FLIP, because currently it will only be used in InitContext and will not affect other user interfaces. WDYT?

cc @Gunnar

Best,
Lijie

On Mon, Jan 16, 2023 at 19:22, João Boto wrote:
> Hi Jing Ge,
> Thanks for your response.
>
> While doing the review of all connectors mentioned above, I realised that we need the full ExecutionConfig, as it is needed to generate the serializer correctly if objectReuse is enabled, since we call TypeInformation.createSerializer(ExecutionConfig config).
>
> In the PoC we used the ExecutionConfig and I didn't notice this... :(
>
> Regards
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Jing Ge,
Thanks for your response.

While doing the review of all connectors mentioned above, I realised that we need the full ExecutionConfig, as it is needed to generate the serializer correctly if objectReuse is enabled, since we call TypeInformation.createSerializer(ExecutionConfig config).

In the PoC we used the ExecutionConfig and I didn't notice this... :(

Regards

On 2023/01/14 00:01:52 Jing Ge wrote:
> Hi Joao,
>
> Thanks for bringing this up. Exposing internal domain instances depends on your requirements. Technically, it is even possible to expose the RuntimeContext [1] (this must be considered very carefully). Since you mentioned that you only need to know if objectReuse is enabled, how about just exposing isObjectReuseEnabled instead of the whole ExecutionConfig? The idea is to shrink the scope as much as possible while still satisfying the requirement. If more information from ExecutionConfig is needed later, we can still refactor the code properly, given a strong motivation.
>
> Best regards,
> Jing
>
> [1] https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L279
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi all,

After an offline talk with Gunnar, I reviewed all connectors on branch release_1.16 (which still has virtually all connectors), and we can see some of them using ExecutionConfig.

The basic use is for creating serializers: TypeInformation.createSerializer(ExecutionConfig config), and since it is a flink-core class, I think the easy way is to expose the full ExecutionConfig. As a note, in some implementations the config is not actually used.

For a quick overview (ignoring the use of ExecutionConfig.ClosureCleanerLevel, as it's an enum):

- Cassandra connector:
  - uses TypeInformation.createSerializer (but builds a new ExecutionConfig for it)
- Files connector:
  - uses TypeInformation.createSerializer
  - creates KryoSerializer, which uses: getDefaultKryoSerializers, getDefaultKryoSerializerClasses, getRegisteredKryoTypes, getRegisteredTypesWithKryoSerializerClasses, getRegisteredTypesWithKryoSerializers
- Hive connector:
  - uses TypeInformation.createSerializer (but builds a new ExecutionConfig for it)
- Jdbc connector:
  - uses TypeInformation.createSerializer
  - uses isObjectReuseEnabled
- Kafka connector:
  - uses TypeInformation.createSerializer
  - uses getAutoWatermarkInterval
  - creates KryoSerializer
- Kinesis connector:
  - uses getAutoWatermarkInterval
- Pulsar connector:
  - uses TypeInformation.createSerializer

We can create a FLIP to discuss a new ReadOnlyConfig, but this will lead us to discuss a lot of new things.

Best regards

On 2023/01/13 14:15:04 Joao Boto wrote:
> Hi flink devs,
>
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang, from FLIP-239[2], specifically for the sink[3].
>
> Basically, to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> These changes are necessary to correctly migrate the current sinks to SinkV2, like JdbcSink, KafkaTableSink and so on, that rely on RuntimeContext.
>
> Comments are welcome!
> Thanks,
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Joao,

Thanks for bringing this up. Exposing internal domain instances depends on your requirements. Technically, it is even possible to expose the RuntimeContext [1] (this must be considered very carefully). Since you mentioned that you only need to know if objectReuse is enabled, how about just exposing isObjectReuseEnabled instead of the whole ExecutionConfig? The idea is to shrink the scope as much as possible while still satisfying the requirement. If more information from ExecutionConfig is needed later, we can still refactor the code properly, given a strong motivation.

Best regards,
Jing

[1] https://github.com/apache/flink/blob/560b4612735a2b9cd3b5db88adf5cb223e85535b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L279

On Fri, Jan 13, 2023 at 6:19 PM João Boto wrote:
> Hi Gunnar,
> Thanks for your time and response.
>
> I think the problem you want to solve is the exposure of the ExecutionConfig (which can be mutated), no?
> The configuration is not mutated; we only need to know if objectReuse is enabled.
> This is already exposed on RuntimeContext, and we thought to keep it similar to that to simplify any migrations. But as said, for this migration we only need isObjectReuseEnabled from ExecutionConfig, and we could expose only this setting.
>
> Best regards,
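Jing's narrower alternative, exposing a single flag rather than the whole config, could be sketched as follows. The names `NarrowInitContextSketch`, `FullConfigSketch`, and `NarrowContextFactory` are hypothetical and only illustrate the scoping idea; they do not correspond to Flink types.

```java
// Hypothetical narrow context: exposes one flag instead of the whole config.
interface NarrowInitContextSketch {
    boolean isObjectReuseEnabled();
}

// Hypothetical stand-in for a fuller configuration object.
class FullConfigSketch {
    private final boolean objectReuse;
    private final int parallelism;

    public FullConfigSketch(boolean objectReuse, int parallelism) {
        this.objectReuse = objectReuse;
        this.parallelism = parallelism;
    }

    public boolean isObjectReuseEnabled() {
        return objectReuse;
    }

    public int getParallelism() {
        return parallelism;
    }
}

class NarrowContextFactory {
    // Captures only the value the sink needs; the full config never leaks out.
    static NarrowInitContextSketch from(FullConfigSketch config) {
        final boolean reuse = config.isObjectReuseEnabled();
        return () -> reuse;
    }
}
```

The trade-off raised elsewhere in this thread is that a sink which must call `TypeInformation.createSerializer(ExecutionConfig config)` cannot do so from a single boolean, which is why the discussion moved toward exposing a read-only form of the full config.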
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Gunnar,
Thanks for your time and response.

I think the problem you want to solve is the exposure of the ExecutionConfig (which can be mutated), no?
The configuration is not mutated; we only need to know if objectReuse is enabled.
This is already exposed on RuntimeContext, and we thought to keep it similar to that to simplify any migrations. But as said, for this migration we only need isObjectReuseEnabled from ExecutionConfig, and we could expose only this setting.

Best regards,

On 2023/01/13 15:50:09 Gunnar Morling wrote:
> Hey Joao,
>
> Thanks for this FLIP! One question on the proposed interface changes: is it expected that the configuration is *mutated* via the InitContext passed to Sink::createWriter()? If that's not the case, how about establishing a read-only contract representing the current configuration and passing in that one instead? That would probably deserve its own FLIP, upon which yours here would then depend. Later on, other contracts which effectively shouldn't modify a config could use that one, too.
>
> Note I don't mean to stall your efforts here, but I thought it'd be a good idea to bring it up and gauge the general interest in this.
>
> Best,
>
> --Gunnar
Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hey Joao,

Thanks for this FLIP! One question on the proposed interface changes: is it expected that the configuration is *mutated* via the InitContext passed to Sink::createWriter()? If that's not the case, how about establishing a read-only contract representing the current configuration and passing in that one instead? That would probably deserve its own FLIP, upon which yours here would then depend. Later on, other contracts which effectively shouldn't modify a config could use that one, too.

Note I don't mean to stall your efforts here, but I thought it'd be a good idea to bring it up and gauge the general interest in this.

Best,

--Gunnar

On Fri, Jan 13, 2023 at 15:17, Joao Boto wrote:
> Hi flink devs,
>
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang, from FLIP-239[2], specifically for the sink[3].
>
> Basically, to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> These changes are necessary to correctly migrate the current sinks to SinkV2, like JdbcSink, KafkaTableSink and so on, that rely on RuntimeContext.
>
> Comments are welcome!
> Thanks,
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421
[DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi flink devs,

I'd like to start a discussion thread for FLIP-287[1].
This comes from an offline discussion with @Lijie Wang, from FLIP-239[2], specifically for the sink[3].

Basically, to expose the ExecutionConfig and JobId on SinkV2#InitContext.
These changes are necessary to correctly migrate the current sinks to SinkV2, like JdbcSink, KafkaTableSink and so on, that rely on RuntimeContext.

Comments are welcome!
Thanks,

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
[3] https://issues.apache.org/jira/browse/FLINK-25421