[jira] [Commented] (FLINK-34869) [Bug][mysql] Remove all previous table and add new added table will throw Exception.
[ https://issues.apache.org/jira/browse/FLINK-34869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840201#comment-17840201 ] Josh Mahonin commented on FLINK-34869: -- We seem to be hitting this issue as well. Is this issue possibly fixed by another commit, or is it still outstanding [~loserwang1024] ? If the issue is still outstanding, with some guidance we could attempt a patch. Thanks. > [Bug][mysql] Remove all previous table and add new added table will throw > Exception. > > > Key: FLINK-34869 > URL: https://issues.apache.org/jira/browse/FLINK-34869 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Flink CDC Issue Import >Priority: Major > Labels: github-import > > ### Search before asking > - [X] I searched in the > [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found > nothing similar. > ### Flink version > 1.18 > ### Flink CDC version > 3.0.1 > ### Database and its version > anyone > ### Minimal reproduce step > 1. Stop the job with a savepoint. > 2. Set 'scan.incremental.snapshot.enabled' = 'true' and then set tableList > with tables that were not included last time. > 3. The assignment status then becomes chaotic. > Take a test case for example: > ```java > public class NewlyAddedTableITCase extends MySqlSourceTestBase { > @Test > public void testRemoveAndAddTablesOneByOne() throws Exception { > testRemoveAndAddTablesOneByOne( > 1, "address_hangzhou", "address_beijing", "address_shanghai"); > } > private void testRemoveAndAddTablesOneByOne(int parallelism, String... 
> captureAddressTables) > throws Exception { > MySqlConnection connection = getConnection(); > // step 1: create mysql tables with all tables included > initialAddressTables(connection, captureAddressTables); > final TemporaryFolder temporaryFolder = new TemporaryFolder(); > temporaryFolder.create(); > final String savepointDirectory = > temporaryFolder.newFolder().toURI().toString(); > // get all expected data > List<String> fetchedDataList = new ArrayList<>(); > String finishedSavePointPath = null; > // test removing and adding table one by one > for (int round = 0; round < captureAddressTables.length; round++) { > String captureTableThisRound = captureAddressTables[round]; > String cityName = captureTableThisRound.split("_")[1]; > StreamExecutionEnvironment env = > getStreamExecutionEnvironment(finishedSavePointPath, > parallelism); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > String createTableStatement = > getCreateTableStatement(new HashMap<>(), > captureTableThisRound); > tEnv.executeSql(createTableStatement); > tEnv.executeSql( > "CREATE TABLE sink (" > + " table_name STRING," > + " id BIGINT," > + " country STRING," > + " city STRING," > + " detail_address STRING," > + " primary key (table_name,id) not enforced" > + ") WITH (" > + " 'connector' = 'values'," > + " 'sink-insert-only' = 'false'" > + ")"); > TableResult tableResult = tEnv.executeSql("insert into sink > select * from address"); > JobClient jobClient = tableResult.getJobClient().get(); > // this round's snapshot data > fetchedDataList.addAll( > Arrays.asList( > format( > "+I[%s, 416874195632735147, China, %s, %s > West Town address 1]", > captureTableThisRound, cityName, > cityName), > format( > "+I[%s, 416927583791428523, China, %s, %s > West Town address 2]", > captureTableThisRound, cityName, > cityName), > format( > "+I[%s, 417022095255614379, China, %s, %s > West Town address 3]", > captureTableThisRound, cityName, > cityName))); > waitForSinkSize("sink", 
fetchedDataList.size()); > assertEqualsInAnyOrder(fetchedDataList, > TestValuesTableFactory.getRawResults("sink")); > // only this round table's data is captured. > // step 3: make binlog data for all tables before this
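The chaos described in the reproduce steps comes down to reconciling the previously captured tables with the new tableList on restore. As a purely illustrative sketch (plain Java, not the actual CDC source's assigner code; the method name is hypothetical), the "newly added tables" the assigner must handle is a set difference, and the problematic case reported here is when that difference is the entire new list:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TableDiffDemo {
    // Illustrative stand-in for the restore-time reconciliation the bug report
    // describes: which tables in the new tableList were not captured before?
    static Set<String> newlyAdded(Set<String> previous, Set<String> current) {
        Set<String> added = new HashSet<>(current);
        added.removeAll(previous);
        return added;
    }

    public static void main(String[] args) {
        Set<String> prev = new HashSet<>(Arrays.asList("address_hangzhou"));
        Set<String> curr = new HashSet<>(Arrays.asList("address_beijing"));
        // All previous tables removed, one new table added: the case that
        // the reporter says throws an exception on restore.
        System.out.println(newlyAdded(prev, curr)); // [address_beijing]
    }
}
```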
[jira] [Commented] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID
[ https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840201#comment-17625038 ] Josh Mahonin commented on FLINK-28697: -- Agreed, the use of mismatched JVM versions is not ideal. In this case the major versions were the same (16), but due to some idiosyncrasies in container packaging, they were not the exact same JVM version. The other classes in that package all have a serialVersionUID set (last I checked). I'll also note the Flink style guide says all serializable classes must declare a serialVersionUID [https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization] > MapDataSerializer doesn't declare a serialVersionUID > > > Key: FLINK-28697 > URL: https://issues.apache.org/jira/browse/FLINK-28697 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.15.1 >Reporter: Josh Mahonin >Priority: Major > Labels: pull-request-available > > MapDataSerializer doesn't declare a serialVersionUID, which can manifest as an > InvalidClassException when attempting to serialize with different JREs for > compilation / runtime. > {code:java} > Caused by: java.io.InvalidClassException: > org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class > incompatible: stream classdesc serialVersionUID = 2533002123505507000, local > class serialVersionUID = 1622156938509929811 {code} > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
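The style-guide rule cited above is easy to illustrate: pinning the UID explicitly decouples it from the compiler/JVM-derived default, which is what shifted between the mismatched JVMs here. The class below is a hypothetical stand-in, not Flink code:

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical serializer class illustrating the rule: every Serializable
// class declares serialVersionUID explicitly, so JVM or compiler differences
// cannot change the value computed at deserialization time.
class ExampleSerializer implements Serializable {
    private static final long serialVersionUID = 1L; // pinned explicitly
}

public class SerialVersionDemo {
    public static void main(String[] args) {
        // ObjectStreamClass reports the effective UID used for stream matching.
        long uid = ObjectStreamClass.lookup(ExampleSerializer.class).getSerialVersionUID();
        System.out.println(uid); // 1
    }
}
```

Without the explicit field, `ObjectStreamClass` computes a hash over class structure details, and two JVMs can disagree on it, producing exactly the `InvalidClassException` shown in the issue.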
[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration
[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581388#comment-17581388 ] Josh Mahonin commented on FLINK-19589: -- {quote}Do you think your approach could be extended to cover all filesystems? What alternatives are there? {quote} In general, I think so. The {{FileSystemFactory}} interface simply accepts a URI, and this strategy uses (abuses) the fact that URIs can contain query parameters, so it's a convenient way to pass in dynamic options to a file system. The other Hadoop-based file systems seem to have similar initialization logic, so I suspect there is a way to make the approach a bit more global. That said, I don't know if this is the most correct way to approach the problem, but it was the simplest option I found to do so. {quote}Does your approach support different configurations within the same Job? {quote} I think so, if I'm interpreting the question correctly. The changes are applied in {{S3FileSystemFactory::getInitURI()}}, which is called by {{AbstractS3FileSystemFactory::create()}}. At this point the Hadoop FS and Configuration are being created on a per-URI basis, so if a Job has references to multiple FileSystems/URIs, each would have independent configuration. I've seen there's also a {{FileSystem.CACHE}} though, which might complicate matters. The corresponding {{FSKey}} is based on the scheme and authority, e.g. {{s3://my_bucket/my_folder}} -> {{FSKey(s3, my_bucket)}}. This may cause unexpected configuration collisions in a shared session cluster if different users have the same bucket name, although I believe that is also the case today. 
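The cache-key concern above can be sketched in isolation. The helper below is an illustrative stand-in for a scheme-plus-authority key (not the actual {{FileSystem.CACHE}} or {{FSKey}} implementation): because the path and query string are ignored, two URIs against the same bucket map to the same key, which is the collision risk mentioned:

```java
import java.net.URI;

public class FsKeyDemo {
    // Hypothetical scheme+authority cache key, mirroring the FSKey description
    // in the comment; not Flink's actual implementation.
    static String fsKey(URI uri) {
        return uri.getScheme() + "://" + uri.getAuthority();
    }

    public static void main(String[] args) {
        URI a = URI.create("s3://my_bucket/my_folder");
        URI b = URI.create("s3://my_bucket/other?fs.s3a.assumed.role.arn=some_arn");
        // Same bucket -> same key: per-URI options could collide in a shared cache.
        System.out.println(fsKey(a).equals(fsKey(b))); // true
    }
}
```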
> Support per-connector FileSystem configuration > -- > > Key: FLINK-19589 > URL: https://issues.apache.org/jira/browse/FLINK-19589 > Project: Flink > Issue Type: Improvement > Components: FileSystems >Affects Versions: 1.12.0 >Reporter: Padarn Wilson >Assignee: Josh Mahonin >Priority: Major > Labels: pull-request-available > Attachments: FLINK-19589.patch > > > Currently, options for file systems can only be configured globally. However, > in many cases, users would like to configure them in a more fine-grained way. > Either we allow a properties map, similar to the Kafka or Kinesis properties, on > our connectors, or something like the following: > Management of two properties related to S3 Object management: > - [Lifecycle configuration > |https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html] > - [Object > tagging|https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.html] > Being able to control these is useful for people who want to manage jobs > using S3 for checkpointing or job output, but need to control per-job-level > configuration of the tagging/lifecycle for the purposes of auditing or cost > control (for example deleting old state from S3). > Ideally, it would be possible to control this on each object being written by > Flink, or at least at a job level. > _Note_*:* Some related existing properties can be set using the hadoop module > via system properties: see for example > {code:java} > fs.s3a.acl.default{code} > which sets the default ACL on written objects. > *Solutions*: > 1) Modify hadoop module: > The above-linked module could be updated to add a new property (and > similar for lifecycle) > fs.s3a.tags.default > which could be a comma-separated list of tags to set. For example > {code:java} > fs.s3a.tags.default = "jobname:JOBNAME,owner:OWNER"{code} > This seems like a natural place to put this logic (and it is outside of Flink, if > we decide to go this way). However, it does not allow a sink and checkpoint > to have different values for these. > 2) Expose withTagging from module > The hadoop module used by Flink's existing filesystem already exposes put-request-level tagging (see > [this|https://github.com/aws/aws-sdk-java/blob/c06822732612d7208927d2a678073098522085c3/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/PutObjectRequest.java#L292]). > This could be used in the Flink filesystem plugin to expose these options. A > possible approach could be to incorporate the tags into the file path, e.g., > {code:java} > path = "TAGS:s3://bucket/path"{code} > Or possibly as an option that can be applied to the checkpoint and sink > configurations, e.g., > {code:java} > env.getCheckpointingConfig().setS3Tags(TAGS) {code} > and similar for a file sink. > _Note_: The lifecycle can also be managed using the module: see > [here|https://docs.aws.amazon.com/AmazonS3/latest/dev/manage-lifecycle-using-java.html]. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
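The comma-separated tag syntax proposed in this ticket is easy to prototype. The sketch below is a hypothetical parser for the suggested `fs.s3a.tags.default` value (the property name is only a proposal here, not an existing Hadoop option):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TagParseDemo {
    // Hypothetical parser for the proposed comma-separated tag list,
    // e.g. "jobname:JOBNAME,owner:OWNER" -> {jobname=JOBNAME, owner=OWNER}.
    static Map<String, String> parseTags(String value) {
        Map<String, String> tags = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] kv = entry.split(":", 2); // limit 2: tag values may contain ':'
            if (kv.length == 2) {
                tags.put(kv[0].trim(), kv[1].trim());
            }
        }
        return tags;
    }

    public static void main(String[] args) {
        Map<String, String> tags = parseTags("jobname:JOBNAME,owner:OWNER");
        System.out.println(tags); // {jobname=JOBNAME, owner=OWNER}
    }
}
```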
[jira] [Commented] (FLINK-28736) Add APPROX_PERCENTILE function
[ https://issues.apache.org/jira/browse/FLINK-28736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17573004#comment-17573004 ] Josh Mahonin commented on FLINK-28736: -- That seems reasonable. I'll revisit this after Flink 1.16. > Add APPROX_PERCENTILE function > -- > > Key: FLINK-28736 > URL: https://issues.apache.org/jira/browse/FLINK-28736 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.15.1 >Reporter: Josh Mahonin >Priority: Minor > > We have an {{APPROX_PERCENTILE}} UDF that we believe may be useful to the > broader community. It's a rather simple implementation that wraps TDigest to > return approximate quantile/percentile data in both batch and streaming mode. > I'm somewhat torn as to how to properly contribute this. Following Calcite > conventions, I believe this would qualify as an {{APPROXIMATE > PERCENTILE_DISC}} function, although that would require Calcite >= 1.28 > (FLINK-21239, FLINK-27998). > Alternatively, perhaps this could simply be dropped in as a new function > that's not backed by Calcite, although it's not immediately clear to me how > to proceed with that, though I'm happy to take guidance here. > This is a gist of the implementation: > https://gist.github.com/jmahonin/d75150999af30bc78bdf00c7b0ecbd4f -- This message was sent by Atlassian Jira (v8.20.10#820010)
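Since the proposed UDF wraps TDigest (an external sketching library) to compute approximate quantiles, a short exact-computation reference may help clarify the semantics being approximated. The sketch below implements plain PERCENTILE_DISC-style selection in standalone Java; it is illustrative only and is not the gist's implementation:

```java
import java.util.Arrays;

public class PercentileDemo {
    // PERCENTILE_DISC semantics: return the smallest element whose cumulative
    // fraction of the sorted input is >= p. A TDigest-backed UDF approximates
    // this without materializing and sorting all values.
    static double percentileDisc(double[] values, double p) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        int index = (int) Math.ceil(p * sorted.length) - 1;
        return sorted[Math.max(index, 0)];
    }

    public static void main(String[] args) {
        double[] data = {10, 20, 30, 40, 50};
        System.out.println(percentileDisc(data, 0.5)); // 30.0
    }
}
```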
[jira] [Created] (FLINK-28736) Add APPROX_PERCENTILE function
Josh Mahonin created FLINK-28736: Summary: Add APPROX_PERCENTILE function Key: FLINK-28736 URL: https://issues.apache.org/jira/browse/FLINK-28736 Project: Flink Issue Type: Improvement Affects Versions: 1.15.1 Reporter: Josh Mahonin We have an {{APPROX_PERCENTILE}} UDF that we believe may be useful to the broader community. It's a rather simple implementation that wraps TDigest to return approximate quantile/percentile data in both batch and streaming mode. I'm somewhat torn as to how to properly contribute this. Following Calcite conventions, I believe this would qualify as an {{APPROXIMATE PERCENTILE_DISC}} function, although that would require Calcite >= 1.28 (FLINK-21239, FLINK-27998). Alternatively, perhaps this could simply be dropped in as a new function that's not backed by Calcite, although it's not immediately clear to me how to proceed with that, though I'm happy to take guidance here. This is a gist of the implementation: https://gist.github.com/jmahonin/d75150999af30bc78bdf00c7b0ecbd4f -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID
[ https://issues.apache.org/jira/browse/FLINK-28697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571907#comment-17571907 ] Josh Mahonin commented on FLINK-28697: -- Hi [~yunta] Unfortunately the issue is a few months old, and as such the logs have expired from our aggregator. We were using the same Flink version, but there was a mismatch in JVM versions between the compiled JAR and what was being used at runtime. As well, I believe the JVM versions were different between client and server. I'm not certain exactly which scenario caused the issue, but specifically setting the {{serialVersionUID}} resolved it. Note that it's the only class in that package without one defined.
[jira] [Created] (FLINK-28697) MapDataSerializer doesn't declare a serialVersionUID
Josh Mahonin created FLINK-28697: Summary: MapDataSerializer doesn't declare a serialVersionUID Key: FLINK-28697 URL: https://issues.apache.org/jira/browse/FLINK-28697 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.15.1 Reporter: Josh Mahonin Fix For: 1.15.1 MapDataSerializer doesn't declare a serialVersionUID, which can manifest as an InvalidClassException when attempting to serialize with different JREs for compilation / runtime. {code:java} Caused by: java.io.InvalidClassException: org.apache.flink.table.runtime.typeutils.MapDataSerializer; local class incompatible: stream classdesc serialVersionUID = 2533002123505507000, local class serialVersionUID = 1622156938509929811 {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration
[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571456#comment-17571456 ] Josh Mahonin commented on FLINK-19589: -- PR is open. Note that this only addresses dynamic configuration for the S3 connector. I'm happy to retarget to another ticket if that's more appropriate, or if there are suggestions to generalize this more broadly, that's fine too.
[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration
[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17571034#comment-17571034 ] Josh Mahonin commented on FLINK-19589: -- Sure thing [~martijnvisser]
[jira] [Commented] (FLINK-19589) Support per-connector FileSystem configuration
[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570902#comment-17570902 ] Josh Mahonin commented on FLINK-19589: -- As a follow-up to the discussion on the list above, I've implemented a small change within the {{S3FileSystemFactory}} to allow passing parameters dynamically, rather than requiring that they be present in the global configuration. It does this by using query parameters on the URI object, and applies any of the {{fs.s3a.}}-prefixed options to the Hadoop Configuration. In this way, one can construct a URI such as {{s3a://bucket/path?fs.s3a.assumed.role.arn=some_arn}} and subsequently have the underlying Hadoop-AWS library apply the ARN setting. I'm happy to take any feedback on this approach, and on whether it can be generalized to other FileSystems. If there is an existing strategy in progress, I'm fine with assisting with that effort also. [^FLINK-19589.patch]
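The mechanism proposed for this ticket, carrying `fs.s3a.*` options as URI query parameters, can be sketched in isolation. The helper below is illustrative only (it is not the actual `S3FileSystemFactory` change) and assumes simple `key=value` pairs without URL encoding:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class UriConfigDemo {
    // Hypothetical helper mirroring the patch's idea: pull fs.s3a.-prefixed
    // query parameters off a URI and treat them as per-URI config overrides
    // (in the real change these would be applied to a Hadoop Configuration).
    static Map<String, String> extractS3aOptions(URI uri) {
        Map<String, String> opts = new HashMap<>();
        String query = uri.getQuery();
        if (query == null) {
            return opts;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.startsWith("fs.s3a.")) {
                opts.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return opts;
    }

    public static void main(String[] args) {
        URI uri = URI.create("s3a://bucket/path?fs.s3a.assumed.role.arn=some_arn");
        Map<String, String> opts = extractS3aOptions(uri);
        System.out.println(opts.get("fs.s3a.assumed.role.arn")); // some_arn
    }
}
```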
[jira] [Updated] (FLINK-19589) Support per-connector FileSystem configuration
[ https://issues.apache.org/jira/browse/FLINK-19589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Mahonin updated FLINK-19589: - Attachment: FLINK-19589.patch