[jira] [Commented] (FLINK-34869) [Bug][mysql] Remove all previous table and add new added table will throw Exception.

2024-04-23 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840201#comment-17840201
 ] 

Josh Mahonin commented on FLINK-34869:
--

We seem to be hitting this issue as well. Is this issue possibly fixed by 
another commit, or is it still outstanding, [~loserwang1024]?

If the issue is still outstanding, with some guidance we could attempt a patch. 
Thanks.

> [Bug][mysql] Remove all previous table and add new added table will throw 
> Exception.
> 
>
> Key: FLINK-34869
> URL: https://issues.apache.org/jira/browse/FLINK-34869
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Flink CDC Issue Import
>Priority: Major
>  Labels: github-import
>
> ### Search before asking
> - [X] I searched in the 
> [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
> nothing similar.
> ### Flink version
> 1.18
> ### Flink CDC version
> 3.0.1
> ### Database and its version
> Any
> ### Minimal reproduce step
> 1. Stop the job with a savepoint.
> 2. Set 'scan.incremental.snapshot.enabled' = 'true' and set tableList to 
> tables that were not included last time.
> 3. The assign status then becomes inconsistent.
> Take a test case for example:
> ```java
> public class NewlyAddedTableITCase extends MySqlSourceTestBase {
>
>     @Test
>     public void testRemoveAndAddTablesOneByOne() throws Exception {
>         testRemoveAndAddTablesOneByOne(
>                 1, "address_hangzhou", "address_beijing", "address_shanghai");
>     }
>
>     private void testRemoveAndAddTablesOneByOne(int parallelism, String... captureAddressTables)
>             throws Exception {
>         MySqlConnection connection = getConnection();
>         // step 1: create mysql tables with all tables included
>         initialAddressTables(connection, captureAddressTables);
>         final TemporaryFolder temporaryFolder = new TemporaryFolder();
>         temporaryFolder.create();
>         final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
>         // get all expected data
>         List<String> fetchedDataList = new ArrayList<>();
>         String finishedSavePointPath = null;
>         // test removing and adding table one by one
>         for (int round = 0; round < captureAddressTables.length; round++) {
>             String captureTableThisRound = captureAddressTables[round];
>             String cityName = captureTableThisRound.split("_")[1];
>             StreamExecutionEnvironment env =
>                     getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
>             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>             String createTableStatement =
>                     getCreateTableStatement(new HashMap<>(), captureTableThisRound);
>             tEnv.executeSql(createTableStatement);
>             tEnv.executeSql(
>                     "CREATE TABLE sink ("
>                             + " table_name STRING,"
>                             + " id BIGINT,"
>                             + " country STRING,"
>                             + " city STRING,"
>                             + " detail_address STRING,"
>                             + " primary key (table_name,id) not enforced"
>                             + ") WITH ("
>                             + " 'connector' = 'values',"
>                             + " 'sink-insert-only' = 'false'"
>                             + ")");
>             TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
>             JobClient jobClient = tableResult.getJobClient().get();
>             // this round's snapshot data
>             fetchedDataList.addAll(
>                     Arrays.asList(
>                             format(
>                                     "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
>                                     captureTableThisRound, cityName, cityName),
>                             format(
>                                     "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
>                                     captureTableThisRound, cityName, cityName)));
>             waitForSinkSize("sink", fetchedDataList.size());
>             assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
>             // only this round table's data is captured.
>             // step 3: make binlog data for all tables before this 

[jira] [Commented] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID

2022-10-27 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625038#comment-17625038
 ] 

Josh Mahonin commented on FLINK-28697:
--

Agreed, the use of mismatched JVM versions is not ideal. In this case the major 
versions were the same (16), but due to some idiosyncrasies in container 
packaging, they were not the exact same JVM version.

The other classes in that package all have a serialVersionUID set (last I 
checked).

I’ll also note the Flink style guide says all serializable classes must declare 
a serialVersionUID
[https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization]
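
For reference, a minimal sketch of the kind of declaration the style guide asks 
for (the class name here stands in for any Serializable class; the constant 
value is an arbitrary example, not necessarily the one a fix would use):
{code:java}
import java.io.Serializable;

public class ExampleSerializer implements Serializable {
    // Pinning serialVersionUID keeps JVMs that would otherwise auto-compute
    // different values agreeing on the stream class during deserialization.
    private static final long serialVersionUID = 1L; // example value only
}
{code}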

> MapDataSerializer doesn't declare a serialVersionUID
> 
>
> Key: FLINK-28697
> URL: https://issues.apache.org/jira/browse/FLINK-28697
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.1
>Reporter: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
>
> MapDataSerializer doesn't declare a serialVersionUID, which can manifest as 
> an InvalidClassException when attempting to serialize with different JREs 
> for compilation / runtime.
> {code:java}
> Caused by: java.io.InvalidClassException: 
> org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class 
> incompatible: stream classdesc serialVersionUID = 2533002123505507000, local 
> class serialVersionUID = 1622156938509929811 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration

2022-08-18 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581388#comment-17581388
 ] 

Josh Mahonin commented on FLINK-19589:
--

{quote}Do you think your approach could be extended to cover all filesystems? 
What alternatives are there?
{quote}
In general, I think so. The {{FileSystemFactory}} interface simply accepts a 
URI, and this strategy uses (abuses) the fact that URIs can contain query 
parameters, so it's a convenient way to pass in dynamic options to a file 
system. The other Hadoop-based file systems seem to have similar initialization 
logic, so I suspect there is a way to make the approach a bit more global.

That said, I don't know whether this is the most correct way to approach the 
problem, but it was the simplest option I found.

{quote}Does your approach support different configurations within the same Job?
{quote}
I think so, if I'm interpreting the question correctly. The changes are applied 
in {{{}S3FileSystemFactory::getInitURI(){}}}, which is called by 
{{{}AbstractS3FileSystemFactory::create(){}}}. At this point the Hadoop FS and 
Configuration are being created on a per-URI basis, so if a Job has references 
to multiple FileSystems/URIs, each would have independent configuration.

I've seen there's also a {{FileSystem.CACHE}} though, which might complicate 
matters. The corresponding {{FSKey}} is based on the scheme and authority, e.g. 
{{s3://my_bucket/my_folder}} -> {{{}FSKey(s3, my_bucket){}}}. This may cause 
unexpected configuration collisions in a shared session cluster if different 
users have the same bucket name, although I believe that is also the case today.
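
As a rough illustration of why a cache keyed that way collapses per-path 
options (the class name here is illustrative; Flink's actual FSKey may differ 
in detail):
{code:java}
import java.net.URI;

public class FsKeyDemo {
    public static void main(String[] args) {
        // Both URIs share scheme "s3" and authority "my_bucket", so a cache
        // keyed on (scheme, authority) would hand back the same FileSystem
        // instance regardless of any per-path query parameters.
        URI a = URI.create("s3://my_bucket/my_folder?fs.s3a.assumed.role.arn=arn_a");
        URI b = URI.create("s3://my_bucket/other_folder?fs.s3a.assumed.role.arn=arn_b");
        System.out.println(a.getScheme() + " / " + a.getAuthority());
        System.out.println(b.getScheme() + " / " + b.getAuthority()); // identical key parts
    }
}
{code}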

> Support per-connector FileSystem configuration
> --
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Assignee: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
> Attachments: FLINK-19589.patch
>
>
> Currently, options for file systems can only be configured globally. However, 
> in many cases, users would like more fine-grained configuration.
> Either we allow a properties map for our connectors, similar to Kafka or 
> Kinesis properties, or something like the following: management of two 
> properties related to S3 object management:
>  - [Lifecycle configuration 
> |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html]
>  - [Object 
> tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.html]
> Being able to control these is useful for people who manage jobs that use S3 
> for checkpointing or job output, but who need per-job control of the 
> tagging/lifecycle configuration for auditing or cost control (for example, 
> deleting old state from S3).
> Ideally, it would be possible to control this on each object being written by 
> Flink, or at least at a job level.
> _Note_: Some related existing properties can be set using the hadoop module 
> using system properties: see for example 
> {code:java}
> fs.s3a.acl.default{code}
> which sets the default ACL on written objects.
> *Solutions*:
> 1) Modify hadoop module:
> The above-linked module could be updated in order to have a new property (and 
> similar for lifecycle)
>  fs.s3a.tags.default
>  which could be a comma-separated list of tags to set. For example
> {code:java}
> fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code}
> This seems like a natural place to put this logic (and is outside of Flink if 
> we decide to go this way). However, it does not allow for a sink and 
> checkpoint to have different values for these.
> 2) Expose withTagging from module
> The hadoop module used by Flink's existing filesystem has already exposed put 
> request level tagging (see 
> [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]).
>  This could be used in the Flink filesystem plugin to expose these options. A 
> possible approach could be to somehow incorporate it into the file path, e.g.,
> {code:java}
> path = "TAGS:s3://bucket/path"{code}
>  Or possibly as an option that can be applied to the checkpoint and sink 
> configurations, e.g.,
> {code:java}
> env.getCheckpointingConfig().setS3Tags(TAGS) {code}
> and similar for a file sink.
> _Note_: The lifecycle can also be managed using the module: see 
> [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28736) Add APPROX_PERCENTILE function

2022-07-29 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573004#comment-17573004
 ] 

Josh Mahonin commented on FLINK-28736:
--

That seems reasonable. I'll revisit this after Flink 1.16.

> Add APPROX_PERCENTILE function
> --
>
> Key: FLINK-28736
> URL: https://issues.apache.org/jira/browse/FLINK-28736
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.15.1
>Reporter: Josh Mahonin
>Priority: Minor
>
> We have an {{APPROX_PERCENTILE}} UDF that we believe may be useful to the 
> broader community. It's a rather simple implementation that wraps TDigest to 
> return approximate quantile/percentile data in both batch and streaming mode.
> I'm somewhat torn as to how to properly contribute this. Following Calcite 
> conventions, I believe this would qualify as an {{APPROXIMATE 
> PERCENTILE_DISC}} function, although that would require Calcite >= 1.28 
> (FLINK-21239, FLINK-27998). 
> Alternatively, perhaps this could simply be dropped in as a new function 
> that's not backed by Calcite, though it's not immediately clear to me how to 
> proceed with that; I'm happy to take guidance here.
> This is a gist of the implementation:
> https://gist.github.com/jmahonin/d75150999af30bc78bdf00c7b0ecbd4f



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28736) Add APPROX_PERCENTILE function

2022-07-28 Thread Josh Mahonin (Jira)
Josh Mahonin created FLINK-28736:


 Summary: Add APPROX_PERCENTILE function
 Key: FLINK-28736
 URL: https://issues.apache.org/jira/browse/FLINK-28736
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.15.1
Reporter: Josh Mahonin


We have an {{APPROX_PERCENTILE}} UDF that we believe may be useful to the 
broader community. It's a rather simple implementation that wraps TDigest to 
return approximate quantile/percentile data in both batch and streaming mode.

I'm somewhat torn as to how to properly contribute this. Following Calcite 
conventions, I believe this would qualify as an {{APPROXIMATE PERCENTILE_DISC}} 
function, although that would require Calcite >= 1.28 (FLINK-21239, 
FLINK-27998). 

Alternatively, perhaps this could simply be dropped in as a new function that's 
not backed by Calcite, though it's not immediately clear to me how to proceed 
with that; I'm happy to take guidance here.

This is a gist of the implementation:
https://gist.github.com/jmahonin/d75150999af30bc78bdf00c7b0ecbd4f
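
For illustration, a rough sketch of the shape such a UDF can take (this is not 
the gist's actual code; the class name and fixed compression factor are 
assumptions, and a production version would also need merge/retract support and 
a proper accumulator serializer):
{code:java}
import com.tdunning.math.stats.MergingDigest;
import org.apache.flink.table.functions.AggregateFunction;

// Hypothetical APPROX_PERCENTILE-style aggregate backed by t-digest.
public class ApproxPercentile extends AggregateFunction<Double, MergingDigest> {

    private final double quantile; // e.g. 0.95 for the 95th percentile

    public ApproxPercentile(double quantile) {
        this.quantile = quantile;
    }

    @Override
    public MergingDigest createAccumulator() {
        return new MergingDigest(100.0); // compression factor: assumed default
    }

    // Called by Flink for each input row; nulls are skipped.
    public void accumulate(MergingDigest digest, Double value) {
        if (value != null) {
            digest.add(value);
        }
    }

    @Override
    public Double getValue(MergingDigest digest) {
        return digest.size() == 0 ? null : digest.quantile(quantile);
    }
}
{code}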



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID

2022-07-27 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571907#comment-17571907
 ] 

Josh Mahonin commented on FLINK-28697:
--

Hi [~yunta] 

Unfortunately the issue is a few months old, and as such the logs have expired 
from our aggregator.

We were using the same Flink version, but there was a mismatch in JVM versions 
between the compiled JAR and what was being used at runtime. I also believe the 
JVM versions were different between client and server. I'm not certain exactly 
which scenario caused the issue, but explicitly setting the 
{{serialVersionUID}} resolved it. Note that it's the only class in that package 
without one defined.

> MapDataSerializer doesn't declare a serialVersionUID
> 
>
> Key: FLINK-28697
> URL: https://issues.apache.org/jira/browse/FLINK-28697
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.1
>Reporter: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
>
> MapDataSerializer doesn't declare a serialVersionUID, which can manifest as 
> an InvalidClassException when attempting to serialize with different JREs 
> for compilation / runtime.
> {code:java}
> Caused by: java.io.InvalidClassException: 
> org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class 
> incompatible: stream classdesc serialVersionUID = 2533002123505507000, local 
> class serialVersionUID = 1622156938509929811 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID

2022-07-26 Thread Josh Mahonin (Jira)
Josh Mahonin created FLINK-28697:


 Summary: MapDataSerializer doesn't declare a serialVersionUID
 Key: FLINK-28697
 URL: https://issues.apache.org/jira/browse/FLINK-28697
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.15.1
Reporter: Josh Mahonin
 Fix For: 1.15.1


MapDataSerializer doesn't declare a serialVersionUID, which can manifest as an 
InvalidClassException when attempting to serialize with different JREs for 
compilation / runtime.
{code:java}
Caused by: java.io.InvalidClassException: 
org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class 
incompatible: stream classdesc serialVersionUID = 2533002123505507000, local 
class serialVersionUID = 1622156938509929811 {code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration

2022-07-26 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571456#comment-17571456
 ] 

Josh Mahonin commented on FLINK-19589:
--

PR is open. Note that this only addresses dynamic configuration for the S3 
connector. I'm happy to retarget to another ticket if that's more appropriate, 
or if there are suggestions to generalize this more broadly, that's fine too.

> Support per-connector FileSystem configuration
> --
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Assignee: Josh Mahonin
>Priority: Major
>  Labels: pull-request-available
> Attachments: FLINK-19589.patch
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration

2022-07-25 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571034#comment-17571034
 ] 

Josh Mahonin commented on FLINK-19589:
--

Sure thing, [~martijnvisser].

> Support per-connector FileSystem configuration
> --
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Assignee: Josh Mahonin
>Priority: Major
> Attachments: FLINK-19589.patch
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration

2022-07-25 Thread Josh Mahonin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570902#comment-17570902
 ] 

Josh Mahonin commented on FLINK-19589:
--

As a follow-up to the discussion on the list above, I've implemented a small 
change within the {{S3FileSystemFactory}} to allow passing parameters 
dynamically, rather than requiring they be present in the global configuration.

It does this by using query parameters on the URI object, and applies any of 
the {{fs.s3a.}} prefixed options to the Hadoop Configuration. In this way, one 
can construct a URI such as 
{{s3a://bucket/path?fs.s3a.assumed.role.arn=some_arn}} and subsequently have 
the underlying Hadoop-AWS library apply the ARN setting.
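
To make the mechanism concrete, here is a minimal sketch of the idea (the class 
and method names are illustrative, not the patch's actual code), assuming the 
factory has access to the full URI and a Hadoop Configuration it is about to 
hand to the file system:
{code:java}
import java.net.URI;
import org.apache.hadoop.conf.Configuration;

public class QueryParamConfigDemo {

    // Copy any fs.s3a.-prefixed query parameters from the URI into the
    // Hadoop Configuration before the file system is instantiated.
    static void applyQueryParams(URI uri, Configuration hadoopConfig) {
        String query = uri.getQuery();
        if (query == null) {
            return;
        }
        for (String param : query.split("&")) {
            String[] kv = param.split("=", 2);
            if (kv.length == 2 && kv[0].startsWith("fs.s3a.")) {
                hadoopConfig.set(kv[0], kv[1]);
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        applyQueryParams(
                URI.create("s3a://bucket/path?fs.s3a.assumed.role.arn=some_arn"), conf);
        System.out.println(conf.get("fs.s3a.assumed.role.arn")); // -> some_arn
    }
}
{code}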

I'm happy to take any feedback on this approach, and whether it can be 
generalized to other FileSystems. If there is an existing strategy in progress, 
I'm fine with assisting with that effort as well.

[^FLINK-19589.patch]

> Support per-connector FileSystem configuration
> --
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Priority: Major
> Attachments: FLINK-19589.patch
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-19589) Support per-connector FileSystem configuration

2022-07-25 Thread Josh Mahonin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Mahonin updated FLINK-19589:
-
Attachment: FLINK-19589.patch

> Support per-connector FileSystem configuration
> --
>
> Key: FLINK-19589
> URL: https://issues.apache.org/jira/browse/FLINK-19589
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.12.0
>Reporter: Padarn Wilson
>Priority: Major
> Attachments: FLINK-19589.patch
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)