[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules
[ https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244953#comment-17244953 ] Victor Wong commented on FLINK-12130: - [~lingyaKK], thanks for your time! [~aljoscha], could you help review and merge this? https://github.com/apache/flink/pull/14271 > Apply command line options to configuration before installing security modules > -- > > Key: FLINK-12130 > URL: https://issues.apache.org/jira/browse/FLINK-12130 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Major > Labels: pull-request-available > > Currently if the user configures Kerberos credentials through command line, > it won't work. > {code:java} > // flink run -m yarn-cluster -yD > security.kerberos.login.keytab=/path/to/keytab -yD > security.kerberos.login.principal=xxx /path/to/test.jar > {code} > Above command would cause security failure if you do not have a ticket cache > w/ kinit. > Maybe we could call > _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_ > before _SecurityUtils.install(new > SecurityConfiguration(cli.configuration));_ > Here is a demo patch: > [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986] -- This message was sent by Atlassian Jira (v8.3.4#803005)
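The fix the issue proposes is purely an ordering change: merge the -yD dynamic properties into the configuration before the security modules read it. A minimal plain-Java sketch of that ordering, where both method names are hypothetical stand-ins for Flink's AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration and SecurityUtils.install:

```java
import java.util.HashMap;
import java.util.Map;

public class CliSecurityOrder {

    // Hypothetical stand-in for applyCommandLineOptionsToConfiguration:
    // merge -yD dynamic properties on top of the file-based configuration.
    static Map<String, String> applyCommandLineOptions(Map<String, String> base,
                                                       Map<String, String> dynamicProps) {
        Map<String, String> merged = new HashMap<>(base);
        merged.putAll(dynamicProps); // command-line values win
        return merged;
    }

    // Hypothetical stand-in for SecurityUtils.install(new SecurityConfiguration(conf)):
    // it reads the Kerberos settings once, at install time.
    static String installSecurity(Map<String, String> conf) {
        return conf.getOrDefault("security.kerberos.login.principal", "<none>");
    }

    public static void main(String[] args) {
        Map<String, String> fileConf = new HashMap<>(); // flink-conf.yaml without Kerberos settings
        Map<String, String> cliProps = new HashMap<>();
        cliProps.put("security.kerberos.login.principal", "alice@EXAMPLE.COM");

        // Buggy order: install first, so the -yD principal is never seen.
        String buggy = installSecurity(fileConf);
        // Proposed order: apply command line options first, then install.
        String fixed = installSecurity(applyCommandLineOptions(fileConf, cliProps));

        System.out.println(buggy + " / " + fixed);
    }
}
```

The principal value above is a made-up example; the point is only that whatever reads the configuration at install time must run after the merge.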
[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules
[ https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241404#comment-17241404 ] Victor Wong commented on FLINK-12130: - [~lingyaKK], [~zhouqi], hi, I've opened a new PR, https://github.com/apache/flink/pull/14271; could you help me review it?
[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules
[ https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231204#comment-17231204 ] Victor Wong commented on FLINK-12130: - [~lingyaKK] [~zhouqi] Very sorry, I haven't worked on this recently :( I plan to work on it next week; if anyone wants to contribute a new PR in the meantime, that would be great!
[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules
[ https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183280#comment-17183280 ] Victor Wong commented on FLINK-12130: - [~aljoscha] I'd like to help. How about a new PR based on the master branch?
[jira] [Created] (FLINK-16884) The "rest.port" configuration is always "0" with flink-yarn jobs
Victor Wong created FLINK-16884: --- Summary: The "rest.port" configuration is always "0" with flink-yarn jobs Key: FLINK-16884 URL: https://issues.apache.org/jira/browse/FLINK-16884 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.10.0 Reporter: Victor Wong We would like to get the "rest.port" value via the Flink REST API ["/jobmanager/config"|https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html], but it is always "0" in the returned JSON content, which is not the real port. Since "rest.port" is dynamically assigned, we could update the value after the `WebMonitorEndpoint` is started.
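The suggested fix, recording the actually-bound port once the endpoint has started, can be sketched with a plain ServerSocket as a hypothetical stand-in for the WebMonitorEndpoint:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;

public class RestPortUpdate {

    // Bind to an ephemeral port (the rest.port=0 case) and write the port that was
    // actually bound back into the configuration map that "/jobmanager/config" serves.
    static int startAndRecordPort(Map<String, String> config) throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) { // 0 = let the OS pick a free port
            int actualPort = socket.getLocalPort();
            config.put("rest.port", String.valueOf(actualPort));
            return actualPort;
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> config = new HashMap<>();
        config.put("rest.port", "0");
        int port = startAndRecordPort(config);
        System.out.println("rest.port updated to " + config.get("rest.port") + " (bound " + port + ")");
    }
}
```

The key point is that port 0 is only a request for an ephemeral port; the real value exists only after binding, so any config the REST API reports must be updated at that point.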
[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17063064#comment-17063064 ] Victor Wong commented on FLINK-15447: - Currently, we can solve this issue with "env.java.opts: -Djava.io.tmpdir=./tmp", so I'm closing it now. > To improve utilization of the `java.io.tmpdir` for YARN module > -- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > *#Background* > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > > #*Goal* > quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735] > _1) Tasks can utilize all disks when using tmp_ > _2) Any undeleted tmp files will be deleted by the tasktracker when > task(job?) is done._ > > #*Suggestion* > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically.
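The workaround relies on the JVM starting with java.io.tmpdir already overridden. A small sketch of how the property behaves (the runtime-override caveat is a general JDK behavior, not something specific to Flink):

```java
public class TmpDirCheck {

    static String currentTmpDir() {
        return System.getProperty("java.io.tmpdir");
    }

    public static void main(String[] args) {
        // With "env.java.opts: -Djava.io.tmpdir=./tmp" the JVM starts with the property
        // already set, so libraries that consult it write under the container working
        // directory, which Yarn removes when the container finishes.
        System.out.println("java.io.tmpdir = " + currentTmpDir());

        // Setting it at runtime also changes the property, but only code that reads it
        // afterwards is affected; some JDK internals cache the first value they see,
        // which is why setting it on the command line is the safer option.
        System.setProperty("java.io.tmpdir", "./tmp");
        System.out.println("after override: " + currentTmpDir());
    }
}
```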
[jira] [Closed] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong closed FLINK-15447. --- Resolution: Not A Problem
[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053156#comment-17053156 ] Victor Wong commented on FLINK-15447: - Hi, [~trohrmann], since this issue is still valid against the current master branch, I have opened a PR demonstrating my intended change, implemented mainly based on the previous discussions. Please take a look when you have time.
[jira] [Commented] (FLINK-16376) Using consistent method to get Yarn application directory
[ https://issues.apache.org/jira/browse/FLINK-16376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17052688#comment-17052688 ] Victor Wong commented on FLINK-16376: - Hi, [~trohrmann], the PR is attached. > Using consistent method to get Yarn application directory > - > > Key: FLINK-16376 > URL: https://issues.apache.org/jira/browse/FLINK-16376 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.10.0 >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > The Yarn application directory of Flink is "/user/{user.name}/.flink", but > this logic is separated in different places. > 1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir > {code:java} > private Path getYarnFilesDir(final ApplicationId appId) throws > IOException { > final FileSystem fileSystem = FileSystem.get(yarnConfiguration); > final Path homeDir = fileSystem.getHomeDirectory(); > return new Path(homeDir, ".flink/" + appId + '/'); > } > {code} > 2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote > {code:java} > // copy resource to HDFS > String suffix = > ".flink/" > + appId > + (relativeTargetPath.isEmpty() ? "" : "/" + > relativeTargetPath) > + "/" + localSrcPath.getName(); > Path dst = new Path(homedir, suffix); > {code} > We can extract `getYarnFilesDir` method to `org.apache.flink.yarn.Utils`, and > use this method to get Yarn application directory in all the other places. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16376) Using consistent method to get Yarn application directory
[ https://issues.apache.org/jira/browse/FLINK-16376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-16376: Description: The Yarn application directory of Flink is "/user/{user.name}/.flink", but this logic is separated in different places. 1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir {code:java} private Path getYarnFilesDir(final ApplicationId appId) throws IOException { final FileSystem fileSystem = FileSystem.get(yarnConfiguration); final Path homeDir = fileSystem.getHomeDirectory(); return new Path(homeDir, ".flink/" + appId + '/'); } {code} 2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote {code:java} // copy resource to HDFS String suffix = ".flink/" + appId + (relativeTargetPath.isEmpty() ? "" : "/" + relativeTargetPath) + "/" + localSrcPath.getName(); Path dst = new Path(homedir, suffix); {code} We can extract `getYarnFilesDir` method to `org.apache.flink.yarn.Utils`, and use this method to get Yarn application directory in all the other places. was: The Yarn application directory of Flink is "/user/{user.name}/.flink", but this logic is separated in different places. 1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir {code:java} private Path getYarnFilesDir(final ApplicationId appId) throws IOException { final FileSystem fileSystem = FileSystem.get(yarnConfiguration); final Path homeDir = fileSystem.getHomeDirectory(); return *new Path(homeDir, ".flink/" + appId + '/');* } {code} 2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote {code:java} // copy resource to HDFS String suffix = *".flink/"* *+ appId* + (relativeTargetPath.isEmpty() ? "" : "/" + relativeTargetPath) + "/" + localSrcPath.getName(); Path dst = new Path(homedir, suffix); {code} We can extract `getYarnFilesDir` method to `org.apache.flink.yarn.Utils`, and use this method to get Yarn application directory in all the other places. 
[jira] [Created] (FLINK-16376) Using consistent method to get Yarn application directory
Victor Wong created FLINK-16376: --- Summary: Using consistent method to get Yarn application directory Key: FLINK-16376 URL: https://issues.apache.org/jira/browse/FLINK-16376 Project: Flink Issue Type: Improvement Components: Deployment / YARN Affects Versions: 1.10.0 Reporter: Victor Wong The Yarn application directory of Flink is "/user/{user.name}/.flink", but this logic is separated in different places. 1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir {code:java} private Path getYarnFilesDir(final ApplicationId appId) throws IOException { final FileSystem fileSystem = FileSystem.get(yarnConfiguration); final Path homeDir = fileSystem.getHomeDirectory(); return *new Path(homeDir, ".flink/" + appId + '/');* } {code} 2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote {code:java} // copy resource to HDFS String suffix = *".flink/"* *+ appId* + (relativeTargetPath.isEmpty() ? "" : "/" + relativeTargetPath) + "/" + localSrcPath.getName(); Path dst = new Path(homedir, suffix); {code} We can extract `getYarnFilesDir` method to `org.apache.flink.yarn.Utils`, and use this method to get Yarn application directory in all the other places. -- This message was sent by Atlassian Jira (v8.3.4#803005)
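The consolidation the issue proposes can be sketched with a plain-Java stand-in (the helper name and the example values are hypothetical; the real code uses Hadoop's FileSystem and Path):

```java
public class YarnPaths {

    // Hypothetical version of the helper the issue proposes to extract into
    // org.apache.flink.yarn.Utils: a single place that knows the ".flink/<appId>" layout.
    static String yarnFilesDir(String homeDir, String appId) {
        return homeDir + "/.flink/" + appId;
    }

    public static void main(String[] args) {
        String home = "/user/alice";                  // example HDFS home directory
        String appId = "application_1234567890_0001"; // example Yarn application id

        // Both former call sites now agree on the layout by construction:
        String descriptorDir = yarnFilesDir(home, appId);              // YarnClusterDescriptor#getYarnFilesDir
        String uploadTarget = yarnFilesDir(home, appId) + "/test.jar"; // Utils#uploadLocalFileToRemote
        System.out.println(descriptorDir + " ; " + uploadTarget);
    }
}
```

Centralizing the path logic means a future change to the directory layout touches one method instead of every call site.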
[jira] [Commented] (FLINK-15992) Incorrect classloader when finding TableFactory
[ https://issues.apache.org/jira/browse/FLINK-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041731#comment-17041731 ] Victor Wong commented on FLINK-15992: - [~jark] ok, sorry for the inconvenience caused by this PR:( > Incorrect classloader when finding TableFactory > --- > > Key: FLINK-15992 > URL: https://issues.apache.org/jira/browse/FLINK-15992 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Table SQL / API, Tests >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.11.0 > > Time Spent: 20m > Remaining Estimate: 0h > > *Background* > As a streaming service maintainer in our company, to ensure our users depend > on the correct version of Kafka and flink-kafka, we add > "flink-connector-kafka" into "fink-dist/lib" directory. > *Problem* > When submitting flink-sql jobs, we encountered below exceptions: > {code:java} > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could > not find a suitable table factory for > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in > the classpath. > {code} > But we have add "org.apache.flink.formats.json.JsonRowFormatFactory" in > "META-INF/services/org.apache.flink.table.factories.TableFactory", which > implements DeserializationSchemaFactory. > *Debug* > We find that it was caused by this: > {code:java} > // > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema > final SerializationSchemaFactory formatFactory = > TableFactoryService.find( > SerializationSchemaFactory.class, > properties, > this.getClass().getClassLoader()); > {code} > It uses `this.getClass().getClassLoader()`, which will be > BootStrapClassLoader of flink. > I think we could replace it with > `Thread.currentThread().getContextClassLoader()` to solve this. 
> There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15992) Incorrect classloader when finding TableFactory
[ https://issues.apache.org/jira/browse/FLINK-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034961#comment-17034961 ] Victor Wong commented on FLINK-15992: - [~jark], thanks Jark, I will open a PR for this and ping you later.
[jira] [Commented] (FLINK-15992) Incorrect classloader when finding TableFactory
[ https://issues.apache.org/jira/browse/FLINK-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034349#comment-17034349 ] Victor Wong commented on FLINK-15992: - That's right, we can just remove the this.getClass().getClassLoader() parameter. (y) As for the second question: the required TableFactory was configured in the user jar, so the BootStrapClassLoader could not find the class.
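The classloader swap discussed above can be illustrated with plain JDK calls; here java.lang.String stands in for a user-provided factory class that only the user-jar classloader can see:

```java
public class ClassLoaderLookup {

    // Resolve a class against the thread context classloader (which can see the user
    // jar) instead of the loader of the calling class, which for classes shipped in
    // flink-dist/lib is a parent loader that cannot see user classes.
    static Class<?> findFactory(String className) throws ClassNotFoundException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoaderLookup.class.getClassLoader(); // context loader may be unset
        }
        return Class.forName(className, false, cl);
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // java.lang.String stands in for a user-provided TableFactory implementation.
        System.out.println(findFactory("java.lang.String").getName());
    }
}
```

This mirrors the TableFactoryService.find fix: the lookup classloader must be the one that can actually see the service entries in the user jar's META-INF/services.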
[jira] [Updated] (FLINK-15992) Incorrect classloader when finding TableFactory
[ https://issues.apache.org/jira/browse/FLINK-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15992: Description: *Background* As a streaming service maintainer in our company, to ensure our users depend on the correct version of Kafka and flink-kafka, we add "flink-connector-kafka" into "fink-dist/lib" directory. *Problem* When submitting flink-sql jobs, we encountered below exceptions: {code:java} Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath. {code} But we have add "org.apache.flink.formats.json.JsonRowFormatFactory" in "META-INF/services/org.apache.flink.table.factories.TableFactory", which implements DeserializationSchemaFactory. *Debug* We find that it was caused by this: {code:java} // org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema final SerializationSchemaFactory formatFactory = TableFactoryService.find( SerializationSchemaFactory.class, properties, this.getClass().getClassLoader()); {code} It uses `this.getClass().getClassLoader()`, which will be BootStrapClassLoader of flink. I think we could replace it with `Thread.currentThread().getContextClassLoader()` to solve this. There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552 was: *Background* As a streaming service maintainer in our company, to ensure our users depend on the correct version of Kafka and flink-kafka, we add "flink-connector-kafka" into "fink-dist/lib" directory. *Problem* When submitting flink-sql jobs, we encountered below exceptions: {code:java} Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath. 
{code} *Debug* We find that it was caused by this: {code:java} // org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema final SerializationSchemaFactory formatFactory = TableFactoryService.find( SerializationSchemaFactory.class, properties, this.getClass().getClassLoader()); {code} It uses `this.getClass().getClassLoader()`, which will be BootStrapClassLoader of fink. We could replace it with `Thread.currentThread().getContextClassLoader()` to solve this. There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552
[jira] [Created] (FLINK-15992) Incorrect classloader when finding TableFactory
Victor Wong created FLINK-15992: --- Summary: Incorrect classloader when finding TableFactory Key: FLINK-15992 URL: https://issues.apache.org/jira/browse/FLINK-15992 Project: Flink Issue Type: Bug Components: Connectors / Kafka Reporter: Victor Wong *Background* As a streaming service maintainer in our company, to ensure our users depend on the correct version of Kafka and flink-kafka, we add "flink-connector-kafka" into "fink-dist/lib" directory. *Problem* When submitting flink-sql jobs, we encountered below exceptions: {code:java} Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath. {code} *Debug* We find that it was caused by this: {code:java} // org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema final SerializationSchemaFactory formatFactory = TableFactoryService.find( SerializationSchemaFactory.class, properties, this.getClass().getClassLoader()); {code} It uses `this.getClass().getClassLoader()`, which will be BootStrapClassLoader of fink. We could replace it with `Thread.currentThread().getContextClassLoader()` to solve this. There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028644#comment-17028644 ] Victor Wong commented on FLINK-15447: - [~trohrmann], thanks for your attention. _I am wondering whether you would like to configure the system property {{java.io.tmpdir}} to point towards {{./tmp}} or to only change Flink's temp directories._ The former: configure the system property java.io.tmpdir. _If not, then we would need to adapt the java command which starts the Flink processes._ I think this is the best choice, which has _the benefit that libraries, relying on {{java.io.tmpdir}}, will not write their temporary data to {{/tmp}}, too._
[jira] [Commented] (FLINK-15449) Retain lost task managers on Flink UI
[ https://issues.apache.org/jira/browse/FLINK-15449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028639#comment-17028639 ] Victor Wong commented on FLINK-15449: - [~chesnay], as suggested by [~fly_in_gis], we could _use the schema "http://\{RM_Address:PORT}/node/containerlogs/\{container_id}/\{user}" to construct the log url_, so I think it's feasible to retain a log URL for lost TMs. I agree that it would add additional complexity, but it's inconvenient to debug lost TMs right now; I think it's worth a shot to solve this. > Retain lost task managers on Flink UI > - > > Key: FLINK-15449 > URL: https://issues.apache.org/jira/browse/FLINK-15449 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > With Flink on Yarn, sometimes our TaskManager was killed because of OOM or > heartbeat timeout or whatever reasons, it's not convenient to check out the > logs of the lost TaskManger. > Can we retain the lost task managers on Flink UI, and provide the log service > through Yarn (we can redirect the URL of log/stdout to Yarn container > log/stdout)?
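The URL schema quoted in the comment is just string construction; a sketch with made-up example values (whether the Yarn web endpoint actually serves logs for finished containers depends on the cluster's log-aggregation setup):

```java
public class ContainerLogUrl {

    // Builds a container log URL following the schema quoted in the comment:
    // http://{RM_Address:PORT}/node/containerlogs/{container_id}/{user}
    static String logUrl(String addressAndPort, String containerId, String user) {
        return String.format("http://%s/node/containerlogs/%s/%s",
                addressAndPort, containerId, user);
    }

    public static void main(String[] args) {
        // All values below are made-up examples.
        System.out.println(logUrl("rm-host:8042", "container_1234567890_0001_01_000002", "alice"));
    }
}
```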
[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021862#comment-17021862 ] Victor Wong commented on FLINK-15447: - [~fly_in_gis], it makes sense to make "java.io.tmpdir" configurable; we could add a new YarnOption configuration to achieve this. If this issue is assigned to me, could you help review my PR? > To improve utilization of the `java.io.tmpdir` for YARN module > -- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > *#Background* > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > > #*Goal* > quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735] > _1) Tasks can utilize all disks when using tmp_ > _2) Any undeleted tmp files will be deleted by the tasktracker when > task(job?) is done._ > > #*Suggestion* > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values
[ https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021859#comment-17021859 ] Victor Wong commented on FLINK-14642: - [~liuyufei] if I understand correctly, the Tuple and CaseClass themselves should not be NULL, but their fields could be NULL. > Flink TupleSerializer and CaseClassSerializer should support copying NULL values > > > Key: FLINK-14642 > URL: https://issues.apache.org/jira/browse/FLINK-14642 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Currently, TupleSerializer and CaseClassSerializer do not support serializing > NULL values, which I think is acceptable. But not supporting copying NULL values > will cause the following code to throw an exception, which I think does not > match users' expectations and is error-prone. > *code:* > {code:java} > stream.map(xxx).filter(_ != null).xxx //the return type of the map function > is Tuple and it may return null{code} > > *exception info:* > > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) > > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635) > {code} > > *suggestion:* > Can we make the `copy` method of TupleSerializer/CaseClassSerializer > handle NULL values? e.g. > {code:java} > // org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy > def copy(from: T): T = { > // handle NULL values. 
> if(from == null) { > return from > } > initArray() > var i = 0 > while (i < arity) { > fields(i) = > fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef]) > i += 1 > } > createInstance(fields) > } > {code} > > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
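The null guard proposed above does not depend on Flink internals; here is a minimal Java sketch of the same idea, where the Copier interface and the Object[] tuple layout are illustrative stand-ins, not Flink API:

```java
import java.util.Arrays;

public class NullSafeCopyDemo {
    /** Illustrative stand-in for a per-field serializer's copy(). */
    interface Copier<T> { T copy(T from); }

    /**
     * Copies a tuple-like Object[] field by field, tolerating a NULL input
     * the way the patch above proposes for CaseClassSerializer#copy.
     */
    static Object[] copyTuple(Object[] from, Copier<Object> fieldCopier) {
        if (from == null) {
            return null; // the proposed guard: pass NULL through instead of throwing NPE
        }
        Object[] fields = new Object[from.length];
        for (int i = 0; i < from.length; i++) {
            fields[i] = fieldCopier.copy(from[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        Copier<Object> identity = x -> x;
        // A NULL tuple no longer throws; it is simply propagated.
        if (copyTuple(null, identity) != null) throw new AssertionError();
        // Normal tuples (including NULL fields) are still copied element-wise.
        Object[] copied = copyTuple(new Object[]{1, "a", null}, identity);
        if (!Arrays.equals(copied, new Object[]{1, "a", null})) throw new AssertionError();
        System.out.println("ok");
    }
}
```

This matches the pattern in the user code above (`map(...).filter(_ != null)`): the copy happens before the filter, so the copy path must survive a NULL record.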
[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021858#comment-17021858 ] Victor Wong commented on FLINK-15447: - [~rongr] updated this issue based on your suggestion. Could you assign it to me? Thanks! > To improve utilization of the `java.io.tmpdir` for YARN module > -- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > *#Background* > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > > #*Goal* > quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735] > _1) Tasks can utilize all disks when using tmp_ > _2) Any undeleted tmp files will be deleted by the tasktracker when > task(job?) is done._ > > #*Suggestion* > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15447: Description: *#Background* Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to the default value, which is "/tmp". Sometimes we ran into exceptions caused by a full "/tmp" directory, which would not be cleaned automatically after applications finished. #*Goal* quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735] _1) Tasks can utilize all disks when using tmp_ _2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) is done._ #*Suggestion* I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or something similar. "PWD" will be replaced with the true working directory of JM/TM by Yarn, which will be cleaned automatically. was: # *Background* Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to the default value, which is "/tmp". Sometimes we ran into exceptions caused by a full "/tmp" directory, which would not be cleaned automatically after applications finished. *# Goal* 1) Tasks can utilize all disks when using tmp 2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) is done. I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or something similar. "PWD" will be replaced with the true working directory of JM/TM by Yarn, which will be cleaned automatically. > To improve utilization of the `java.io.tmpdir` for YARN module > -- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > *#Background* > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". 
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > > #*Goal* > quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735] > _1) Tasks can utilize all disks when using tmp_ > _2) Any undeleted tmp files will be deleted by the tasktracker when > task(job?) is done._ > > #*Suggestion* > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15447: Description: # *Background* Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to the default value, which is "/tmp". Sometimes we ran into exceptions caused by a full "/tmp" directory, which would not be cleaned automatically after applications finished. *# Goal* 1) Tasks can utilize all disks when using tmp 2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) is done. I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or something similar. "PWD" will be replaced with the true working directory of JM/TM by Yarn, which will be cleaned automatically. was: Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to the default value, which is "/tmp". Sometimes we ran into exceptions caused by a full "/tmp" directory, which would not be cleaned automatically after applications finished. I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or something similar. "PWD" will be replaced with the true working directory of JM/TM by Yarn, which will be cleaned automatically. > To improve utilization of the `java.io.tmpdir` for YARN module > -- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > # *Background* > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > > *# Goal* > 1) Tasks can utilize all disks when using tmp > 2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) > is done. 
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15447: Summary: To improve utilization of the `java.io.tmpdir` for YARN module (was: Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" ) > To improve utilization of the `java.io.tmpdir` for YARN module > -- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018792#comment-17018792 ] Victor Wong commented on FLINK-15447: - [~fly_in_gis] the owner of the working directory of Yarn container should be the same as the owner of the JM/TM process, so I think there should be no permission issue, right? > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018790#comment-17018790 ] Victor Wong edited comment on FLINK-15447 at 1/19/20 4:24 AM: -- [~rongr] for me #2 is most close to the main concern, but we do not want to share with others for fine-grain control on disk resource, but for a shared location is very prone to be disk full. There are some good points in HADOOP-2735: _Can we add -Djava.io.tmpdir="./tmp" somewhere ?_ _so that,_ _1) Tasks can utilize all disks when using tmp_ _2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) is done._ was (Author: victor-wong): [~rongr] for me #2 is most close to the main concern, but we do not want to share with others not for fine-grain control on disk resource, but for a shared location is very prone to be disk full. There are some good points in HADOOP-2735: _Can we add -Djava.io.tmpdir="./tmp" somewhere ?_ _so that,_ _1) Tasks can utilize all disks when using tmp_ _2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) is done._ > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018790#comment-17018790 ] Victor Wong edited comment on FLINK-15447 at 1/19/20 4:23 AM: -- [~rongr] for me #2 is most close to the main concern, but we do not want to share with others not for fine-grain control on disk resource, but for a shared location is very prone to be disk full. There are some good points in HADOOP-2735: _Can we add -Djava.io.tmpdir="./tmp" somewhere ?_ _so that,_ _1) Tasks can utilize all disks when using tmp_ _2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) is done._ was (Author: victor-wong): [~rongr] for me #2 is most close to the main concern, but we do not want to share with others not for fine-grain control on disk resource, but for a shared location is very prone to be disk full. There are some good points in [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]: _Can we add -Djava.io.tmpdir="./tmp" somewhere ? so that, 1) Tasks can utilize all disks when using tmp 2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) is done._ > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018790#comment-17018790 ] Victor Wong commented on FLINK-15447: - [~rongr] for me #2 is closest to the main concern: we do not want to share the directory with others, not for fine-grained control of disk resources, but because a shared location is very prone to filling the disk. There are some good points in [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]: _Can we add -Djava.io.tmpdir="./tmp" somewhere ? so that, 1) Tasks can utilize all disks when using tmp 2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) is done._ > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018462#comment-17018462 ] Victor Wong edited comment on FLINK-15447 at 1/18/20 1:41 AM: -- [~rongr] Thanks for your reply! _Could you elaborate what does it mean by Flink-YARN default the value to /tmp? I am guessing you mean JVM default the value to /tmp ?_ --- Yes, I mean that Flink-YARN is using the default value of JVM, which is "/tmp". _So far the only place I can see in flink yarn code utilizing this key is .._ --- The third-party dependencies might utilize this key as well, as [~lzljs3620320] mentioned. My intention is similar to how [~xymaqingxiang] mentioned that setting the value to a tmp directory under the working directory of Yarn container. was (Author: victor-wong): [~rongr] Thanks for your reply! _Could you elaborate what does it mean by Flink-YARN default the value to /tmp? I am guessing you mean JVM default the value to /tmp ?_ --- Yes, I mean that Flink-YARN is using the default value of JVM, which is "/tmp". _So far the only place I can see in flink yarn code utilizing this key is: https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899_ --- The third-party dependencies might utilize this key as well, as [~lzljs3620320] mentioned. My intention is similar to how [~xymaqingxiang] mentioned that setting the value to a tmp directory under the working directory of Yarn container. > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". 
> > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018462#comment-17018462 ] Victor Wong edited comment on FLINK-15447 at 1/18/20 1:40 AM: -- [~rongr] Thanks for your reply! _Could you elaborate what does it mean by Flink-YARN default the value to /tmp? I am guessing you mean JVM default the value to /tmp ?_ --- Yes, I mean that Flink-YARN is using the default value of JVM, which is "/tmp". _So far the only place I can see in flink yarn code utilizing this key is: https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899_ --- The third-party dependencies might utilize this key as well, as [~lzljs3620320] mentioned. My intention is similar to how [~xymaqingxiang] mentioned that setting the value to a tmp directory under the working directory of Yarn container. was (Author: victor-wong): [~rongr] Thanks for your reply! _Could you elaborate what does it mean by Flink-YARN default the value to /tmp? I am guessing you mean JVM default the value to /tmp ? _ --- Yes, I mean that Flink-YARN is using the default value of JVM, which is "/tmp". _So far the only place I can see in flink yarn code utilizing this key is: https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899 _ --- The third-party dependencies might utilize this key as well, as [~lzljs3620320] mentioned. My intention is similar to how [~xymaqingxiang] mentioned that setting the value to a tmp directory under the working directory of Yarn container. 
> Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018462#comment-17018462 ] Victor Wong edited comment on FLINK-15447 at 1/18/20 1:40 AM: -- [~rongr] Thanks for your reply! _Could you elaborate what does it mean by Flink-YARN default the value to /tmp? I am guessing you mean JVM default the value to /tmp ?_ --- Yes, I mean that Flink-YARN is using the default value of JVM, which is "/tmp". _So far the only place I can see in flink yarn code utilizing this key is: https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899_ --- The third-party dependencies might utilize this key as well, as [~lzljs3620320] mentioned. My intention is similar to how [~xymaqingxiang] mentioned that setting the value to a tmp directory under the working directory of Yarn container. was (Author: victor-wong): [~rongr] Thanks for your reply! _Could you elaborate what does it mean by Flink-YARN default the value to /tmp? I am guessing you mean JVM default the value to /tmp ?_ --- Yes, I mean that Flink-YARN is using the default value of JVM, which is "/tmp". _So far the only place I can see in flink yarn code utilizing this key is: https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899_ --- The third-party dependencies might utilize this key as well, as [~lzljs3620320] mentioned. My intention is similar to how [~xymaqingxiang] mentioned that setting the value to a tmp directory under the working directory of Yarn container. 
> Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018462#comment-17018462 ] Victor Wong commented on FLINK-15447: - [~rongr] Thanks for your reply! _Could you elaborate what does it mean by Flink-YARN default the value to /tmp? I am guessing you mean JVM default the value to /tmp ? _ --- Yes, I mean that Flink-YARN is using the default value of JVM, which is "/tmp". _So far the only place I can see in flink yarn code utilizing this key is: https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899 _ --- The third-party dependencies might utilize this key as well, as [~lzljs3620320] mentioned. My intention is similar to how [~xymaqingxiang] mentioned that setting the value to a tmp directory under the working directory of Yarn container. > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15448) Log host information for TaskManager failures.
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011475#comment-17011475 ] Victor Wong commented on FLINK-15448: - [~trohrmann] I haven't done this before, but if you guys are ok with this then I'd like to give it a try with a google doc edition first. > Log host informations for TaskManager failures. > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.1 >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. "container_xxx@host_name:port_number". > Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15448) Log host information for TaskManager failures.
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010590#comment-17010590 ] Victor Wong commented on FLINK-15448: - [~zhuzh] different ResourceID could have constructors with deployment-related arguments, e.g. {code:java} public YarnResourceID(Container container) public KubernetesResourceID(KubernetesPod pod) ... {code} So they can have a custom `#toString` method with all the information needed. A simple demo: {code:java} public class YarnResourceID extends ResourceID { private final String resourceIdStr; public YarnResourceID(Container container) { super(container.getId().toString()); this.resourceIdStr = container.getId() + "@" + container.getNodeId(); } @Override public String toString() { return resourceIdStr; } } {code} As [~trohrmann] said, "_Flink uses a lot of ids in its logs which are hard to decipher for the user_", so a custom "ResourceID#toString" method with the information needed would improve the situation. WDYT? > Log host informations for TaskManager failures. > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.1 >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. "container_xxx@host_name:port_number". 
> Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
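A self-contained version of the YarnResourceID idea sketched in the comment above can look as follows; Container and NodeId here are minimal stand-ins for the Hadoop YARN record classes, and the "id@host:port" format follows the demo in the issue:

```java
public class ResourceIdDemo {
    // Minimal stand-ins for org.apache.hadoop.yarn.api.records types.
    static final class NodeId {
        final String host; final int port;
        NodeId(String host, int port) { this.host = host; this.port = port; }
        @Override public String toString() { return host + ":" + port; }
    }
    static final class Container {
        final String id; final NodeId nodeId;
        Container(String id, NodeId nodeId) { this.id = id; this.nodeId = nodeId; }
    }

    static class ResourceID {
        final String resourceId;
        ResourceID(String resourceId) { this.resourceId = resourceId; }
        @Override public String toString() { return resourceId; }
    }

    // Subclass whose toString() carries the container's host, as proposed,
    // so heartbeat-timeout logs identify the machine directly.
    static final class YarnResourceID extends ResourceID {
        private final String resourceIdStr;
        YarnResourceID(Container container) {
            super(container.id);
            this.resourceIdStr = container.id + "@" + container.nodeId;
        }
        @Override public String toString() { return resourceIdStr; }
    }

    public static void main(String[] args) {
        ResourceID id = new YarnResourceID(
                new Container("container_0001", new NodeId("worker-1", 8041)));
        System.out.println(id); // container_0001@worker-1:8041
    }
}
```

Keeping the plain id in the superclass field while only overriding toString() means existing equality and lookup logic keyed on the id string would be unaffected.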
[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010485#comment-17010485 ] Victor Wong commented on FLINK-15448: - I agree that using an extended ResourceID would be a great help, but not via TaskManagerID/JobManagerID. Maybe we should distinguish resources by their underlying deployment service, e.g. YarnResourceID/KubernetesResourceID/MesosResourceID. {code:java} class YarnResourceID extends ResourceID { public YarnResourceID(Container container) { ... } } {code} > Log host informations for TaskManager failures. > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.1 >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. "container_xxx@host_name:port_number". 
> Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15448) Log host informations for TaskManager failures.
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15448: Labels: (was: pull-request-available) > Log host informations for TaskManager failures. > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.1 >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. "container_xxx@host_name:port_number". > Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15448) Log host informations for TaskManager failures.
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15448: Labels: (was: pull-request-available) > Log host informations for TaskManager failures. > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.1 >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > Time Spent: 10m > Remaining Estimate: 0h > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. "container_xxx@host_name:port_number". > Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007267#comment-17007267 ] Victor Wong commented on FLINK-15447: - [~yunta], [~lzljs3620320], sorry to ping you guys; I'd like to know what I should do next. Do I need to wait for more discussion to reach a consensus? > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15450) Add kafka topic information to Kafka source name on Flink UI
[ https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007265#comment-17007265 ] Victor Wong commented on FLINK-15450: - Any suggestion on this:) > Add kafka topic information to Kafka source name on Flink UI > > > Key: FLINK-15450 > URL: https://issues.apache.org/jira/browse/FLINK-15450 > Project: Flink > Issue Type: Improvement > Components: API / Core, Connectors / Kafka >Reporter: Victor Wong >Priority: Major > > If the user did not specify a custom name to the source, e.g. Kafka source, > Flink would use the default name "Custom Source", which was not intuitive > (Sink was the same). > {code:java} > Source: Custom Source -> Filter -> Map -> Sink: Unnamed > {code} > If we could add the Kafka topic information to the default Source/Sink name, > it would be very helpful to catch the consuming/publishing topic quickly, > like this: > {code:java} > Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1 > {code} > *Suggestion* (forgive me if it makes too many changes) > 1. Add a `name` method to interface `Function` > {code:java} > public interface Function extends java.io.Serializable { > default String name() { return ""; } > } > {code} > 2. Source/Sink/Other functions override this method depending on their needs. > {code:java} > class FlinkKafkaConsumerBase { > String name() { > return this.topicsDescriptor.toString(); > } > } > {code} > 3. Use Function#name if the returned value is not empty. > {code:java} > // StreamExecutionEnvironment > public DataStreamSource addSource(SourceFunction > function) { > String sourceName = function.name(); > if (StringUtils.isNullOrWhitespaceOnly(sourceName)) { > sourceName = "Custom Source"; > } > return addSource(function, sourceName); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007263#comment-17007263 ] Victor Wong commented on FLINK-15448: - [~zhuzh], sorry for my late response; my initial intention was to improve the logs of the TM TimeoutException, since a lost container cannot be found on the Flink UI. For example: {code:java} // 1. org.apache.flink.runtime.jobmaster.JobMaster.TaskManagerHeartbeatListener#notifyHeartbeatTimeout new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out.")); // 2. org.apache.flink.runtime.resourcemanager.ResourceManager.TaskManagerHeartbeatListener#notifyHeartbeatTimeout log.info("The heartbeat of TaskManager with id {} timed out.", resourceID); new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out.")); {code} If it's not proper "_to add the host to each log line that contains a ResourceID_", improving these exception logs would also be a great help. > Log host informations for TaskManager failures. > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Minor > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. "container_xxx@host_name:port_number". 
> Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006726#comment-17006726 ] Victor Wong commented on FLINK-15448: - [~zhuzh], with a long-running job it's actually quite inconvenient to find the host information because the log file can be very large, and if you have configured log file rotation, the host information of a container might be missing entirely. In some circumstances, as mentioned by Xintong, "when you have lots of TM failures" it would be even more annoying. About the redundancy concern: since the host information is mainly logged in case of an exception, I don't think it's a problem :) > Log host informations for TaskManager failures. > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Minor > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. "container_xxx@host_name:port_number". 
> Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15448) Make "ResourceID#toString" more descriptive
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006663#comment-17006663 ] Victor Wong commented on FLINK-15448: - [~xintongsong], thanks for your patient explanation; _"providing more information at the places where the logs are generated"_ seems to be the best choice. Do you mind if I come up with a patch for this? > Make "ResourceID#toString" more descriptive > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. "container_xxx@host_name:port_number". > Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15450) Add kafka topic information to Kafka source name on Flink UI
[ https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15450: Component/s: API / Core > Add kafka topic information to Kafka source name on Flink UI > > > Key: FLINK-15450 > URL: https://issues.apache.org/jira/browse/FLINK-15450 > Project: Flink > Issue Type: Improvement > Components: API / Core, Connectors / Kafka >Reporter: Victor Wong >Priority: Major > > If the user did not specify a custom name to the source, e.g. Kafka source, > Flink would use the default name "Custom Source", which was not intuitive > (Sink was the same). > {code:java} > Source: Custom Source -> Filter -> Map -> Sink: Unnamed > {code} > If we could add the Kafka topic information to the default Source/Sink name, > it would be very helpful to catch the consuming/publishing topic quickly, > like this: > {code:java} > Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1 > {code} > *Suggestion* (forgive me if it makes too many changes) > 1. Add a `name` method to interface `Function` > {code:java} > public interface Function extends java.io.Serializable { > default String name() { return ""; } > } > {code} > 2. Source/Sink/Other functions override this method depending on their needs. > {code:java} > class FlinkKafkaConsumerBase { > String name() { > return this.topicsDescriptor.toString(); > } > } > {code} > 3. Use Function#name if the returned value is not empty. > {code:java} > // StreamExecutionEnvironment > public DataStreamSource addSource(SourceFunction > function) { > String sourceName = function.name(); > if (StringUtils.isNullOrWhitespaceOnly(sourceName)) { > sourceName = "Custom Source"; > } > return addSource(function, sourceName); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15448) Make "ResourceID#toString" more descriptive
[ https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006609#comment-17006609 ] Victor Wong commented on FLINK-15448: - [~xintongsong], thanks for your reply; I think your concern is very reasonable, but I still have some questions. _the right approach should be providing more information at the places where the logs are generated_ --- The information we need, like the host of the TM, is not always available or convenient to access at the place where the log is generated. Take `HeartbeatListener` for example: {code:java} // org.apache.flink.runtime.jobmaster.JobMaster.TaskManagerHeartbeatListener#notifyHeartbeatTimeout public void notifyHeartbeatTimeout(ResourceID resourceID) { validateRunsInMainThread(); // *I think it's not easy to construct a correct log message here*. disconnectTaskManager( resourceID, new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out.")); } {code} Besides, I think it's error-prone to rely on every log site remembering to provide the exact information needed. What about initializing the Yarn ResourceID with both container and host information, i.e. `new ResourceID(container.getId().toString() + "@" + container.getNodeId())`? Any suggestions? > Make "ResourceID#toString" more descriptive > --- > > Key: FLINK-15448 > URL: https://issues.apache.org/jira/browse/FLINK-15448 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > With Flink on Yarn, sometimes we ran into an exception like this: > {code:java} > java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id > container_ timed out. > {code} > We'd like to find out the host of the lost TaskManager to log into it for > more details, we have to check the previous logs for the host information, > which is a little time-consuming. > Maybe we can add more descriptive information to ResourceID of Yarn > containers, e.g. 
"container_xxx@host_name:port_number". > Here's the demo: > {code:java} > class ResourceID { > final String resourceId; > final String details; > public ResourceID(String resourceId) { > this.resourceId = resourceId; > this.details = resourceId; > } > public ResourceID(String resourceId, String details) { > this.resourceId = resourceId; > this.details = details; > } > public String toString() { > return details; > } > } > // in flink-yarn > private void startTaskExecutorInContainer(Container container) { > final String containerIdStr = container.getId().toString(); > final String containerDetail = container.getId() + "@" + > container.getNodeId(); > final ResourceID resourceId = new ResourceID(containerIdStr, > containerDetail); > ... > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15449) Retain lost task managers on Flink UI
[ https://issues.apache.org/jira/browse/FLINK-15449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006573#comment-17006573 ] Victor Wong commented on FLINK-15449: - Nice suggestion! We could add the constructed log URL (the Yarn container log URL) to Flink UI. > Retain lost task managers on Flink UI > - > > Key: FLINK-15449 > URL: https://issues.apache.org/jira/browse/FLINK-15449 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > With Flink on Yarn, sometimes our TaskManager was killed because of OOM or > heartbeat timeout or whatever reasons, it's not convenient to check out the > logs of the lost TaskManger. > Can we retain the lost task managers on Flink UI, and provide the log service > through Yarn (we can redirect the URL of log/stdout to Yarn container > log/stdout)? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006558#comment-17006558 ] Victor Wong commented on FLINK-15447: - [~yunta] [~klion26] Thanks for your reply :) _What caused the "/tmp" directory full?_ --- It may not be caused by Flink applications, since other applications like MapReduce/Spark run on the same Yarn cluster too. I think it's not safe to use the "/tmp" directory since it's shared with other tasks. Each task, like the Flink TaskManager, could make use of its own working directory assigned by Yarn. _Flink already set io.tmp.dirs as 'LOCAL_DIRS' in YARN_ --- This config cannot solve the problem completely, since third-party libraries used by users may still use the "java.io.tmpdir" property. There is a similar discussion here: https://issues.apache.org/jira/browse/HADOOP-2735 > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
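The point about third-party libraries can be demonstrated with a small self-contained sketch (the helper name below is illustrative, not a Flink API): code that calls the two-argument `File.createTempFile(prefix, suffix)` falls back to `java.io.tmpdir`, so launching the JVM with `-Djava.io.tmpdir=$PWD/tmp` would redirect such files into the container's working directory, which YARN cleans up automatically. Here the target directory is passed explicitly to keep the demo deterministic:

```java
import java.io.File;
import java.io.IOException;

public class TmpDirDemo {
    // Create a "tmp" directory under the current working directory (on YARN,
    // $PWD is the per-container working directory) and place a temp file there.
    // Starting the JVM with -Djava.io.tmpdir=$PWD/tmp would make the plain
    // File.createTempFile(prefix, suffix) overload, used by many third-party
    // libraries, write to the same location implicitly.
    public static File createTempInWorkDir(String prefix) throws IOException {
        File workDirTmp = new File(System.getProperty("user.dir"), "tmp");
        workDirTmp.mkdirs();
        return File.createTempFile(prefix, ".tmp", workDirTmp);
    }

    public static void main(String[] args) throws IOException {
        File f = createTempInWorkDir("flink-demo-");
        System.out.println(f.getAbsolutePath());
        f.deleteOnExit();
    }
}
```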
[jira] [Updated] (FLINK-15450) Add kafka topic information to Kafka source name on Flink UI
[ https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15450: Description: If the user did not specify a custom name to the source, e.g. Kafka source, Flink would use the default name "Custom Source", which was not intuitive (Sink was the same). {code:java} Source: Custom Source -> Filter -> Map -> Sink: Unnamed {code} If we could add the Kafka topic information to the default Source/Sink name, it would be very helpful to catch the consuming/publishing topic quickly, like this: {code:java} Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1 {code} *Suggestion* (forgive me if it makes too many changes) 1. Add a `name` method to interface `Function` {code:java} public interface Function extends java.io.Serializable { default String name() { return ""; } } {code} 2. Source/Sink/Other functions override this method depending on their needs. {code:java} class FlinkKafkaConsumerBase { String name() { return this.topicsDescriptor.toString(); } } {code} 3. Use Function#name if the returned value is not empty. {code:java} // StreamExecutionEnvironment public DataStreamSource addSource(SourceFunction function) { String sourceName = function.name(); if (StringUtils.isNullOrWhitespaceOnly(sourceName)) { sourceName = "Custom Source"; } return addSource(function, sourceName); } {code} was: If the user did not specify a custom name to the source, e.g. Kafka source, Flink would use the default name "Custom Source", which was not intuitive (Sink was the same). {code:java} Source: Custom Source -> Filter -> Map -> Sink: Unnamed {code} If we could add the Kafka topic information to the default Source/Sink name, it would be very helpful to catch the consuming/publishing topic quickly, like this: {code:java} Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1 {code} *Suggesion* (forgive me if it makes too much changes) 1. 
Add a `name` method to interface `Function` {code:java} public interface Function extends java.io.Serializable { default String name() { return ""; } } {code} 2. Source/Sink/Other functions override this method depending on their needs. {code:java} class FlinkKafkaConsumerBase { String name() { return this.topicsDescriptor.toString(); } } {code} 3. Use Function#name if the returned value is not empty. {code:java} // StreamExecutionEnvironment public DataStreamSource addSource(SourceFunction function) { String sourceName = function.name(); if (StringUtils.isNullOrWhitespaceOnly(sourceName)) { sourceName = "Custom Source"; } return addSource(function, sourceName); } {code} > Add kafka topic information to Kafka source name on Flink UI > > > Key: FLINK-15450 > URL: https://issues.apache.org/jira/browse/FLINK-15450 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Victor Wong >Priority: Major > > If the user did not specify a custom name to the source, e.g. Kafka source, > Flink would use the default name "Custom Source", which was not intuitive > (Sink was the same). > {code:java} > Source: Custom Source -> Filter -> Map -> Sink: Unnamed > {code} > If we could add the Kafka topic information to the default Source/Sink name, > it would be very helpful to catch the consuming/publishing topic quickly, > like this: > {code:java} > Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1 > {code} > *Suggestion* (forgive me if it makes too many changes) > 1. Add a `name` method to interface `Function` > {code:java} > public interface Function extends java.io.Serializable { > default String name() { return ""; } > } > {code} > 2. Source/Sink/Other functions override this method depending on their needs. > {code:java} > class FlinkKafkaConsumerBase { > String name() { > return this.topicsDescriptor.toString(); > } > } > {code} > 3. Use Function#name if the returned value is not empty. 
> {code:java} > // StreamExecutionEnvironment > public DataStreamSource addSource(SourceFunction > function) { > String sourceName = function.name(); > if (StringUtils.isNullOrWhitespaceOnly(sourceName)) { > sourceName = "Custom Source"; > } > return addSource(function, sourceName); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
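The three suggested steps can be sketched as a self-contained toy. This is hypothetical: `Function#name` is the proposed addition, not part of Flink's API, and the stub classes below stand in for `FlinkKafkaConsumerBase` and `StreamExecutionEnvironment`:

```java
public class SourceNameDemo {
    // Step 1: a default name() on the Function interface.
    interface Function extends java.io.Serializable {
        default String name() { return ""; }
    }

    // Step 2: a source that knows its Kafka topics describes itself.
    static class KafkaSourceStub implements Function {
        private final String topics;
        KafkaSourceStub(String topics) { this.topics = topics; }
        @Override public String name() { return topics; }
    }

    // A source that does not override name() keeps the empty default.
    static class PlainSourceStub implements Function { }

    // Step 3: fall back to "Custom Source" only when name() is blank,
    // mirroring the proposed addSource logic.
    static String sourceName(Function function) {
        String name = function.name();
        return (name == null || name.trim().isEmpty()) ? "Custom Source" : name;
    }

    public static void main(String[] args) {
        System.out.println(sourceName(new KafkaSourceStub("srcTopic0, srcTopic1")));
        System.out.println(sourceName(new PlainSourceStub()));
    }
}
```

A default method keeps the change backward compatible: existing Function implementations compile unchanged and still get the "Custom Source" label.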
[jira] [Updated] (FLINK-15450) Add kafka topic information to Kafka source name on Flink UI
[ https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15450: Summary: Add kafka topic information to Kafka source name on Flink UI (was: Add kafka topic information to Kafka source) > Add kafka topic information to Kafka source name on Flink UI > > > Key: FLINK-15450 > URL: https://issues.apache.org/jira/browse/FLINK-15450 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Victor Wong >Priority: Major > > If the user did not specify a custom name to the source, e.g. Kafka source, > Flink would use the default name "Custom Source", which was not intuitive > (Sink was the same). > {code:java} > Source: Custom Source -> Filter -> Map -> Sink: Unnamed > {code} > If we could add the Kafka topic information to the default Source/Sink name, > it would be very helpful to catch the consuming/publishing topic quickly, > like this: > {code:java} > Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1 > {code} > *Suggesion* (forgive me if it makes too much changes) > 1. Add a `name` method to interface `Function` > {code:java} > public interface Function extends java.io.Serializable { > default String name() { return ""; } > } > {code} > 2. Source/Sink/Other functions override this method depending on their needs. > {code:java} > class FlinkKafkaConsumerBase { > String name() { > return this.topicsDescriptor.toString(); > } > } > {code} > 3. Use Function#name if the returned value is not empty. > {code:java} > // StreamExecutionEnvironment > public DataStreamSource addSource(SourceFunction > function) { > String sourceName = function.name(); > if (StringUtils.isNullOrWhitespaceOnly(sourceName)) { > sourceName = "Custom Source"; > } > return addSource(function, sourceName); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15450) Add kafka topic information to Kafka source
Victor Wong created FLINK-15450: --- Summary: Add kafka topic information to Kafka source Key: FLINK-15450 URL: https://issues.apache.org/jira/browse/FLINK-15450 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Reporter: Victor Wong If the user did not specify a custom name to the source, e.g. Kafka source, Flink would use the default name "Custom Source", which was not intuitive (Sink was the same). {code:java} Source: Custom Source -> Filter -> Map -> Sink: Unnamed {code} If we could add the Kafka topic information to the default Source/Sink name, it would be very helpful to catch the consuming/publishing topic quickly, like this: {code:java} Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1 {code} *Suggesion* (forgive me if it makes too much changes) 1. Add a `name` method to interface `Function` {code:java} public interface Function extends java.io.Serializable { default String name() { return ""; } } {code} 2. Source/Sink/Other functions override this method depending on their needs. {code:java} class FlinkKafkaConsumerBase { String name() { return this.topicsDescriptor.toString(); } } {code} 3. Use Function#name if the returned value is not empty. {code:java} // StreamExecutionEnvironment public DataStreamSource addSource(SourceFunction function) { String sourceName = function.name(); if (StringUtils.isNullOrWhitespaceOnly(sourceName)) { sourceName = "Custom Source"; } return addSource(function, sourceName); } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15449) Retain lost task managers on Flink UI
Victor Wong created FLINK-15449: --- Summary: Retain lost task managers on Flink UI Key: FLINK-15449 URL: https://issues.apache.org/jira/browse/FLINK-15449 Project: Flink Issue Type: Improvement Components: Deployment / YARN Affects Versions: 1.9.1 Reporter: Victor Wong With Flink on Yarn, a TaskManager is sometimes killed because of OOM, heartbeat timeout, or other reasons, and it is not convenient to check the logs of the lost TaskManager. Can we retain lost task managers on the Flink UI and provide log access through Yarn (we could redirect the log/stdout URLs to the Yarn container's log/stdout)? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15448) Make "ResourceID#toString" more descriptive
Victor Wong created FLINK-15448: --- Summary: Make "ResourceID#toString" more descriptive Key: FLINK-15448 URL: https://issues.apache.org/jira/browse/FLINK-15448 Project: Flink Issue Type: Improvement Affects Versions: 1.9.1 Reporter: Victor Wong With Flink on Yarn, we sometimes ran into an exception like this: {code:java} java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id container_ timed out. {code} To find the host of the lost TaskManager and log into it for more details, we have to search earlier logs for the host information, which is time-consuming. Maybe we can add more descriptive information to the ResourceID of Yarn containers, e.g. "container_xxx@host_name:port_number". Here's the demo: {code:java} class ResourceID { final String resourceId; final String details; public ResourceID(String resourceId) { this.resourceId = resourceId; this.details = resourceId; } public ResourceID(String resourceId, String details) { this.resourceId = resourceId; this.details = details; } public String toString() { return details; } } // in flink-yarn private void startTaskExecutorInContainer(Container container) { final String containerIdStr = container.getId().toString(); final String containerDetail = container.getId() + "@" + container.getNodeId(); final ResourceID resourceId = new ResourceID(containerIdStr, containerDetail); ... } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
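The demo above can be exercised in isolation. In the sketch below, `ResourceID` is a stand-in for the proposed two-field class, and the container/node id strings are hypothetical examples of the values Yarn's `Container` would supply:

```java
public class ResourceIdDemo {
    // Stand-in for the proposed ResourceID: keeps the plain id for equality/keys,
    // plus a human-readable details string used only by toString().
    static class ResourceID {
        final String resourceId;
        final String details;
        ResourceID(String resourceId) { this(resourceId, resourceId); }
        ResourceID(String resourceId, String details) {
            this.resourceId = resourceId;
            this.details = details;
        }
        @Override public String toString() { return details; }
    }

    public static void main(String[] args) {
        // Simulates startTaskExecutorInContainer: combine container id and node id.
        String containerIdStr = "container_e01_0001";     // hypothetical container id
        String nodeId = "host-1.example.com:8041";        // hypothetical NodeId
        ResourceID rid = new ResourceID(containerIdStr, containerIdStr + "@" + nodeId);
        // Heartbeat-timeout style log messages would now name the host as well.
        System.out.println(rid);
    }
}
```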
[jira] [Updated] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
[ https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-15447: Description: Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to the default value, which is "/tmp". Sometimes we ran into exceptions caused by a full "/tmp" directory, which would not be cleaned automatically after applications finished. I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or something similar. "PWD" will be replaced with the true working directory of JM/TM by Yarn, which will be cleaned automatically. was: Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to the default value, which is "/tmp". Sometimes we ran into exceptions caused by a full "/tmp" directory, which would not be cleaned automatically after applications finished. I think we can set "java.io.tmpdir" to "\{{PWD}}/tmp" directory, or something similar. "\{{PWD}}" will be replaced with the true working directory of JM/TM by Yarn, which will be cleaned automatically. > Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" > --- > > Key: FLINK-15447 > URL: https://issues.apache.org/jira/browse/FLINK-15447 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set > to the default value, which is "/tmp". > > Sometimes we ran into exceptions caused by a full "/tmp" directory, which > would not be cleaned automatically after applications finished. > I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or > something similar. "PWD" will be replaced with the true working > directory of JM/TM by Yarn, which will be cleaned automatically. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"
Victor Wong created FLINK-15447: --- Summary: Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp" Key: FLINK-15447 URL: https://issues.apache.org/jira/browse/FLINK-15447 Project: Flink Issue Type: Improvement Components: Deployment / YARN Affects Versions: 1.9.1 Reporter: Victor Wong Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to the default value, which is "/tmp". Sometimes we ran into exceptions caused by a full "/tmp" directory, which would not be cleaned automatically after applications finished. I think we can set "java.io.tmpdir" to "\{{PWD}}/tmp" directory, or something similar. "\{{PWD}}" will be replaced with the true working directory of JM/TM by Yarn, which will be cleaned automatically. -- This message was sent by Atlassian Jira (v8.3.4#803005)
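As a rough illustration of the proposal, the snippet below builds the JVM option a launcher could pass; `{{PWD}}` is the literal placeholder from this issue, which Yarn substitutes with the container's working directory at launch time. This is a sketch under that assumption, not Flink's actual Yarn launcher code:

```java
public class TmpDirDemo {
    // Builds the -D option the container launch command would carry. The
    // "{{PWD}}" token is left verbatim here; Yarn resolves it per container,
    // so the resulting tmp dir is cleaned up together with the container.
    static String tmpDirJvmOption(String workingDirPlaceholder) {
        return "-Djava.io.tmpdir=" + workingDirPlaceholder + "/tmp";
    }

    public static void main(String[] args) {
        System.out.println(tmpDirJvmOption("{{PWD}}"));
    }
}
```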
[jira] [Closed] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining
[ https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong closed FLINK-14835. --- Resolution: Abandoned > Make `org.apache.flink.configuration.Configuration` support method chaining > --- > > Key: FLINK-14835 > URL: https://issues.apache.org/jira/browse/FLINK-14835 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > *Goal:* > To make the following code examples work in production, which is very handy > for users to set a couple of configurations: > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} > > *Suggestion:* > Currently, the return type of `setXXX` method is "void", we can make it > return `Configuration` itself to support method chaining. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining
[ https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984971#comment-16984971 ] Victor Wong commented on FLINK-14835: - That's great:) > Make `org.apache.flink.configuration.Configuration` support method chaining > --- > > Key: FLINK-14835 > URL: https://issues.apache.org/jira/browse/FLINK-14835 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > *Goal:* > To make the following code examples work in production, which is very handy > for users to set a couple of configurations: > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} > > *Suggestion:* > Currently, the return type of `setXXX` method is "void", we can make it > return `Configuration` itself to support method chaining. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure
[ https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983089#comment-16983089 ] Victor Wong commented on FLINK-14653: - _"introduce a new exception which users can throw to indicate that the error is application related"_ Surely this can solve the problem, but I think it's a little error-prone for users, since they need to remember to catch certain exceptions, e.g. exceptions thrown while flushing events to an external store, and rethrow them as the new exception. > Job-related errors in snapshotState do not result in job failure > > > Key: FLINK-14653 > URL: https://issues.apache.org/jira/browse/FLINK-14653 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Maximilian Michels >Priority: Minor > > When users override {{snapshotState}}, they might include logic there which > is crucial for the correctness of their application, e.g. finalizing a > transaction and buffering the results of that transaction, or flushing events > to an external store. Exceptions occurring should lead to failing the job. > Currently, users must make sure to throw a {{Throwable}} because any > {{Exception}} will be caught by the task and reported as checkpointing error, > when it could be an application error. > It would be helpful to update the documentation and introduce a special > exception that can be thrown for job-related failures, e.g. > {{ApplicationError}} or similar. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure
[ https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982434#comment-16982434 ] Victor Wong commented on FLINK-14653: - [~mxm], any progress on this? I have two possible solutions; would you mind taking a look? *Solution 1:* catch exceptions thrown from `CheckpointedFunction#snapshotState` and rethrow them as an *Error*, as the Beam patch did. *Solution 2:* catch exceptions thrown from `CheckpointedFunction#snapshotState` and rethrow them as a new exception type, e.g. *SnapshotStateException*, and handle SnapshotStateException later without marking the CheckpointFailureReason as CHECKPOINT_DECLINED, so the failure is not ignored even if the user has configured the job to tolerate checkpointing failures. > Job-related errors in snapshotState do not result in job failure > > > Key: FLINK-14653 > URL: https://issues.apache.org/jira/browse/FLINK-14653 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Maximilian Michels >Priority: Minor > > When users override {{snapshotState}}, they might include logic there which > is crucial for the correctness of their application, e.g. finalizing a > transaction and buffering the results of that transaction, or flushing events > to an external store. Exceptions occurring should lead to failing the job. > Currently, users must make sure to throw a {{Throwable}} because any > {{Exception}} will be caught by the task and reported as checkpointing error, > when it could be an application error. > It would be helpful to update the documentation and introduce a special > exception that can be thrown for job-related failures, e.g. > {{ApplicationError}} or similar. -- This message was sent by Atlassian Jira (v8.3.4#803005)
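Solution 2 could look roughly like the sketch below. `SnapshotStateException` and the call-site helper are hypothetical names for illustration; the real task-side checkpointing code is considerably more involved:

```java
public class SnapshotWrapDemo {
    // Hypothetical exception type from "Solution 2": a distinct wrapper so the
    // checkpoint machinery can tell application failures from checkpoint declines.
    static class SnapshotStateException extends RuntimeException {
        SnapshotStateException(Throwable cause) { super(cause); }
    }

    // Minimal stand-in for the user-facing interface.
    interface CheckpointedFunction {
        void snapshotState() throws Exception;
    }

    // Sketch of the task-side call site: any exception escaping the user's
    // snapshotState is rethrown under the distinct type, so it would not be
    // swallowed as a tolerable CHECKPOINT_DECLINED failure.
    static void callSnapshotState(CheckpointedFunction f) {
        try {
            f.snapshotState();
        } catch (Exception e) {
            throw new SnapshotStateException(e);
        }
    }

    public static void main(String[] args) {
        try {
            // Simulates an application error, e.g. a failed flush to an external store.
            callSnapshotState(() -> { throw new IllegalStateException("flush failed"); });
        } catch (SnapshotStateException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```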
[jira] [Commented] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining
[ https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982317#comment-16982317 ] Victor Wong commented on FLINK-14835: - [~jark], after changing the return type of `Configuration`, `mvn clean verify` failed because of japicmp error: {code:java} [ERROR] Failed to execute goal com.github.siom79.japicmp:japicmp-maven-plugin:0.11.0:cmp (default) on project flink-core: Breaking the build because there is at least one incompatibility: org.apache.flink.configuration.Configuration.setBoolean(java.lang.String,boolean):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setBytes(java.lang.String,byte[]):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setClass(java.lang.String,java.lang.Class):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setDouble(java.lang.String,double):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setFloat(java.lang.String,float):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setInteger(java.lang.String,int):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setLong(java.lang.String,long):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setString(java.lang.String,java.lang.String):METHOD_RETURN_TYPE_CHANGED -> [Help 1] {code} It seems we have to give up this feature, right? 
> Make `org.apache.flink.configuration.Configuration` support method chaining > --- > > Key: FLINK-14835 > URL: https://issues.apache.org/jira/browse/FLINK-14835 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > *Goal:* > To make the following code examples work in production, which is very handy > for users to set a couple of configurations: > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} > > *Suggestion:* > Currently, the return type of `setXXX` method is "void", we can make it > return `Configuration` itself to support method chaining. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples
[ https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982285#comment-16982285 ] Victor Wong commented on FLINK-14817: - [~jark], the PR is ready, could you take a look when convenient. > "Streaming Aggregation" document contains misleading code examples > -- > > Key: FLINK-14817 > URL: https://issues.apache.org/jira/browse/FLINK-14817 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.9.1 >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In the document of [Streaming Aggregation > |https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] > , there are some misleading code examples, e.g. > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} > It seems `Configuration` supports method chaining, while it's not true since > the return type of `Configuration#setString` is Void. > > So what about making `Configuration` support method chaining, or updating the > documentation? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples
[ https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976257#comment-16976257 ] Victor Wong commented on FLINK-14817: - [~jark], thanks for your quick reply. Please assign this issue to me, and this is the issue discussing "method chain for Configuration": [ FLINK-14835 | https://issues.apache.org/jira/browse/FLINK-14835] > "Streaming Aggregation" document contains misleading code examples > -- > > Key: FLINK-14817 > URL: https://issues.apache.org/jira/browse/FLINK-14817 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > In the document of [Streaming Aggregation > |https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] > , there are some misleading code examples, e.g. > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} > It seems `Configuration` supports method chaining, while it's not true since > the return type of `Configuration#setString` is Void. > > So what about making `Configuration` support method chaining, or updating the > documentation? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining
[ https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-14835: Description: *Goal:* To make the following code examples work in production, which is very handy for users to set a couple of configurations: {code:java} // instantiate table environment TableEnvironment tEnv = ...tEnv.getConfig()// access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task {code} *Suggestion:* Currently, the return type of `setXXX` method is "void", we can make it return `Configuration` itself to support method chaining. was: To make the following code examples work in production, which is very handy for users to set a couple of configurations. 
{code:java} // instantiate table environment TableEnvironment tEnv = ...tEnv.getConfig()// access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task {code} > Make `org.apache.flink.configuration.Configuration` support method chaining > --- > > Key: FLINK-14835 > URL: https://issues.apache.org/jira/browse/FLINK-14835 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > *Goal:* > To make the following code examples work in production, which is very handy > for users to set a couple of configurations: > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} > > *Suggestion:* > Currently, the return type of `setXXX` method is "void", we can make it > return `Configuration` itself to support method chaining. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining
[ https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-14835: Description: To make the following code examples work in production, which is very handy for users to set a couple of configurations. {code:java} // instantiate table environment TableEnvironment tEnv = ...tEnv.getConfig()// access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task {code} was: To make the following code examples work in production, which is very handy for users to set a couple of configurations. // instantiate table environmentTableEnvironment tEnv = ...tEnv.getConfig() // access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task > Make `org.apache.flink.configuration.Configuration` support method chaining > --- > > Key: FLINK-14835 > URL: https://issues.apache.org/jira/browse/FLINK-14835 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > To make the following code examples work in production, which is very handy > for users to set a couple of configurations. 
> > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining
[ https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-14835: External issue URL: (was: https://issues.apache.org/jira/browse/FLINK-14817) > Make `org.apache.flink.configuration.Configuration` support method chaining > --- > > Key: FLINK-14835 > URL: https://issues.apache.org/jira/browse/FLINK-14835 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > To make the following code examples work in production, which is very handy > for users to set a couple of configurations. > > // instantiate table environmentTableEnvironment tEnv = ...tEnv.getConfig() > // access high-level configuration .getConfiguration() // set > low-level key-value options .setString("table.exec.mini-batch.enabled", > "true") // enable mini-batch optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to > buffer input records .setString("table.exec.mini-batch.size", "5000"); // > the maximum number of records can be buffered by each aggregate operator task -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining
Victor Wong created FLINK-14835: --- Summary: Make `org.apache.flink.configuration.Configuration` support method chaining Key: FLINK-14835 URL: https://issues.apache.org/jira/browse/FLINK-14835 Project: Flink Issue Type: Improvement Components: Runtime / Configuration Affects Versions: 1.9.1 Reporter: Victor Wong To make the following code examples work in production, which is very handy for users to set a couple of configurations. // instantiate table environmentTableEnvironment tEnv = ...tEnv.getConfig() // access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task -- This message was sent by Atlassian Jira (v8.3.4#803005)
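The proposed change can be illustrated with a map-backed stand-in `Configuration` whose setters return `this` (not Flink's actual class, which at the time returned `void`):

```java
import java.util.HashMap;
import java.util.Map;

public class FluentConfigDemo {
    // Stand-in for Flink's Configuration, reduced to setString/getString,
    // purely to show what the proposed chaining API would look like.
    static class Configuration {
        private final Map<String, String> data = new HashMap<>();
        Configuration setString(String key, String value) {
            data.put(key, value);
            return this; // returning `this` is what enables method chaining
        }
        String getString(String key) { return data.get(key); }
    }

    public static void main(String[] args) {
        // With the change, the doc example from FLINK-14817 would compile as written.
        Configuration conf = new Configuration()
                .setString("table.exec.mini-batch.enabled", "true")
                .setString("table.exec.mini-batch.allow-latency", "5 s")
                .setString("table.exec.mini-batch.size", "5000");
        System.out.println(conf.getString("table.exec.mini-batch.size"));
    }
}
```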
[jira] [Updated] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples
[ https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-14817: Description: In the document of [Streaming Aggregation |https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] , there are some misleading code examples, e.g. {code:java} // instantiate table environment TableEnvironment tEnv = ...tEnv.getConfig()// access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task {code} It seems `Configuration` supports method chaining, while it's not true since the return type of `Configuration#setString` is Void. So what about making `Configuration` support method chaining, or updating the documentation? was: In the document of [Streaming Aggregation | https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] there are some misleading code examples, e.g. {code:java} // instantiate table environment TableEnvironment tEnv = ...tEnv.getConfig()// access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task {code} It seems `Configuration` supports method chaining, while it's not true since the return type of `Configuration#setString` is Void. 
So what about making `Configuration` support method chaining, or updating the documentation? > "Streaming Aggregation" document contains misleading code examples > -- > > Key: FLINK-14817 > URL: https://issues.apache.org/jira/browse/FLINK-14817 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > In the document of [Streaming Aggregation > |https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] > , there are some misleading code examples, e.g. > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} > It seems `Configuration` supports method chaining, while it's not true since > the return type of `Configuration#setString` is Void. > > So what about making `Configuration` support method chaining, or updating the > documentation? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
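Until the API changes, the documentation fix is to drop the chaining and issue one call per option. The stand-in `Configuration` below mimics the current void-returning setters so the corrected example runs on its own (it is not Flink's actual class):

```java
import java.util.HashMap;
import java.util.Map;

public class NonChainedConfigDemo {
    // Stand-in for the current Configuration API, where setString returns void,
    // so chained calls as shown in the docs do not compile.
    static class Configuration {
        private final Map<String, String> data = new HashMap<>();
        void setString(String key, String value) { data.put(key, value); }
        String getString(String key) { return data.get(key); }
    }

    public static void main(String[] args) {
        // Corrected doc example: one statement per option, no chaining.
        // (In the real docs, `configuration` comes from
        // tEnv.getConfig().getConfiguration().)
        Configuration configuration = new Configuration();
        configuration.setString("table.exec.mini-batch.enabled", "true");
        configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
        configuration.setString("table.exec.mini-batch.size", "5000");
        System.out.println(configuration.getString("table.exec.mini-batch.enabled"));
    }
}
```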
[jira] [Updated] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples
[ https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-14817: Description: In the document of [Streaming Aggregation | https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] there are some misleading code examples, e.g. {code:java} // instantiate table environment TableEnvironment tEnv = ...tEnv.getConfig()// access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task {code} It seems `Configuration` supports method chaining, while it's not true since the return type of `Configuration#setString` is Void. So what about making `Configuration` support method chaining, or updating the documentation? was: In the document of [Streaming Aggregation|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]] there are some misleading code examples, e.g. {code:java} // instantiate table environment TableEnvironment tEnv = ...tEnv.getConfig()// access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task {code} It seems `Configuration` supports method chaining, while it's not true since the return type of `Configuration#setString` is Void. 
So what about making `Configuration` support method chaining, or updating the documentation? > "Streaming Aggregation" document contains misleading code examples > -- > > Key: FLINK-14817 > URL: https://issues.apache.org/jira/browse/FLINK-14817 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > In the document of [Streaming Aggregation | > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] > there are some misleading code examples, e.g. > {code:java} > // instantiate table environment > TableEnvironment tEnv = ...tEnv.getConfig()// access high-level > configuration > .getConfiguration() // set low-level key-value options > .setString("table.exec.mini-batch.enabled", "true") // enable mini-batch > optimization > .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds > to buffer input records > .setString("table.exec.mini-batch.size", "5000"); // the maximum number of > records can be buffered by each aggregate operator task > {code} > It seems `Configuration` supports method chaining, while it's not true since > the return type of `Configuration#setString` is Void. > > So what about making `Configuration` support method chaining, or updating the > documentation? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples
[ https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-14817:
Description:
In the document of [Streaming Aggregation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...

tEnv.getConfig()        // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")      // enable mini-batch optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
  .setString("table.exec.mini-batch.size", "5000");        // the maximum number of records that can be buffered by each aggregate operator task
{code}
It seems `Configuration` supports method chaining, but it does not: the return type of `Configuration#setString` is void.
So what about making `Configuration` support method chaining, or updating the documentation?

was:
In the document of [Streaming Aggregation|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html],] there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...

tEnv.getConfig()        // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")      // enable mini-batch optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
  .setString("table.exec.mini-batch.size", "5000");        // the maximum number of records that can be buffered by each aggregate operator task
{code}
It seems `Configuration` supports method chaining, but it does not: the return type of `Configuration#setString` is void.
So what about making `Configuration` support method chaining, or updating the documentation?

> "Streaming Aggregation" document contains misleading code examples
> --
>
> Key: FLINK-14817
> URL: https://issues.apache.org/jira/browse/FLINK-14817
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.9.1
> Reporter: Victor Wong
> Priority: Major
>
> In the document of [Streaming Aggregation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] there are some misleading code examples, e.g.
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
>
> tEnv.getConfig()        // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")      // enable mini-batch optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
>   .setString("table.exec.mini-batch.size", "5000");        // the maximum number of records that can be buffered by each aggregate operator task
> {code}
> It seems `Configuration` supports method chaining, but it does not: the return type of `Configuration#setString` is void.
>
> So what about making `Configuration` support method chaining, or updating the documentation?
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples
Victor Wong created FLINK-14817: --- Summary: "Streaming Aggregation" document contains misleading code examples Key: FLINK-14817 URL: https://issues.apache.org/jira/browse/FLINK-14817 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.9.1 Reporter: Victor Wong
In the document of [Streaming Aggregation|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html] there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...

tEnv.getConfig()        // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")      // enable mini-batch optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
  .setString("table.exec.mini-batch.size", "5000");        // the maximum number of records that can be buffered by each aggregate operator task
{code}
It seems `Configuration` supports method chaining, but it does not: the return type of `Configuration#setString` is void.
So what about making `Configuration` support method chaining, or updating the documentation?
-- This message was sent by Atlassian Jira (v8.3.4#803005)
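To make the chaining question concrete, here is a minimal, self-contained sketch. `FluentConfig` is a hypothetical stand-in, not Flink's `Configuration`: it shows that returning `this` from the setter is exactly what would let the documented chained calls compile, whereas a void-returning setter cannot be chained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a configuration class. The only difference
// from a void-returning setter is that setString() returns `this`,
// which makes chained calls like the documented example legal Java.
class FluentConfig {
    private final Map<String, String> entries = new HashMap<>();

    // Returning `this` (instead of void) is what enables method chaining.
    public FluentConfig setString(String key, String value) {
        entries.put(key, value);
        return this;
    }

    public String getString(String key, String defaultValue) {
        return entries.getOrDefault(key, defaultValue);
    }
}
```

With such a signature the documented example would work as written; the alternative fix is to update the documentation to call `setString` in separate statements on a local `Configuration` variable.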
[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure
[ https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16970104#comment-16970104 ] Victor Wong commented on FLINK-14653: - Thanks for your explanation, now I think it makes sense. > Job-related errors in snapshotState do not result in job failure > > > Key: FLINK-14653 > URL: https://issues.apache.org/jira/browse/FLINK-14653 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Maximilian Michels >Priority: Minor > > When users override {{snapshoteState}}, they might include logic there which > is crucial for the correctness of their application, e.g. finalizing a > transaction and buffering the results of that transaction, or flushing events > to an external store. Exceptions occurring should lead to failing the job. > Currently, users must make sure to throw a {{Throwable}} because any > {{Exception}} will be caught by the task and reported as checkpointing error, > when it could be an application error. > It would be helpful to update the documentation and introduce a special > exception that can be thrown for job-related failures, e.g. > {{ApplicationError}} or similar. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure
[ https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969976#comment-16969976 ] Victor Wong commented on FLINK-14653: - _"I'm ok with tolerating checkpointing failures, but not ok with sacrificing the correctness of my Flink job."_ Sorry I don't get that, AFAIK in case of checkpointing failures, the whole job will restart and restore from the latest checkpointed state, so what do you mean by "sacrificing the correctness", data loss or data duplicate? > Job-related errors in snapshotState do not result in job failure > > > Key: FLINK-14653 > URL: https://issues.apache.org/jira/browse/FLINK-14653 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Maximilian Michels >Priority: Minor > > When users override {{snapshoteState}}, they might include logic there which > is crucial for the correctness of their application, e.g. finalizing a > transaction and buffering the results of that transaction, or flushing events > to an external store. Exceptions occurring should lead to failing the job. > Currently, users must make sure to throw a {{Throwable}} because any > {{Exception}} will be caught by the task and reported as checkpointing error, > when it could be an application error. > It would be helpful to update the documentation and introduce a special > exception that can be thrown for job-related failures, e.g. > {{ApplicationError}} or similar. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values
[ https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969921#comment-16969921 ] Victor Wong commented on FLINK-14642: - [~sewen], here is the PR, [https://github.com/apache/flink/pull/10130] I updated the TupleSerializer and CaseClassSerializer, but I'm not sure if we should make the same change to **OptionSerializer** and **RowSerializer**, any suggestion? > Flink TupleSerializer and CaseClassSerializer shoud support copy NULL values > > > Key: FLINK-14642 > URL: https://issues.apache.org/jira/browse/FLINK-14642 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Currently, TupleSerializer and CaseCassSerializer do not support serialize > NULL values, which I think is acceptable. But not supporting copy NULL values > will cause the following codes to throw an exception, which I think is not > matched with users' expectations and prone to error. > *codes:* > {code:java} > stream.map(xxx).filter(_ != null).xxx //the return type of the map function > is Tuple and it may return null{code} > > *exception info:* > > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) > > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635) > {code} > > *suggestion:* > Can we make the `copy` method of TupleSerializer/CaseClassSerializer to > handle NULL values? e.g. > {code:java} > // org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy > def copy(from: T): T = { > // handle NULL values. 
> if(from == null) { > return from > } > initArray() > var i = 0 > while (i < arity) { > fields(i) = > fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef]) > i += 1 > } > createInstance(fields) > } > {code} > > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure
[ https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969910#comment-16969910 ] Victor Wong commented on FLINK-14653: - [~mxm], if `CheckpointConfig#setTolerableCheckpointFailureNumber` is set to 0 (which is the default value), a checkpoint failure will result in the job failure. > Job-related errors in snapshotState do not result in job failure > > > Key: FLINK-14653 > URL: https://issues.apache.org/jira/browse/FLINK-14653 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Maximilian Michels >Priority: Minor > > When users override {{snapshoteState}}, they might include logic there which > is crucial for the correctness of their application, e.g. finalizing a > transaction and buffering the results of that transaction, or flushing events > to an external store. Exceptions occurring should lead to failing the job. > Currently, users must make sure to throw a {{Throwable}} because any > {{Exception}} will be caught by the task and reported as checkpointing error, > when it could be an application error. > It would be helpful to update the documentation and introduce a special > exception that can be thrown for job-related failures, e.g. > {{ApplicationError}} or similar. -- This message was sent by Atlassian Jira (v8.3.4#803005)
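The tolerable-failure behavior referenced in the comment above can be modeled in a few lines. This is a hypothetical illustration of the semantics under discussion, not Flink's actual CheckpointFailureManager: with the default tolerable number of 0, the first checkpoint failure fails the job, and a successful checkpoint resets the counter.

```java
// Hypothetical model (not Flink code) of "tolerable checkpoint failures":
// the job fails once consecutive checkpoint failures exceed the limit.
class CheckpointFailureTracker {
    private final int tolerableFailures;
    private int continuousFailures = 0;

    CheckpointFailureTracker(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Records a failed checkpoint; returns true if the job should fail. */
    boolean onCheckpointFailure() {
        continuousFailures++;
        return continuousFailures > tolerableFailures;
    }

    /** A successful checkpoint resets the consecutive-failure count. */
    void onCheckpointSuccess() {
        continuousFailures = 0;
    }
}
```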
[jira] [Closed] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn
[ https://issues.apache.org/jira/browse/FLINK-14626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong closed FLINK-14626. --- Resolution: Invalid > User jar packaged with hadoop dependencies may cause class conflit with > hadoop jars on yarn > --- > > Key: FLINK-14626 > URL: https://issues.apache.org/jira/browse/FLINK-14626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, the yarn application classpath is placed behind Flink classpath > (including user jars), which will cause conflict if the user jar accidentally > included hadoop dependencies. > {code:java} > // org.apache.flink.yarn.Utils#setupYarnClassPath > public static void setupYarnClassPath(Configuration conf, Map > appMasterEnv) { >addToEnvironment( > appMasterEnv, > Environment.CLASSPATH.name(), > appMasterEnv.get(ENV_FLINK_CLASSPATH)); >String[] applicationClassPathEntries = conf.getStrings( > YarnConfiguration.YARN_APPLICATION_CLASSPATH, > YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH); >for (String c : applicationClassPathEntries) { > addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim()); >} > } > {code} > Maybe we should place the user jars behind yarn application classpath when > `org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is > set to LAST, like this "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar". > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn
[ https://issues.apache.org/jira/browse/FLINK-14626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969388#comment-16969388 ] Victor Wong edited comment on FLINK-14626 at 11/7/19 4:20 PM: -- [~chesnay], thanks for your quick response. As an engineer providing company-wide stream processing capabilities for other teams, I have seen quite a few errors related to this issue. It's really prone to including Hadoop dependencies in the uber jar, especially when your job deals with Hadoop systems, e.g. HBase. But I agree that it _would change existing behavior and could result in subtle bugs,_ so I will close this issue and find another way to solve the related problems. was (Author: victor-wong): [~chesnay], thanks for your quick response. As an engineer providing company-wide stream processing capabilities for other teams, I have seen quite a few errors related to this issue. It's really prone to including Hadoop dependencies in the uber jar, especially when your job deals with Hadoop systems, e.g. HBase. But I agree that it _would change existing behavior and could result in subtle bugs,_ so I will close this PR and find another way to solve the related problems. > User jar packaged with hadoop dependencies may cause class conflit with > hadoop jars on yarn > --- > > Key: FLINK-14626 > URL: https://issues.apache.org/jira/browse/FLINK-14626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, the yarn application classpath is placed behind Flink classpath > (including user jars), which will cause conflict if the user jar accidentally > included hadoop dependencies. 
> {code:java} > // org.apache.flink.yarn.Utils#setupYarnClassPath > public static void setupYarnClassPath(Configuration conf, Map > appMasterEnv) { >addToEnvironment( > appMasterEnv, > Environment.CLASSPATH.name(), > appMasterEnv.get(ENV_FLINK_CLASSPATH)); >String[] applicationClassPathEntries = conf.getStrings( > YarnConfiguration.YARN_APPLICATION_CLASSPATH, > YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH); >for (String c : applicationClassPathEntries) { > addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim()); >} > } > {code} > Maybe we should place the user jars behind yarn application classpath when > `org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is > set to LAST, like this "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar". > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn
[ https://issues.apache.org/jira/browse/FLINK-14626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969388#comment-16969388 ] Victor Wong commented on FLINK-14626: - [~chesnay], thanks for your quick response. As an engineer providing company-wide stream processing capabilities for other teams, I have seen quite a few errors related to this issue. It's really prone to including Hadoop dependencies in the uber jar, especially when your job deals with Hadoop systems, e.g. HBase. But I agree that it _would change existing behavior and could result in subtle bugs,_ so I will close this PR and find another way to solve the related problems. > User jar packaged with hadoop dependencies may cause class conflit with > hadoop jars on yarn > --- > > Key: FLINK-14626 > URL: https://issues.apache.org/jira/browse/FLINK-14626 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, the yarn application classpath is placed behind Flink classpath > (including user jars), which will cause conflict if the user jar accidentally > included hadoop dependencies. > {code:java} > // org.apache.flink.yarn.Utils#setupYarnClassPath > public static void setupYarnClassPath(Configuration conf, Map > appMasterEnv) { >addToEnvironment( > appMasterEnv, > Environment.CLASSPATH.name(), > appMasterEnv.get(ENV_FLINK_CLASSPATH)); >String[] applicationClassPathEntries = conf.getStrings( > YarnConfiguration.YARN_APPLICATION_CLASSPATH, > YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH); >for (String c : applicationClassPathEntries) { > addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim()); >} > } > {code} > Maybe we should place the user jars behind yarn application classpath when > `org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is > set to LAST, like this "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar". 
> -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values
[ https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969369#comment-16969369 ] Victor Wong commented on FLINK-14642: - [~sewen], sure I will ping you once PR is ready:) > Flink TupleSerializer and CaseClassSerializer shoud support copy NULL values > > > Key: FLINK-14642 > URL: https://issues.apache.org/jira/browse/FLINK-14642 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.9.1 >Reporter: Victor Wong >Priority: Major > > Currently, TupleSerializer and CaseCassSerializer do not support serialize > NULL values, which I think is acceptable. But not supporting copy NULL values > will cause the following codes to throw an exception, which I think is not > matched with users' expectations and prone to error. > *codes:* > {code:java} > stream.map(xxx).filter(_ != null).xxx //the return type of the map function > is Tuple and it may return null{code} > > *exception info:* > > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) > > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635) > {code} > > *suggestion:* > Can we make the `copy` method of TupleSerializer/CaseClassSerializer to > handle NULL values? e.g. > {code:java} > // org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy > def copy(from: T): T = { > // handle NULL values. > if(from == null) { > return from > } > initArray() > var i = 0 > while (i < arity) { > fields(i) = > fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef]) > i += 1 > } > createInstance(fields) > } > {code} > > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values
[ https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-14642:
Description:
Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL values, which I think is acceptable. But not supporting copying NULL values causes the following code to throw an exception, which does not match users' expectations and is prone to error.
*codes:*
{code:java}
stream.map(xxx).filter(_ != null).xxx // the return type of the map function is Tuple and it may return null
{code}
*exception info:*
{code:java}
Caused by: java.lang.NullPointerException
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}
*suggestion:*
Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle NULL values? e.g.
{code:java}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values
  if (from == null) {
    return from
  }
  initArray()
  var i = 0
  while (i < arity) {
    fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
    i += 1
  }
  createInstance(fields)
}
{code}

was:
Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL values, which I think is acceptable. But not supporting copying NULL values causes the following code to throw an exception, which does not match users' expectations.
*codes:*
{code:java}
stream.map(xxx).filter(_ != null).xxx // the return type of the map function is Tuple and it may return null
{code}
*exception info:*
{code:java}
Caused by: java.lang.NullPointerException
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}
*suggestion:*
Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle NULL values? e.g.
{code:java}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values
  if (from == null) {
    return from
  }
  initArray()
  var i = 0
  while (i < arity) {
    fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
    i += 1
  }
  createInstance(fields)
}
{code}

> Flink TupleSerializer and CaseClassSerializer should support copying NULL values
> --
>
> Key: FLINK-14642
> URL: https://issues.apache.org/jira/browse/FLINK-14642
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.9.1
> Reporter: Victor Wong
> Priority: Major
>
> Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL values, which I think is acceptable. But not supporting copying NULL values causes the following code to throw an exception, which does not match users' expectations and is prone to error.
> *codes:*
> {code:java}
> stream.map(xxx).filter(_ != null).xxx // the return type of the map function is Tuple and it may return null
> {code}
> *exception info:*
> {code:java}
> Caused by: java.lang.NullPointerException
>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> {code}
> *suggestion:*
> Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle NULL values? e.g.
> {code:java}
> // org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
> def copy(from: T): T = {
>   // handle NULL values
>   if (from == null) {
>     return from
>   }
>   initArray()
>   var i = 0
>   while (i < arity) {
>     fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
>     i += 1
>   }
>   createInstance(fields)
> }
> {code}
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values
Victor Wong created FLINK-14642: --- Summary: Flink TupleSerializer and CaseClassSerializer should support copying NULL values Key: FLINK-14642 URL: https://issues.apache.org/jira/browse/FLINK-14642 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.9.1 Reporter: Victor Wong
Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL values, which I think is acceptable. But not supporting copying NULL values causes the following code to throw an exception, which does not match users' expectations.
*codes:*
{code:java}
stream.map(xxx).filter(_ != null).xxx // the return type of the map function is Tuple and it may return null
{code}
*exception info:*
{code:java}
Caused by: java.lang.NullPointerException
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}
*suggestion:*
Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle NULL values? e.g.
{code:java}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values
  if (from == null) {
    return from
  }
  initArray()
  var i = 0
  while (i < arity) {
    fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
    i += 1
  }
  createInstance(fields)
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
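The null guard proposed above can be expressed generically. This is a minimal sketch with a hypothetical helper (`NullSafeCopy` is not Flink code): a copy that passes NULL through instead of dereferencing it, which is what raises the NullPointerException in `CaseClassSerializer#copy` today.

```java
import java.util.function.UnaryOperator;

// Hypothetical illustration of the proposed fix: wrap any deep-copy
// routine so that a NULL input is returned as-is rather than copied.
final class NullSafeCopy {
    private NullSafeCopy() {}

    static <T> T copy(T from, UnaryOperator<T> deepCopy) {
        if (from == null) {
            return null; // handle NULL values, as the suggested patch does
        }
        return deepCopy.apply(from);
    }
}
```

The same guard at the top of each serializer's `copy` method would make `map(...).filter(_ != null)` pipelines safe even when operator chaining copies records between chained operators.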
[jira] [Closed] (FLINK-12130) Apply command line options to configuration before installing security modules
[ https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong closed FLINK-12130. --- Resolution: Won't Fix > Apply command line options to configuration before installing security modules > -- > > Key: FLINK-12130 > URL: https://issues.apache.org/jira/browse/FLINK-12130 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Major > > Currently if the user configures Kerberos credentials through command line, > it won't work. > {code:java} > // flink run -m yarn-cluster -yD > security.kerberos.login.keytab=/path/to/keytab -yD > security.kerberos.login.principal=xxx /path/to/test.jar > {code} > Above command would cause security failure if you do not have a ticket cache > w/ kinit. > Maybe we could call > _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_ > before _SecurityUtils.install(new > SecurityConfiguration(cli.configuration));_ > Here is a demo patch: > [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-12648) Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
[ https://issues.apache.org/jira/browse/FLINK-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong closed FLINK-12648. --- Resolution: Won't Fix
> Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
> -
>
> Key: FLINK-12648
> URL: https://issues.apache.org/jira/browse/FLINK-12648
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Victor Wong
> Assignee: Victor Wong
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> I think some code in _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ duplicates code in the apache hadoop-common dependency.
> We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)_ to remove the duplicated code.
>
> Replace
> {code:java}
> // -- (2) get the Hadoop file system class for that scheme
> final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
> try {
>    fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
> }
> catch (IOException e) {
>    throw new UnsupportedFileSystemSchemeException(
>       "Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
>       "Either no file system implementation exists for that scheme, " +
>       "or the relevant classes are missing from the classpath.", e);
> }
> // -- (3) instantiate the Hadoop file system
> LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", scheme, fsClass.getName());
> final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
> // -- (4) create the proper URI to initialize the file system
> final URI initUri;
> if (fsUri.getAuthority() != null) {
>    initUri = fsUri;
> }
> else {
>    LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)");
>    String configEntry = hadoopConfig.get("fs.defaultFS", null);
>    if (configEntry == null) {
>       // fs.default.name deprecated as of hadoop 2.2.0 - see
>       // http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
>       configEntry = hadoopConfig.get("fs.default.name", null);
>    }
>    if (LOG.isDebugEnabled()) {
>       LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
>    }
>    if (configEntry == null) {
>       throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>          "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
>    }
>    else {
>       try {
>          initUri = URI.create(configEntry);
>       }
>       catch (IllegalArgumentException e) {
>          throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>             "The configuration contains an invalid file system default name " +
>             "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
>       }
>       if (initUri.getAuthority() == null) {
>          throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>             "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
>             "contains no valid authority component (like hdfs namenode, S3 host, etc)");
>       }
>    }
> }
> // -- (5) configure the Hadoop file system
> try {
>    hadoopFs.initialize(initUri, hadoopConfig);
> }
> catch (UnknownHostException e) {
>    String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
>       "), specified by either the file URI or the configuration, cannot be resolved.";
>    throw new IOException(message, e);
> }
> {code}
> with
> {code:java}
> final org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn
Victor Wong created FLINK-14626: --- Summary: User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn Key: FLINK-14626 URL: https://issues.apache.org/jira/browse/FLINK-14626 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.9.1 Reporter: Victor Wong
Currently, the yarn application classpath is placed behind the Flink classpath (including user jars), which will cause conflicts if the user jar accidentally includes hadoop dependencies.
{code:java}
// org.apache.flink.yarn.Utils#setupYarnClassPath
public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
   addToEnvironment(
      appMasterEnv,
      Environment.CLASSPATH.name(),
      appMasterEnv.get(ENV_FLINK_CLASSPATH));
   String[] applicationClassPathEntries = conf.getStrings(
      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
   for (String c : applicationClassPathEntries) {
      addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
   }
}
{code}
Maybe we should place the user jars behind the yarn application classpath when `org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is set to LAST, like this: "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar".
-- This message was sent by Atlassian Jira (v8.3.4#803005)
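The ordering proposed above can be sketched independently of the YARN APIs. `ClasspathOrdering` and `buildClasspath` are hypothetical names, not Flink code; the point is only the entry order: Flink jars first, then the yarn application classpath, then user jars last, so Hadoop classes bundled into the user jar cannot shadow the cluster's Hadoop jars.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed LAST ordering for classpath entries.
final class ClasspathOrdering {
    private ClasspathOrdering() {}

    static String buildClasspath(List<String> flinkJars, List<String> yarnClasspath, List<String> userJars) {
        List<String> entries = new ArrayList<>(flinkJars);
        entries.addAll(yarnClasspath);  // cluster's hadoop jars win over user jars
        entries.addAll(userJars);       // user jars last, per UserJarInclusion=LAST
        return String.join(":", entries);
    }
}
```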
[jira] [Commented] (FLINK-13749) Make Flink client respect classloading policy
[ https://issues.apache.org/jira/browse/FLINK-13749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967429#comment-16967429 ] Victor Wong commented on FLINK-13749: - Any progress on this? If the Assignee is busy, I'd love to help :) > Make Flink client respect classloading policy > - > > Key: FLINK-13749 > URL: https://issues.apache.org/jira/browse/FLINK-13749 > Project: Flink > Issue Type: Improvement > Components: Command Line Client, Runtime / REST > Reporter: Paul Lin > Assignee: Paul Lin > Priority: Minor > > Currently, the Flink client does not respect the classloading policy and uses a hardcoded parent-first classloader, while other components like the jobmanager and taskmanager use a child-first classloader by default and respect the classloading options. This makes the client more likely to have dependency conflicts, especially after we removed the convenient hadoop binaries (so users need to add the hadoop classpath to the client classpath). > So I propose to align the Flink client's (including the cli and rest handler) classloading behavior with the other components. -- This message was sent by Atlassian Jira (v8.3.4#803005)
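For context, the child-first policy discussed above can be sketched with a minimal classloader. This is an illustrative stand-in, not Flink's implementation: the real one lives in Flink's user-code classloader classes and additionally honors an "always parent-first patterns" list, which is omitted here.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Child-first: try the user-code jars before delegating to the parent,
// inverting the JDK's default parent-first delegation.
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name);              // look in the child URLs first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // fall back to parent-first
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

Classes not present in the child URLs (e.g. `java.lang.String`) still resolve through the parent, so the JDK and framework classes remain shared.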
[jira] [Commented] (FLINK-13586) Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 1.8.1
[ https://issues.apache.org/jira/browse/FLINK-13586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16901096#comment-16901096 ] Victor Wong commented on FLINK-13586: - I think it would be helpful to attach the [mail discussion|https://lists.apache.org/thread.html/56f137a4c0b90eb94a73b7a2ab95453282293cf9c81107e9bedc658a@%3Cuser.flink.apache.org%3E] here. A user ran into `java.lang.NoSuchMethodError` when trying to submit an application packaged with flink 1.8.1 to a flink 1.8.0 cluster.
{code:java}
...
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Lorg/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel;Z)V
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.<init>(FlinkKafkaProducer011.java:494)
...
{code}
> Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and > 1.8.1 > > > Key: FLINK-13586 > URL: https://issues.apache.org/jira/browse/FLINK-13586 > Project: Flink > Issue Type: Bug > Affects Versions: 1.8.1 > Reporter: Gaël Renoux > Priority: Major > > Method clean in org.apache.flink.api.java.ClosureCleaner received a new parameter in Flink 1.8.1. This class is noted as internal, but is used in the Kafka connectors (in org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase). > The Kafka connectors library is not provided by the server, and must be set up as a dependency with compile scope (see https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#usage, or the Maven project template). Any project using those connectors and built with 1.8.0 cannot be deployed on a 1.8.1 Flink server, because it would target the old method. > => This method needs a fallback with the original two arguments (setting a default value of RECURSIVE for the level argument). -- This message was sent by Atlassian JIRA (v7.6.14#76016)
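The fallback the ticket asks for is a forwarding overload that restores the 1.8.0 binary signature. The sketch below is illustrative (the cleaning logic is elided, and the `lastLevel` field exists only to make the sketch observable); the names mirror the ticket, not Flink's exact code.

```java
public class ClosureCleanerSketch {
    // Mirrors ExecutionConfig.ClosureCleanerLevel from the ticket.
    enum ClosureCleanerLevel { NONE, TOP_LEVEL, RECURSIVE }

    // Recorded only so the sketch can be observed in a test.
    static ClosureCleanerLevel lastLevel;

    // New three-argument signature introduced in 1.8.1.
    public static void clean(Object func, ClosureCleanerLevel level, boolean checkSerializable) {
        lastLevel = level;
        // ... actual closure-cleaning logic elided ...
    }

    // Restored 1.8.0-compatible two-argument signature: jars compiled
    // against 1.8.0 link against this method, which defaults to RECURSIVE.
    public static void clean(Object func, boolean checkSerializable) {
        clean(func, ClosureCleanerLevel.RECURSIVE, checkSerializable);
    }
}
```

Keeping the old descriptor in the bytecode is what prevents the `NoSuchMethodError` at link time; a source-level default argument would not be enough in Java.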
[jira] [Commented] (FLINK-13586) Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 1.8.1
[ https://issues.apache.org/jira/browse/FLINK-13586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900142#comment-16900142 ] Victor Wong commented on FLINK-13586: - Hi Gaël, could you assign this ticket to me, so that I can come up with a PR following the suggestion in the ticket? > Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and > 1.8.1 > > > Key: FLINK-13586 > URL: https://issues.apache.org/jira/browse/FLINK-13586 > Project: Flink > Issue Type: Bug > Affects Versions: 1.8.1 > Reporter: Gaël Renoux > Priority: Major > > Method clean in org.apache.flink.api.java.ClosureCleaner received a new parameter in Flink 1.8.1. This class is noted as internal, but is used in the Kafka connectors (in org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase). > The Kafka connectors library is not provided by the server, and must be set up as a dependency with compile scope (see https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#usage, or the Maven project template). Any project using those connectors and built with 1.8.0 cannot be deployed on a 1.8.1 Flink server, because it would target the old method. > => This method needs a fallback with the original two arguments (setting a default value of RECURSIVE for the level argument). -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-12646) Fix broken tests of RestClientTest
[ https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16859022#comment-16859022 ] Victor Wong commented on FLINK-12646: - I'd love to, [https://github.com/apache/flink/pull/8663] > Fix broken tests of RestClientTest > -- > > Key: FLINK-12646 > URL: https://issues.apache.org/jira/browse/FLINK-12646 > Project: Flink > Issue Type: Bug > Components: Runtime / REST > Reporter: Victor Wong > Assignee: Victor Wong > Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In
{code:java}
org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
{code}
we use an "unroutableIp" with a value of "10.255.255.1" for the test. > But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with the following exception:
{code:java}
java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: <Connection refused: /10.255.255.1:80> is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
	at org.junit.Assert.assertThat(Assert.java:956)
	at org.junit.Assert.assertThat(Assert.java:923)
	at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
	...
{code}
> Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", which is described as _Reserved for future use_ in [wikipedia|https://en.wikipedia.org/wiki/Reserved_IP_addresses] > Or change the assertion? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
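The reasoning behind the proposed address change can be shown with a small self-contained check. The class and method names below are illustrative, not from the Flink test: the point is that 10.255.255.1 sits in RFC 1918 private space (routinely used inside company networks), while 240.0.0.0/4 is the reserved class E range.

```java
public class UnroutableIpSketch {
    // Private-range address: may well be assigned on a corporate LAN.
    static final String OLD_UNROUTABLE_IP = "10.255.255.1";
    // Reserved-range address ("Reserved for future use"): far less
    // likely to be reachable anywhere, so the connect should time out.
    static final String NEW_UNROUTABLE_IP = "240.0.0.1";

    // True when the address falls in the reserved 240.0.0.0/4 block,
    // i.e. its first octet is 240..255.
    static boolean isInReservedClassE(String ip) {
        int firstOctet = Integer.parseInt(ip.substring(0, ip.indexOf('.')));
        return firstOctet >= 240 && firstOctet <= 255;
    }
}
```

The alternative mentioned in the ticket, loosening the assertion to also accept a connect-refused exception, avoids the network assumption entirely at the cost of a weaker test.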
[jira] [Updated] (FLINK-12648) Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
[ https://issues.apache.org/jira/browse/FLINK-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12648: Summary: Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get() (was: Load Hadoop file system via FileSystem.get()) > Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get() > - > > Key: FLINK-12648 > URL: https://issues.apache.org/jira/browse/FLINK-12648 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem > Reporter: Victor Wong > Assignee: Victor Wong > Priority: Minor > > I think there is some duplicated code in _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ that also exists in the apache hadoop-common dependency. > We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)_ to remove the duplication. > > Replace
{code:java}
// -- (2) get the Hadoop file system class for that scheme
final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
try {
	fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
}
catch (IOException e) {
	throw new UnsupportedFileSystemSchemeException(
			"Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
			"Either no file system implementation exists for that scheme, " +
			"or the relevant classes are missing from the classpath.", e);
}

// -- (3) instantiate the Hadoop file system
LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", scheme, fsClass.getName());
final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

// -- (4) create the proper URI to initialize the file system
final URI initUri;
if (fsUri.getAuthority() != null) {
	initUri = fsUri;
}
else {
	LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)");
	String configEntry = hadoopConfig.get("fs.defaultFS", null);
	if (configEntry == null) {
		// fs.default.name deprecated as of hadoop 2.2.0 - see
		// http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
		configEntry = hadoopConfig.get("fs.default.name", null);
	}
	if (LOG.isDebugEnabled()) {
		LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
	}
	if (configEntry == null) {
		throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
				"Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
	}
	else {
		try {
			initUri = URI.create(configEntry);
		}
		catch (IllegalArgumentException e) {
			throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
					"The configuration contains an invalid file system default name " +
					"('fs.default.name' or 'fs.defaultFS'): " + configEntry);
		}
		if (initUri.getAuthority() == null) {
			throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
					"Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
					"contains no valid authority component (like hdfs namenode, S3 host, etc)");
		}
	}
}

// -- (5) configure the Hadoop file system
try {
	hadoopFs.initialize(initUri, hadoopConfig);
}
catch (UnknownHostException e) {
	String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
			"), specified by either the file URI or the configuration, cannot be resolved.";
	throw new IOException(message, e);
}
{code}
with
{code:java}
final org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12648) Load Hadoop file system via FileSystem.get()
Victor Wong created FLINK-12648: --- Summary: Load Hadoop file system via FileSystem.get() Key: FLINK-12648 URL: https://issues.apache.org/jira/browse/FLINK-12648 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Reporter: Victor Wong Assignee: Victor Wong I think there is some duplicated code in _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ that also exists in the apache hadoop-common dependency. We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, org.apache.hadoop.conf.Configuration)_ to remove the duplication. Replace
{code:java}
// -- (2) get the Hadoop file system class for that scheme
final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
try {
	fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
}
catch (IOException e) {
	throw new UnsupportedFileSystemSchemeException(
			"Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
			"Either no file system implementation exists for that scheme, " +
			"or the relevant classes are missing from the classpath.", e);
}

// -- (3) instantiate the Hadoop file system
LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", scheme, fsClass.getName());
final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

// -- (4) create the proper URI to initialize the file system
final URI initUri;
if (fsUri.getAuthority() != null) {
	initUri = fsUri;
}
else {
	LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)");
	String configEntry = hadoopConfig.get("fs.defaultFS", null);
	if (configEntry == null) {
		// fs.default.name deprecated as of hadoop 2.2.0 - see
		// http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
		configEntry = hadoopConfig.get("fs.default.name", null);
	}
	if (LOG.isDebugEnabled()) {
		LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
	}
	if (configEntry == null) {
		throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
				"Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
	}
	else {
		try {
			initUri = URI.create(configEntry);
		}
		catch (IllegalArgumentException e) {
			throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
					"The configuration contains an invalid file system default name " +
					"('fs.default.name' or 'fs.defaultFS'): " + configEntry);
		}
		if (initUri.getAuthority() == null) {
			throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
					"Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
					"contains no valid authority component (like hdfs namenode, S3 host, etc)");
		}
	}
}

// -- (5) configure the Hadoop file system
try {
	hadoopFs.initialize(initUri, hadoopConfig);
}
catch (UnknownHostException e) {
	String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
			"), specified by either the file URI or the configuration, cannot be resolved.";
	throw new IOException(message, e);
}
{code}
with
{code:java}
final org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12646) Fix broken tests of RestClientTest
[ https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12646: Description: In {code:java} org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout {code} we use an "unroutableIp" with a value of "10.255.255.1" for the test. But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with the following exception: {code:java} java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: <Connection refused: /10.255.255.1:80> is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... {code} Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", which is described as _Reserved for future use_ in [wikipedia|https://en.wikipedia.org/wiki/Reserved_IP_addresses] Or change the assertion? was: In {code:java} org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout {code} , we use a "unroutableIp" with a value of "10.255.255.1" for test. But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with a following exception: {code:java} java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: <Connection refused: /10.255.255.1:80> is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... {code} Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", which is described as _Reserved for future use_ in [wikipedia|[https://en.wikipedia.org/wiki/Reserved_IP_addresses]] Or change the assertion? > Fix broken tests of RestClientTest > -- > > Key: FLINK-12646 > URL: https://issues.apache.org/jira/browse/FLINK-12646 > Project: Flink > Issue Type: Bug > Components: Runtime / REST > Reporter: Victor Wong > Assignee: Victor Wong > Priority: Minor > > In > {code:java} > org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout > {code} > we use an "unroutableIp" with a value of "10.255.255.1" for the test. > But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with the following exception: > {code:java} > java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: <Connection refused: /10.255.255.1:80> is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) > ... > {code} > > Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", which is described as _Reserved for future use_ in [wikipedia|https://en.wikipedia.org/wiki/Reserved_IP_addresses] > Or change the assertion? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12646) Fix broken tests of RestClientTest
[ https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12646: Description: In {code:java} org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout {code} , we use a "unroutableIp" with a value of "10.255.255.1" for test. But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with a following exception: {code:java} java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... {code} Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", which is described as _Reserved for future use_ in [wikipedia|[https://en.wikipedia.org/wiki/Reserved_IP_addresses]] Or change the assertion? was: In {code:java} org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout {code} , we use a "unroutableIp" with a value of "10.255.255.1" for test. But sometimes this IP is reachable in a private network of a company, which is the case for me. 
As a result, this test failed with a following exception: {code:java} java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... {code} Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", which is described as _Reserved for future use_ in ([https://en.wikipedia.org/wiki/Reserved_IP_addresses]) Or change the assertion? > Fix broken tests of RestClientTest > -- > > Key: FLINK-12646 > URL: https://issues.apache.org/jira/browse/FLINK-12646 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > > In > {code:java} > org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout > {code} > , we use a "unroutableIp" with a value of "10.255.255.1" for test. > But sometimes this IP is reachable in a private network of a company, which > is the case for me. As a result, this test failed with a following exception: > > {code:java} > java.lang.AssertionError: Expected: an instance of > org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: > Connection refused: /10.255.255.1:80> is a > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.junit.Assert.assertThat(Assert.java:956) at > org.junit.Assert.assertThat(Assert.java:923) at > org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) > ... 
> {code} > > > Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", > which is described as _Reserved for future use_ in > [wikipedia|[https://en.wikipedia.org/wiki/Reserved_IP_addresses]] > Or change the assertion? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12646) Fix broken tests of RestClientTest
[ https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12646: Description: In {code:java} org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout {code} , we use a "unroutableIp" with a value of "10.255.255.1" for test. But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with a following exception: {code:java} java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... {code} Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", which is described as _Reserved for future use_ in ([https://en.wikipedia.org/wiki/Reserved_IP_addresses]) Or change the assertion? was: In {code:java} // code placeholder {code} `org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use a `unroutableIp` with a value of "10.255.255.1" for test. But sometimes this IP is reachable in a private network of a company, which is the case for me. 
As a result, this test failed with a following exception: ``` java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... ``` Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", which is described as > Reserved for future use. in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses]) Or change the assertion? > Fix broken tests of RestClientTest > -- > > Key: FLINK-12646 > URL: https://issues.apache.org/jira/browse/FLINK-12646 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > > In > {code:java} > org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout > {code} > , we use a "unroutableIp" with a value of "10.255.255.1" for test. > But sometimes this IP is reachable in a private network of a company, which > is the case for me. As a result, this test failed with a following exception: > > {code:java} > java.lang.AssertionError: Expected: an instance of > org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: > Connection refused: /10.255.255.1:80> is a > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.junit.Assert.assertThat(Assert.java:956) at > org.junit.Assert.assertThat(Assert.java:923) at > org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) > ... 
> {code} > > > Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", > which is described as _Reserved for future use_ in > ([https://en.wikipedia.org/wiki/Reserved_IP_addresses]) > Or change the assertion? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12646) Fix broken tests of RestClientTest
[ https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12646: Description: In {code:java} // code placeholder {code} `org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use a `unroutableIp` with a value of "10.255.255.1" for test. But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with a following exception: ``` java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... ``` Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", which is described as > Reserved for future use. in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses]) Or change the assertion? was: In `org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use a `unroutableIp` with a value of "10.255.255.1" for test. But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with a following exception: ``` java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... 
``` Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", which is described as > Reserved for future use. in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses]) Or change the assertion? > Fix broken tests of RestClientTest > -- > > Key: FLINK-12646 > URL: https://issues.apache.org/jira/browse/FLINK-12646 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > > In > {code:java} > // code placeholder > {code} > `org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use > a `unroutableIp` with a value of "10.255.255.1" for test. > But sometimes this IP is reachable in a private network of a company, which > is the case for me. As a result, this test failed with a following exception: > ``` > java.lang.AssertionError: > Expected: an instance of > org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException > but: > Connection refused: /10.255.255.1:80> is a > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.junit.Assert.assertThat(Assert.java:956) > at org.junit.Assert.assertThat(Assert.java:923) > at > org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) > ... > ``` > > Can we change the `unroutableIp` to a reserved IP address, i.e "240.0.0.0", > which is described as > > Reserved for future use. > in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses]) > Or change the assertion? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12646) Fix broken tests of RestClientTest
Victor Wong created FLINK-12646: --- Summary: Fix broken tests of RestClientTest Key: FLINK-12646 URL: https://issues.apache.org/jira/browse/FLINK-12646 Project: Flink Issue Type: Bug Components: Runtime / REST Reporter: Victor Wong Assignee: Victor Wong In `org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use an `unroutableIp` with a value of "10.255.255.1" for the test. But sometimes this IP is reachable in a private network of a company, which is the case for me. As a result, this test failed with the following exception: ``` java.lang.AssertionError: Expected: an instance of org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: <Connection refused: /10.255.255.1:80> is a org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.junit.Assert.assertThat(Assert.java:956) at org.junit.Assert.assertThat(Assert.java:923) at org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76) ... ``` Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", which is described as > Reserved for future use. in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses]) Or change the assertion? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jars
[ https://issues.apache.org/jira/browse/FLINK-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12459: Description: When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think the user jars should come after all flink libs&jars in runtime classpath, including "flink.jar". But actually it's not: org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster !image-2019-05-09-15-04-36-360.png! I'm not sure if it's an expected behavior, because if a user forgets to mark Flink dependencies as "provided", it causes conflicts in detached mode. was: When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think the user jars should come after all flink libs&jars in runtime classpath, including "flink.jar". But actually it's not: org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster !image-2019-05-09-15-04-36-360.png! I'm not sure if it's an expected behavior, because if a user forgets to mark Flink dependencies as "provided", it causes conflicts in detached mode. Can we optimize it like this: [https://github.com/apache/flink/commit/86f264d76b3cfd89346f8c5ab2b8a9e99600aaee] > YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of > classpath between user jars and flink jars > -- > > Key: FLINK-12459 > URL: https://issues.apache.org/jira/browse/FLINK-12459 > Project: Flink > Issue Type: Improvement > Components: Command Line Client >Reporter: Victor Wong >Assignee: Victor Wong >Priority: Minor > Labels: pull-request-available > Attachments: image-2019-05-09-15-04-36-360.png > > Time Spent: 10m > Remaining Estimate: 0h > > When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think > the user jars should come after all flink libs&jars in runtime classpath, > including "flink.jar". > But actually it's not: > org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster > !image-2019-05-09-15-04-36-360.png! 
> I'm not sure if this is the expected behavior, because if a user forgets to mark Flink dependencies as "provided", it causes conflicts in detached mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jars
[ https://issues.apache.org/jira/browse/FLINK-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12459: Description: When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think the user jars should come after all Flink libs and jars in the runtime classpath, including "flink.jar". But actually they do not: org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster !image-2019-05-09-15-04-36-360.png! I'm not sure whether this is the expected behavior, because if a user forgets to mark Flink dependencies as "provided", it causes conflicts in detached mode. Can we optimize it like this: [https://github.com/apache/flink/commit/86f264d76b3cfd89346f8c5ab2b8a9e99600aaee] was: When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think the user jars should come after all Flink libs and jars in the runtime classpath, including "flink.jar". But actually they do not: org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster !image-2019-05-09-15-04-36-360.png! I'm not sure whether this is the expected behavior, because if a user forgets to mark Flink dependencies as "provided", it causes conflicts in detached mode. > YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of > classpath between user jars and flink jars > -- > > Key: FLINK-12459 > URL: https://issues.apache.org/jira/browse/FLINK-12459 > Project: Flink > Issue Type: Improvement > Components: Command Line Client > Reporter: Victor Wong > Assignee: Victor Wong > Priority: Minor > Attachments: image-2019-05-09-15-04-36-360.png > > > When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think > the user jars should come after all Flink libs and jars in the runtime classpath, > including "flink.jar". > But actually they do not: > org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster > !image-2019-05-09-15-04-36-360.png! 
> I'm not sure whether this is the expected behavior, because if a user forgets to mark > Flink dependencies as "provided", it causes conflicts in detached mode. > Can we optimize it like this: > [https://github.com/apache/flink/commit/86f264d76b3cfd89346f8c5ab2b8a9e99600aaee]
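The ordering question above can be illustrated with a small sketch. This is not Flink's actual implementation — the class, method, and jar names below are hypothetical — it only shows the intended semantics: with "LAST", user jars should land after every Flink jar, including "flink.jar" itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the desired classpath ordering semantics for
// CLASSPATH_INCLUDE_USER_JAR; names are illustrative, not Flink's code.
public class ClasspathOrder {
    enum Include { FIRST, LAST }

    // With LAST, user jars come after ALL Flink jars (libs and flink.jar);
    // with FIRST, they come before them.
    static List<String> build(List<String> flinkJars, List<String> userJars, Include mode) {
        List<String> cp = new ArrayList<>();
        if (mode == Include.FIRST) {
            cp.addAll(userJars);
            cp.addAll(flinkJars);
        } else {
            cp.addAll(flinkJars);
            cp.addAll(userJars);
        }
        return cp;
    }

    public static void main(String[] args) {
        List<String> flink = List.of("lib/flink-dist.jar", "flink.jar");
        List<String> user = List.of("user.jar");
        System.out.println(build(flink, user, Include.LAST));
        // prints [lib/flink-dist.jar, flink.jar, user.jar]
    }
}
```

The reported bug is that the actual classpath built in startAppMaster placed "flink.jar" after the user jars even in "LAST" mode, which is the ordering this sketch deliberately avoids.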
[jira] [Created] (FLINK-12472) Support setting attemptFailuresValidityInterval of jobs on Yarn
Victor Wong created FLINK-12472: --- Summary: Support setting attemptFailuresValidityInterval of jobs on Yarn Key: FLINK-12472 URL: https://issues.apache.org/jira/browse/FLINK-12472 Project: Flink Issue Type: Improvement Components: Deployment / YARN Reporter: Victor Wong Assignee: Victor Wong According to the documentation of [Yarn|http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.html], a YARN application can set an _attemptFailuresValidityInterval_ to reset application attempts. "attemptFailuresValidityInterval. _The default value is -1. When attemptFailuresValidityInterval in milliseconds is set to > 0, the failure number will not take failures which happen outside of the validityInterval into the failure count. If the failure count reaches maxAppAttempts, the application will be failed."_ We can make use of this feature to make Flink jobs on YARN better suited to long-running execution.
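The validity-interval rule quoted above can be sketched in plain Java. This models only the counting semantics (it is not YARN's implementation, and the class and method names are invented for illustration): failures older than the interval are excluded from the attempt count.

```java
import java.util.List;

// Illustrative model of YARN's attemptFailuresValidityInterval semantics:
// only failures within the last intervalMs milliseconds count toward
// maxAppAttempts; an interval <= 0 means every failure counts.
public class AttemptWindow {
    // Returns true if the application should fail: the number of failures
    // inside [nowMs - intervalMs, nowMs] has reached maxAppAttempts.
    static boolean shouldFail(List<Long> failureTimesMs, long nowMs, long intervalMs, int maxAppAttempts) {
        long counted = (intervalMs > 0)
                ? failureTimesMs.stream().filter(t -> nowMs - t <= intervalMs).count()
                : failureTimesMs.size();
        return counted >= maxAppAttempts;
    }

    public static void main(String[] args) {
        // Three failures, but only one falls inside the last 10,000 ms window.
        List<Long> failures = List.of(1_000L, 2_000L, 95_000L);
        System.out.println(shouldFail(failures, 100_000L, 10_000L, 2)); // prints false
        System.out.println(shouldFail(failures, 100_000L, -1L, 2));     // prints true
    }
}
```

This is why the interval helps long-running jobs: sporadic failures spread over months no longer accumulate toward the attempt limit.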
[jira] [Created] (FLINK-12468) Unregister application from the YARN Resource Manager with a valid appTrackingUrl
Victor Wong created FLINK-12468: --- Summary: Unregister application from the YARN Resource Manager with a valid appTrackingUrl Key: FLINK-12468 URL: https://issues.apache.org/jira/browse/FLINK-12468 Project: Flink Issue Type: Improvement Components: Deployment / YARN Reporter: Victor Wong Assignee: Victor Wong Attachments: image-2019-05-09-18-06-06-954.png, image-2019-05-09-18-07-52-725.png Currently, when a Flink job on YARN finishes, its tracking URL is not valid. As a result, we cannot jump to the Flink history server directly from YARN. !image-2019-05-09-18-06-06-954.png! We can provide a valid appTrackingUrl when unregistering from YARN. _org.apache.flink.yarn.YarnResourceManager#internalDeregisterApplication_ _!image-2019-05-09-18-07-52-725.png!_
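One way to picture the proposed fix: on deregistration, pass YARN a tracking URL that points at the Flink history server instead of an invalid one. The sketch below only shows URL construction; the address and path format are assumptions for illustration, not Flink's actual values.

```java
// Hypothetical sketch: derive an appTrackingUrl pointing at the Flink
// history server. Address and path format are illustrative assumptions.
public class TrackingUrl {
    static String forFinishedJob(String historyServerAddress, String jobId) {
        if (historyServerAddress == null || historyServerAddress.isEmpty()) {
            return ""; // no history server configured: fall back to empty URL
        }
        return historyServerAddress + "/#/jobs/" + jobId;
    }

    public static void main(String[] args) {
        System.out.println(forFinishedJob("http://history:8082", "abc123"));
        // prints http://history:8082/#/jobs/abc123
    }
}
```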
[jira] [Updated] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jar
[ https://issues.apache.org/jira/browse/FLINK-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12459: Description: When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think the user jars should come after all Flink libs and jars in the runtime classpath, including "flink.jar". But actually they do not: org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster !image-2019-05-09-15-04-36-360.png! I'm not sure whether this is the expected behavior, because if a user forgets to mark Flink dependencies as "provided", it causes conflicts in detached mode. was: When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think the user jars should come after all Flink libs and jars, including "flink.jar". But actually they do not: !image-2019-05-09-15-04-36-360.png! I'm not sure whether this is the expected behavior, because if a user forgets to mark Flink dependencies as "provided", it causes conflicts. > YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of > classpath between user jars and flink jar > - > > Key: FLINK-12459 > URL: https://issues.apache.org/jira/browse/FLINK-12459 > Project: Flink > Issue Type: Improvement > Components: Command Line Client > Reporter: Victor Wong > Assignee: Victor Wong > Priority: Minor > Attachments: image-2019-05-09-15-04-36-360.png > > > When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think > the user jars should come after all Flink libs and jars in the runtime classpath, > including "flink.jar". > But actually they do not: > org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster > !image-2019-05-09-15-04-36-360.png! > I'm not sure whether this is the expected behavior, because if a user forgets to mark > Flink dependencies as "provided", it causes conflicts in detached mode.
[jira] [Updated] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jars
[ https://issues.apache.org/jira/browse/FLINK-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Wong updated FLINK-12459: Summary: YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jars (was: YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jar) > YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of > classpath between user jars and flink jars > -- > > Key: FLINK-12459 > URL: https://issues.apache.org/jira/browse/FLINK-12459 > Project: Flink > Issue Type: Improvement > Components: Command Line Client > Reporter: Victor Wong > Assignee: Victor Wong > Priority: Minor > Attachments: image-2019-05-09-15-04-36-360.png > > > When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think > the user jars should come after all Flink libs and jars in the runtime classpath, > including "flink.jar". > But actually they do not: > org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster > !image-2019-05-09-15-04-36-360.png! > I'm not sure whether this is the expected behavior, because if a user forgets to mark > Flink dependencies as "provided", it causes conflicts in detached mode.
[jira] [Created] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jar
Victor Wong created FLINK-12459: --- Summary: YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jar Key: FLINK-12459 URL: https://issues.apache.org/jira/browse/FLINK-12459 Project: Flink Issue Type: Improvement Components: Command Line Client Reporter: Victor Wong Assignee: Victor Wong Attachments: image-2019-05-09-15-04-36-360.png When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think the user jars should come after all Flink libs and jars, including "flink.jar". But actually they do not: !image-2019-05-09-15-04-36-360.png! I'm not sure whether this is the expected behavior, because if a user forgets to mark Flink dependencies as "provided", it causes conflicts.
[jira] [Comment Edited] (FLINK-12130) Apply command line options to configuration before installing security modules
[ https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817050#comment-16817050 ] Victor Wong edited comment on FLINK-12130 at 4/19/19 1:20 AM: -- Hi [~aljoscha], I have created a PR following your advice, https://github.com/apache/flink/pull/8166 I tried to make the patch as simple as possible, but it seems I "failed", because I had to change the behavior of `org.apache.flink.client.cli.CliFrontend#parseParameters` to return the correct `CommandLine` for different actions. Maybe there is a better way; I'm looking forward to your advice. was (Author: victor-wong): Hi [~aljoscha], I have created a PR following your advice, [https://github.com/apache/flink/pull/8166|https://github.com/apache/flink/pull/8166.] I tried to make the patch as simple as possible, but it seems I "failed", because I had to change the behavior of `org.apache.flink.client.cli.CliFrontend#parseParameters` to return the correct `CommandLine` for different actions. Maybe there is a better way; I'm looking forward to your advice. > Apply command line options to configuration before installing security modules > -- > > Key: FLINK-12130 > URL: https://issues.apache.org/jira/browse/FLINK-12130 > Project: Flink > Issue Type: Improvement > Components: Command Line Client > Reporter: Victor Wong > Assignee: Victor Wong > Priority: Major > > Currently, if the user configures Kerberos credentials through the command line, they won't work. > {code:java} > // flink run -m yarn-cluster -yD > security.kerberos.login.keytab=/path/to/keytab -yD > security.kerberos.login.principal=xxx /path/to/test.jar > {code} > The above command would cause a security failure if you do not have a ticket cache from kinit. 
> Maybe we could call > _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_ > before _SecurityUtils.install(new > SecurityConfiguration(cli.configuration));_ > Here is a demo patch: > [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]
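The ordering problem behind FLINK-12130 can be sketched in miniature: the -yD dynamic properties must be merged into the configuration before the security configuration is built from it, otherwise the keytab and principal passed on the command line are invisible to the security modules. The class and method names below are illustrative stand-ins, not Flink's real API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the fix's ordering: merge command-line overrides
// into the configuration BEFORE reading security settings from it.
public class SecuritySetupOrder {
    static Map<String, String> applyCommandLineOptions(Map<String, String> config, Map<String, String> dynamicProps) {
        Map<String, String> merged = new HashMap<>(config);
        merged.putAll(dynamicProps); // -yD values override flink-conf entries
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new HashMap<>(); // no keytab configured on disk
        Map<String, String> yD = Map.of(
                "security.kerberos.login.keytab", "/path/to/keytab",
                "security.kerberos.login.principal", "xxx");
        // Correct order: apply CLI options first, then build security config.
        Map<String, String> effective = applyCommandLineOptions(flinkConf, yD);
        System.out.println(effective.get("security.kerberos.login.keytab"));
        // prints /path/to/keytab
    }
}
```

With the original ordering, the equivalent of `effective` was just `flinkConf`, so the keytab lookup returned null and login fell back to the (absent) ticket cache — the failure described in the issue.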