[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules

2020-12-06 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244953#comment-17244953
 ] 

Victor Wong commented on FLINK-12130:
-

[~lingyaKK], thanks for your time! [~aljoscha], could you help review and merge 
this PR: https://github.com/apache/flink/pull/14271?


> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if the user configures Kerberos credentials through the command 
> line, they won't take effect.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> The above command causes a security failure if you do not have a ticket 
> cache created with kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]
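For illustration, a minimal sketch of the proposed ordering; the variable names 
(cli, commandLine, runProgram) are assumptions for this sketch, not the actual 
CliFrontend code:
{code:java}
// Sketch only: merge the command-line options (e.g. -yD security.kerberos.*)
// into the configuration BEFORE the security modules read the Kerberos
// settings, so that the keytab/principal passed via -yD take effect.
Configuration effectiveConfig = cli.applyCommandLineOptionsToConfiguration(commandLine);
SecurityUtils.install(new SecurityConfiguration(effectiveConfig));
// runProgram is an assumed method that submits the job and returns an exit code.
int exitCode = SecurityUtils.getInstalledContext().runSecured(() -> runProgram(effectiveConfig));
{code}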





[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules

2020-12-01 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241404#comment-17241404
 ] 

Victor Wong commented on FLINK-12130:
-

[~lingyaKK], [~zhouqi], hi, I have come up with a new PR, 
https://github.com/apache/flink/pull/14271; could you help me review it?

> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if the user configures Kerberos credentials through the command 
> line, they won't take effect.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> The above command causes a security failure if you do not have a ticket 
> cache created with kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]





[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules

2020-11-12 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231204#comment-17231204
 ] 

Victor Wong commented on FLINK-12130:
-

[~lingyaKK] [~zhouqi] Very sorry, I haven't worked on this recently :( I plan to 
work on it next week; if anyone wants to contribute a new PR here, that would 
be great!

> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>
> Currently, if the user configures Kerberos credentials through the command 
> line, they won't take effect.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> The above command causes a security failure if you do not have a ticket 
> cache created with kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]





[jira] [Commented] (FLINK-12130) Apply command line options to configuration before installing security modules

2020-08-24 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183280#comment-17183280
 ] 

Victor Wong commented on FLINK-12130:
-

[~aljoscha] I'd like to help; what about a new PR based on the master branch?

> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>
> Currently, if the user configures Kerberos credentials through the command 
> line, they won't take effect.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> The above command causes a security failure if you do not have a ticket 
> cache created with kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]





[jira] [Created] (FLINK-16884) The "rest.port" configuration is always "0" with flink-yarn jobs

2020-03-31 Thread Victor Wong (Jira)
Victor Wong created FLINK-16884:
---

 Summary: The "rest.port" configuration is always "0" with 
flink-yarn jobs
 Key: FLINK-16884
 URL: https://issues.apache.org/jira/browse/FLINK-16884
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.10.0
Reporter: Victor Wong


We would like to get the "rest.port" value via the Flink REST API 
["/jobmanager/config"|https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html],
 but it is always "0" in the returned JSON content, which is not the real port.

Since the "rest.port" is dynamically assigned, we could update the value after 
the `WebMonitorEndpoint` is started.
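For illustration, a minimal sketch of that idea, assuming the startup code has 
the `Configuration` and the started endpoint in scope (not the actual 
dispatcher code):
{code:java}
// Sketch: after the REST endpoint binds to an ephemeral port (rest.port: 0),
// write the actually bound port back into the configuration, so the
// /jobmanager/config handler reports the real value instead of 0.
webMonitorEndpoint.start();
final int boundPort = webMonitorEndpoint.getServerAddress().getPort();
configuration.setInteger(RestOptions.PORT, boundPort);
{code}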





[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-03-19 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17063064#comment-17063064
 ] 

Victor Wong commented on FLINK-15447:
-

Currently, we can solve this issue through "env.java.opts: 
-Djava.io.tmpdir=./tmp"; closing this issue now.

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> *#Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> #*Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> #*Suggestion*
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  
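For illustration, a minimal sketch of the suggestion above, assuming it is wired 
into the code that assembles the JM/TM container launch command (`javaOpts` is 
an assumed list of JVM arguments):
{code:java}
import org.apache.hadoop.yarn.api.ApplicationConstants;

// Environment.PWD.$$() expands to the {{PWD}} placeholder, which Yarn
// substitutes with the container's real working directory and cleans up
// when the application finishes.
final String tmpDirOpt =
    "-Djava.io.tmpdir=" + ApplicationConstants.Environment.PWD.$$() + "/tmp";
javaOpts.add(tmpDirOpt);
{code}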





[jira] [Closed] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-03-19 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong closed FLINK-15447.
---
Resolution: Not A Problem

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> *#Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> #*Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> #*Suggestion*
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-03-06 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053156#comment-17053156
 ] 

Victor Wong commented on FLINK-15447:
-

Hi [~trohrmann], since this issue is still valid against the current master 
branch, I came up with a PR to demonstrate my intended change, implemented 
mainly based on the previous discussions. Please share your advice when you 
have time.

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> *#Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> #*Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> #*Suggestion*
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Commented] (FLINK-16376) Using consistent method to get Yarn application directory

2020-03-05 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17052688#comment-17052688
 ] 

Victor Wong commented on FLINK-16376:
-

Hi, [~trohrmann], the PR is attached.

> Using consistent method to get Yarn application directory
> -
>
> Key: FLINK-16376
> URL: https://issues.apache.org/jira/browse/FLINK-16376
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The Yarn application directory of Flink is "/user/{user.name}/.flink", but 
> this logic is duplicated in several places.
> 1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir
> {code:java}
>   private Path getYarnFilesDir(final ApplicationId appId) throws 
> IOException {
>   final FileSystem fileSystem = FileSystem.get(yarnConfiguration);
>   final Path homeDir = fileSystem.getHomeDirectory();
>   return new Path(homeDir, ".flink/" + appId + '/');
>   }
> {code}
> 2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote
> {code:java}
>   // copy resource to HDFS
>   String suffix =
>   ".flink/"
>   + appId
>   + (relativeTargetPath.isEmpty() ? "" : "/" + 
> relativeTargetPath)
>   + "/" + localSrcPath.getName();
>   Path dst = new Path(homedir, suffix);
> {code}
> We can extract the `getYarnFilesDir` method to `org.apache.flink.yarn.Utils` and 
> use it to get the Yarn application directory in all the other places.
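For illustration, the extracted helper could look like this sketch (name and 
signature assumed), so that both call sites resolve the directory through one 
code path:
{code:java}
// Sketch of a shared helper in org.apache.flink.yarn.Utils: one place that
// maps an application id to "/user/{user.name}/.flink/<appId>".
public static Path getYarnFilesDir(final FileSystem fileSystem, final ApplicationId appId) {
    final Path homeDir = fileSystem.getHomeDirectory();
    return new Path(homeDir, ".flink/" + appId + '/');
}
{code}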





[jira] [Updated] (FLINK-16376) Using consistent method to get Yarn application directory

2020-03-02 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-16376:

Description: 
The Yarn application directory of Flink is "/user/{user.name}/.flink", but this 
logic is duplicated in several places.

1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir

{code:java}
private Path getYarnFilesDir(final ApplicationId appId) throws 
IOException {
final FileSystem fileSystem = FileSystem.get(yarnConfiguration);
final Path homeDir = fileSystem.getHomeDirectory();
return new Path(homeDir, ".flink/" + appId + '/');
}
{code}

2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote


{code:java}
// copy resource to HDFS
String suffix =
".flink/"
+ appId
+ (relativeTargetPath.isEmpty() ? "" : "/" + 
relativeTargetPath)
+ "/" + localSrcPath.getName();

Path dst = new Path(homedir, suffix);
{code}

We can extract the `getYarnFilesDir` method to `org.apache.flink.yarn.Utils` and 
use it to get the Yarn application directory in all the other places.




  was:
The Yarn application directory of Flink is "/user/{user.name}/.flink", but this 
logic is separated in different places.

1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir

{code:java}
private Path getYarnFilesDir(final ApplicationId appId) throws 
IOException {
final FileSystem fileSystem = FileSystem.get(yarnConfiguration);
final Path homeDir = fileSystem.getHomeDirectory();
return *new Path(homeDir, ".flink/" + appId + '/');*
}
{code}

2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote


{code:java}
// copy resource to HDFS
String suffix =
*".flink/"*
*+ appId*
+ (relativeTargetPath.isEmpty() ? "" : "/" + 
relativeTargetPath)
+ "/" + localSrcPath.getName();

Path dst = new Path(homedir, suffix);
{code}

We can extract `getYarnFilesDir` method to `org.apache.flink.yarn.Utils`, and 
use this method to get Yarn application directory in all the other places.





> Using consistent method to get Yarn application directory
> -
>
> Key: FLINK-16376
> URL: https://issues.apache.org/jira/browse/FLINK-16376
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.10.0
>Reporter: Victor Wong
>Priority: Major
>
> The Yarn application directory of Flink is "/user/{user.name}/.flink", but 
> this logic is duplicated in several places.
> 1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir
> {code:java}
>   private Path getYarnFilesDir(final ApplicationId appId) throws 
> IOException {
>   final FileSystem fileSystem = FileSystem.get(yarnConfiguration);
>   final Path homeDir = fileSystem.getHomeDirectory();
>   return new Path(homeDir, ".flink/" + appId + '/');
>   }
> {code}
> 2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote
> {code:java}
>   // copy resource to HDFS
>   String suffix =
>   ".flink/"
>   + appId
>   + (relativeTargetPath.isEmpty() ? "" : "/" + 
> relativeTargetPath)
>   + "/" + localSrcPath.getName();
>   Path dst = new Path(homedir, suffix);
> {code}
> We can extract the `getYarnFilesDir` method to `org.apache.flink.yarn.Utils` and 
> use it to get the Yarn application directory in all the other places.





[jira] [Created] (FLINK-16376) Using consistent method to get Yarn application directory

2020-03-02 Thread Victor Wong (Jira)
Victor Wong created FLINK-16376:
---

 Summary: Using consistent method to get Yarn application directory
 Key: FLINK-16376
 URL: https://issues.apache.org/jira/browse/FLINK-16376
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.10.0
Reporter: Victor Wong


The Yarn application directory of Flink is "/user/{user.name}/.flink", but this 
logic is duplicated in several places.

1. org.apache.flink.yarn.YarnClusterDescriptor#getYarnFilesDir

{code:java}
private Path getYarnFilesDir(final ApplicationId appId) throws 
IOException {
final FileSystem fileSystem = FileSystem.get(yarnConfiguration);
final Path homeDir = fileSystem.getHomeDirectory();
return *new Path(homeDir, ".flink/" + appId + '/');*
}
{code}

2. org.apache.flink.yarn.Utils#uploadLocalFileToRemote


{code:java}
// copy resource to HDFS
String suffix =
*".flink/"*
*+ appId*
+ (relativeTargetPath.isEmpty() ? "" : "/" + 
relativeTargetPath)
+ "/" + localSrcPath.getName();

Path dst = new Path(homedir, suffix);
{code}

We can extract the `getYarnFilesDir` method to `org.apache.flink.yarn.Utils` and 
use it to get the Yarn application directory in all the other places.








[jira] [Commented] (FLINK-15992) Incorrect classloader when finding TableFactory

2020-02-21 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041731#comment-17041731
 ] 

Victor Wong commented on FLINK-15992:
-

[~jark] OK, sorry for the inconvenience caused by this PR :(

> Incorrect classloader when finding TableFactory
> ---
>
> Key: FLINK-15992
> URL: https://issues.apache.org/jira/browse/FLINK-15992
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / API, Tests
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> *Background*
> As a streaming service maintainer in our company, to ensure our users depend 
> on the correct version of Kafka and flink-kafka, we add 
> "flink-connector-kafka" into "fink-dist/lib" directory.
> *Problem*
> When submitting flink-sql jobs, we encountered the exception below:
> {code:java}
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
> {code}
> But we have added "org.apache.flink.formats.json.JsonRowFormatFactory" in 
> "META-INF/services/org.apache.flink.table.factories.TableFactory", which 
> implements DeserializationSchemaFactory.
> *Debug*
> We find that it was caused by this:
> {code:java}
> // 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema
>   final SerializationSchemaFactory formatFactory = 
> TableFactoryService.find(
>   SerializationSchemaFactory.class,
>   properties,
>   this.getClass().getClassLoader());
> {code}
> It uses `this.getClass().getClassLoader()`, which will be 
> BootStrapClassLoader of flink.
> I think we could replace it with 
> `Thread.currentThread().getContextClassLoader()` to solve this.
> There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552
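For clarity, the proposed one-line change would look like this sketch (same 
call site as above, with only the classloader argument swapped):
{code:java}
// Sketch: look the format factory up through the context classloader, which
// can see classes from the user jar, instead of the classloader of this class.
final SerializationSchemaFactory formatFactory = TableFactoryService.find(
    SerializationSchemaFactory.class,
    properties,
    Thread.currentThread().getContextClassLoader());
{code}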





[jira] [Commented] (FLINK-15992) Incorrect classloader when finding TableFactory

2020-02-11 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034961#comment-17034961
 ] 

Victor Wong commented on FLINK-15992:
-

[~jark], thanks Jark, I will open a PR for this and ping you later.

> Incorrect classloader when finding TableFactory
> ---
>
> Key: FLINK-15992
> URL: https://issues.apache.org/jira/browse/FLINK-15992
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Victor Wong
>Priority: Major
>
> *Background*
> As a streaming service maintainer in our company, to ensure our users depend 
> on the correct version of Kafka and flink-kafka, we add 
> "flink-connector-kafka" into "fink-dist/lib" directory.
> *Problem*
> When submitting flink-sql jobs, we encountered the exception below:
> {code:java}
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
> {code}
> But we have added "org.apache.flink.formats.json.JsonRowFormatFactory" in 
> "META-INF/services/org.apache.flink.table.factories.TableFactory", which 
> implements DeserializationSchemaFactory.
> *Debug*
> We find that it was caused by this:
> {code:java}
> // 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema
>   final SerializationSchemaFactory formatFactory = 
> TableFactoryService.find(
>   SerializationSchemaFactory.class,
>   properties,
>   this.getClass().getClassLoader());
> {code}
> It uses `this.getClass().getClassLoader()`, which will be 
> BootStrapClassLoader of flink.
> I think we could replace it with 
> `Thread.currentThread().getContextClassLoader()` to solve this.
> There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552





[jira] [Commented] (FLINK-15992) Incorrect classloader when finding TableFactory

2020-02-11 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034349#comment-17034349
 ] 

Victor Wong commented on FLINK-15992:
-

That's right, we can just remove the `this.getClass().getClassLoader()` 
parameter. (y)

As for the second question: the required TableFactory was configured in the user 
jar, so the BootStrapClassLoader could not find the class.
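A sketch of what removing the parameter would look like, assuming the 
two-argument overload of TableFactoryService.find performs the default lookup:
{code:java}
// Sketch: drop the explicit classloader argument and let TableFactoryService
// fall back to its default factory discovery.
final SerializationSchemaFactory formatFactory = TableFactoryService.find(
    SerializationSchemaFactory.class,
    properties);
{code}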

> Incorrect classloader when finding TableFactory
> ---
>
> Key: FLINK-15992
> URL: https://issues.apache.org/jira/browse/FLINK-15992
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Victor Wong
>Priority: Major
>
> *Background*
> As a streaming service maintainer in our company, to ensure our users depend 
> on the correct version of Kafka and flink-kafka, we add 
> "flink-connector-kafka" into "fink-dist/lib" directory.
> *Problem*
> When submitting flink-sql jobs, we encountered the exception below:
> {code:java}
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
> {code}
> But we have added "org.apache.flink.formats.json.JsonRowFormatFactory" in 
> "META-INF/services/org.apache.flink.table.factories.TableFactory", which 
> implements DeserializationSchemaFactory.
> *Debug*
> We find that it was caused by this:
> {code:java}
> // 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema
>   final SerializationSchemaFactory formatFactory = 
> TableFactoryService.find(
>   SerializationSchemaFactory.class,
>   properties,
>   this.getClass().getClassLoader());
> {code}
> It uses `this.getClass().getClassLoader()`, which will be 
> BootStrapClassLoader of flink.
> I think we could replace it with 
> `Thread.currentThread().getContextClassLoader()` to solve this.
> There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552





[jira] [Updated] (FLINK-15992) Incorrect classloader when finding TableFactory

2020-02-11 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15992:

Description: 
*Background*

As a streaming service maintainer in our company, to ensure our users depend on 
the correct version of Kafka and flink-kafka, we add "flink-connector-kafka" 
into "fink-dist/lib" directory.

*Problem*

When submitting flink-sql jobs, we encountered the exception below:
{code:java}
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.
{code}
But we have added "org.apache.flink.formats.json.JsonRowFormatFactory" in 
"META-INF/services/org.apache.flink.table.factories.TableFactory", which 
implements DeserializationSchemaFactory.

*Debug*

We find that it was caused by this:

{code:java}
// 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema

final SerializationSchemaFactory formatFactory = 
TableFactoryService.find(
SerializationSchemaFactory.class,
properties,
this.getClass().getClassLoader());
{code}

It uses `this.getClass().getClassLoader()`, which will be BootStrapClassLoader 
of flink.
I think we could replace it with 
`Thread.currentThread().getContextClassLoader()` to solve this.

There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552



  was:
*Background*

As a streaming service maintainer in our company, to ensure our users depend on 
the correct version of Kafka and flink-kafka, we add "flink-connector-kafka" 
into "fink-dist/lib" directory.

*Problem*

When submitting flink-sql jobs, we encountered below exceptions:
{code:java}
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.
{code}

*Debug*

We find that it was caused by this:

{code:java}
// 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema

final SerializationSchemaFactory formatFactory = 
TableFactoryService.find(
SerializationSchemaFactory.class,
properties,
this.getClass().getClassLoader());
{code}

It uses `this.getClass().getClassLoader()`, which will be BootStrapClassLoader 
of fink.
We could replace it with `Thread.currentThread().getContextClassLoader()` to 
solve this.

There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552




> Incorrect classloader when finding TableFactory
> ---
>
> Key: FLINK-15992
> URL: https://issues.apache.org/jira/browse/FLINK-15992
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Reporter: Victor Wong
>Priority: Major
>
> *Background*
> As a streaming service maintainer in our company, to ensure our users depend 
> on the correct version of Kafka and flink-kafka, we add 
> "flink-connector-kafka" into "fink-dist/lib" directory.
> *Problem*
> When submitting flink-sql jobs, we encountered the exception below:
> {code:java}
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
> {code}
> But we have added "org.apache.flink.formats.json.JsonRowFormatFactory" in 
> "META-INF/services/org.apache.flink.table.factories.TableFactory", which 
> implements DeserializationSchemaFactory.
> *Debug*
> We find that it was caused by this:
> {code:java}
> // 
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema
>   final SerializationSchemaFactory formatFactory = 
> TableFactoryService.find(
>   SerializationSchemaFactory.class,
>   properties,
>   this.getClass().getClassLoader());
> {code}
> It uses `this.getClass().getClassLoader()`, which will be 
> BootStrapClassLoader of flink.
> I think we could replace it with 
> `Thread.currentThread().getContextClassLoader()` to solve this.
> There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552





[jira] [Created] (FLINK-15992) Incorrect classloader when finding TableFactory

2020-02-11 Thread Victor Wong (Jira)
Victor Wong created FLINK-15992:
---

 Summary: Incorrect classloader when finding TableFactory
 Key: FLINK-15992
 URL: https://issues.apache.org/jira/browse/FLINK-15992
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Victor Wong


*Background*

As a streaming service maintainer in our company, to ensure our users depend on 
the correct version of Kafka and flink-kafka, we add "flink-connector-kafka" 
into "fink-dist/lib" directory.

*Problem*

When submitting flink-sql jobs, we encountered the exception below:
{code:java}
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.
{code}

*Debug*

We find that it was caused by this:

{code:java}
// 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase#getSerializationSchema

final SerializationSchemaFactory formatFactory = 
TableFactoryService.find(
SerializationSchemaFactory.class,
properties,
this.getClass().getClassLoader());
{code}

It uses `this.getClass().getClassLoader()`, which will be BootStrapClassLoader 
of Flink.
We could replace it with `Thread.currentThread().getContextClassLoader()` to 
solve this.

There is a related issue: https://issues.apache.org/jira/browse/FLINK-15552







[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-02-02 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028644#comment-17028644
 ] 

Victor Wong commented on FLINK-15447:
-

[~trohrmann], thanks for your attention. 

_I am wondering whether you would like to configure the system property 
{{java.io.tmpdir}} to point towards {{./tmp}} or to only change Flink's temp 
directories._

_---_

The former: configure the system property java.io.tmpdir.

 

_If not, then we would need to adapt the java command which starts the Flink 
processes._

_---_

I think this is the best choice, which has _the benefit that libraries, relying 
on {{java.io.tmpdir}}, will not write their temporary data to {{/tmp}}, too._

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> *#Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> #*Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> #*Suggestion*
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Commented] (FLINK-15449) Retain lost task managers on Flink UI

2020-02-02 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028639#comment-17028639
 ] 

Victor Wong commented on FLINK-15449:
-

[~chesnay], as suggested by [~fly_in_gis], we could _use the schema 
"http://\{RM_Address:PORT}/node/containerlogs/\{container_id}/\{user}" to 
construct the log url,_ so I think it's feasible to retain a log URL for lost 
TMs.

I agree that it would add additional complexity, but it's inconvenient to debug 
lost TMs right now, so I think it's worth a shot to solve this.

> Retain lost task managers on Flink UI
> -
>
> Key: FLINK-15449
> URL: https://issues.apache.org/jira/browse/FLINK-15449
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> With Flink on Yarn, sometimes our TaskManager was killed because of OOM or 
> heartbeat timeout or whatever reason, and it's not convenient to check out 
> the logs of the lost TaskManager.
> Can we retain the lost task managers on Flink UI, and provide the log service 
> through Yarn (we can redirect the URL of log/stdout to Yarn container 
> log/stdout)?





[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-01-23 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021862#comment-17021862
 ] 

Victor Wong commented on FLINK-15447:
-

[~fly_in_gis], it makes sense to make "java.io.tmpdir" configurable; we could 
add a new YarnOption configuration to achieve this. If this issue gets 
assigned to me, could you help review my PR?

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> *#Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> #*Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> #*Suggestion*
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Commented] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values

2020-01-23 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021859#comment-17021859
 ] 

Victor Wong commented on FLINK-14642:
-

[~liuyufei] if I understand correctly, the Tuple and CaseClass themselves should 
not be NULL, but their fields could be NULL.

> Flink TupleSerializer and CaseClassSerializer should support copying NULL values
> 
>
> Key: FLINK-14642
> URL: https://issues.apache.org/jira/browse/FLINK-14642
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently, TupleSerializer and CaseClassSerializer do not support serializing 
> NULL values, which I think is acceptable. But not supporting copying NULL 
> values will cause the following code to throw an exception, which I think 
> does not match users' expectations and is prone to error.
> *codes:*
> {code:java}
> stream.map(xxx).filter(_ != null).xxx //the return type of the map function 
> is Tuple and it may return null{code}
>  
> *exception info:*
>  
> {code:java}
> Caused by: java.lang.NullPointerException 
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
>  
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> {code}
>  
> *suggestion:*
> Can we make the `copy` method of TupleSerializer/CaseClassSerializer 
> handle NULL values? E.g.
> {code:java}
> // org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
> def copy(from: T): T = {
>   // handle NULL values.
>   if(from == null) {
> return from
>   }
>   initArray()
>   var i = 0
>   while (i < arity) {
> fields(i) = 
> fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
> i += 1
>   }
>   createInstance(fields)
> }
> {code}
>  
>  
>  
>  
>  





[jira] [Commented] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-01-23 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021858#comment-17021858
 ] 

Victor Wong commented on FLINK-15447:
-

[~rongr] I updated this issue based on your suggestion. Could you assign it to 
me? Thanks!

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> *#Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> #*Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> #*Suggestion*
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Updated] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-01-23 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15447:

Description: 
*#Background*

Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to 
the default value, which is "/tmp".  

Sometimes we ran into exceptions caused by a full "/tmp" directory, which would 
not be cleaned automatically after applications finished.

 

#*Goal*

quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]

_1) Tasks can utilize all disks when using tmp_
 _2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
is done._

 

#*Suggestion*

I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
something similar. "PWD" will be replaced with the true working 
directory of JM/TM by Yarn, which will be cleaned automatically.

 

  was:
# *Background*

Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to 
the default value, which is "/tmp".  

Sometimes we ran into exceptions caused by a full "/tmp" directory, which would 
not be cleaned automatically after applications finished.

 

*# Goal* 

1) Tasks can utilize all disks when using tmp
2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
is done.

I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
something similar. "PWD" will be replaced with the true working 
directory of JM/TM by Yarn, which will be cleaned automatically.

 


> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> *#Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> #*Goal*
> quoted from: [HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]
> _1) Tasks can utilize all disks when using tmp_
>  _2) Any undeleted tmp files will be deleted by the tasktracker when 
> task(job?) is done._
>  
> #*Suggestion*
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Updated] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-01-23 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15447:

Description: 
# *Background*

Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to 
the default value, which is "/tmp".  

Sometimes we ran into exceptions caused by a full "/tmp" directory, which would 
not be cleaned automatically after applications finished.

 

*# Goal* 

1) Tasks can utilize all disks when using tmp
2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
is done.

I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
something similar. "PWD" will be replaced with the true working 
directory of JM/TM by Yarn, which will be cleaned automatically.

 

  was:
Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to 
the default value, which is "/tmp". 

 

Sometimes we ran into exceptions caused by a full "/tmp" directory, which would 
not be cleaned automatically after applications finished.

I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
something similar. "PWD" will be replaced with the true working 
directory of JM/TM by Yarn, which will be cleaned automatically.

 


> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> # *Background*
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp".  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
>  
> *# Goal* 
> 1) Tasks can utilize all disks when using tmp
> 2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
> is done.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Updated] (FLINK-15447) To improve utilization of the `java.io.tmpdir` for YARN module

2020-01-23 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15447:

Summary: To improve utilization of the `java.io.tmpdir` for YARN module  
(was: Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" )

> To improve utilization of the `java.io.tmpdir` for YARN module
> --
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-18 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018792#comment-17018792
 ] 

Victor Wong commented on FLINK-15447:
-

[~fly_in_gis] the owner of the working directory of the Yarn container should be 
the same as the owner of the JM/TM process, so I think there should be no 
permission issue, right?

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-18 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018790#comment-17018790
 ] 

Victor Wong edited comment on FLINK-15447 at 1/19/20 4:24 AM:
--

[~rongr] for me #2 is closest to the main concern, but we do not want to 
share with others; not for fine-grained control of disk resources, but because 
a shared location is very prone to disk full.

There are some good points in HADOOP-2735:

_Can we add -Djava.io.tmpdir="./tmp" somewhere ?_
 _so that,_
 _1) Tasks can utilize all disks when using tmp_
 _2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
is done._


was (Author: victor-wong):
[~rongr] for me #2 is most close to the main concern, but we do not want to 
share with others not for fine-grain control on disk resource, but for a shared 
location is very prone to be disk full.

There are some good points in HADOOP-2735:

_Can we add -Djava.io.tmpdir="./tmp" somewhere ?_
 _so that,_
 _1) Tasks can utilize all disks when using tmp_
 _2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
is done._

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-18 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018790#comment-17018790
 ] 

Victor Wong edited comment on FLINK-15447 at 1/19/20 4:23 AM:
--

[~rongr] for me #2 is closest to the main concern, but we do not want to 
share with others; not for fine-grained control of disk resources, but because 
a shared location is very prone to disk full.

There are some good points in HADOOP-2735:

_Can we add -Djava.io.tmpdir="./tmp" somewhere ?_
 _so that,_
 _1) Tasks can utilize all disks when using tmp_
 _2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
is done._


was (Author: victor-wong):
[~rongr] for me #2 is most close to the main concern, but we do not want to 
share with others not for fine-grain control on disk resource, but for a shared 
location is very prone to be disk full.

There are some good points in 
[HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]:

_Can we add -Djava.io.tmpdir="./tmp" somewhere ?
so that,
1) Tasks can utilize all disks when using tmp
2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
is done._

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-18 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018790#comment-17018790
 ] 

Victor Wong commented on FLINK-15447:
-

[~rongr] for me #2 is closest to the main concern, but we do not want to 
share with others; not for fine-grained control of disk resources, but because 
a shared location is very prone to disk full.

There are some good points in 
[HADOOP-2735|https://issues.apache.org/jira/browse/HADOOP-2735]:

_Can we add -Djava.io.tmpdir="./tmp" somewhere ?
so that,
1) Tasks can utilize all disks when using tmp
2) Any undeleted tmp files will be deleted by the tasktracker when task(job?) 
is done._

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-17 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018462#comment-17018462
 ] 

Victor Wong edited comment on FLINK-15447 at 1/18/20 1:41 AM:
--

[~rongr] Thanks for your reply!

_Could you elaborate what does it mean by Flink-YARN default the value to /tmp? 
I am guessing you mean JVM default the value to /tmp ?_
---
Yes, I mean that Flink-YARN is using the default value of the JVM, which is "/tmp".


_So far the only place I can see in flink yarn code utilizing this key is .._
---
The third-party dependencies might utilize this key as well, as [~lzljs3620320] 
mentioned. 
My intention is similar to what [~xymaqingxiang] mentioned: setting the 
value to a tmp directory under the working directory of the Yarn container.


was (Author: victor-wong):
[~rongr] Thanks for your reply!

_Could you elaborate what does it mean by Flink-YARN default the value to /tmp? 
I am guessing you mean JVM default the value to /tmp ?_
---
Yes,  I mean that Flink-YARN is using the default value of JVM, which is "/tmp".


_So far the only place I can see in flink yarn code utilizing this key is: 
https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899_
---
The third-party dependencies might utilize this key as well, as [~lzljs3620320] 
mentioned. 
My intention is similar to how [~xymaqingxiang] mentioned that setting the 
value to a tmp directory under the working directory of Yarn container.

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  





[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-17 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018462#comment-17018462
 ] 

Victor Wong edited comment on FLINK-15447 at 1/18/20 1:40 AM:
--

[~rongr] Thanks for your reply!

_Could you elaborate what does it mean by Flink-YARN default the value to /tmp? 
I am guessing you mean JVM default the value to /tmp ?_
---
Yes, I mean that Flink-YARN is using the default value of the JVM, which is "/tmp".

_So far the only place I can see in flink yarn code utilizing this key is: 
https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899_
---
The third-party dependencies might utilize this key as well, as [~lzljs3620320] 
mentioned. 
My intention is similar to what [~xymaqingxiang] mentioned: setting the 
value to a tmp directory under the working directory of the Yarn container.


was (Author: victor-wong):
[~rongr] Thanks for your reply!

_Could you elaborate what does it mean by Flink-YARN default the value to /tmp? 
I am guessing you mean JVM default the value to /tmp ?
_
---
Yes, I mean that Flink-YARN uses the JVM's default value, which is "/tmp".

_So far the only place I can see in flink yarn code utilizing this key is: 
https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899
_
---
Third-party dependencies might utilize this key as well, as [~lzljs3620320] 
mentioned. 
My intention is similar to what [~xymaqingxiang] suggested: setting the value to 
a tmp directory under the working directory of the Yarn container.

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-17 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018462#comment-17018462
 ] 

Victor Wong edited comment on FLINK-15447 at 1/18/20 1:40 AM:
--

[~rongr] Thanks for your reply!

_Could you elaborate what does it mean by Flink-YARN default the value to /tmp? 
I am guessing you mean JVM default the value to /tmp ?_
---
Yes, I mean that Flink-YARN uses the JVM's default value, which is "/tmp".


_So far the only place I can see in flink yarn code utilizing this key is: 
https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899_
---
Third-party dependencies might utilize this key as well, as [~lzljs3620320] 
mentioned. 
My intention is similar to what [~xymaqingxiang] suggested: setting the value to 
a tmp directory under the working directory of the Yarn container.


was (Author: victor-wong):
[~rongr] Thanks for your reply!

_Could you elaborate what does it mean by Flink-YARN default the value to /tmp? 
I am guessing you mean JVM default the value to /tmp ?_
---
Yes, I mean that Flink-YARN uses the JVM's default value, which is "/tmp".

_So far the only place I can see in flink yarn code utilizing this key is: 
https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899_
---
Third-party dependencies might utilize this key as well, as [~lzljs3620320] 
mentioned. 
My intention is similar to what [~xymaqingxiang] suggested: setting the value to 
a tmp directory under the working directory of the Yarn container.

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-17 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018462#comment-17018462
 ] 

Victor Wong commented on FLINK-15447:
-

[~rongr] Thanks for your reply!

_Could you elaborate what does it mean by Flink-YARN default the value to /tmp? 
I am guessing you mean JVM default the value to /tmp ?
_
---
Yes, I mean that Flink-YARN uses the JVM's default value, which is "/tmp".

_So far the only place I can see in flink yarn code utilizing this key is: 
https://github.com/apache/flink/blob/release-1.10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L899
_
---
Third-party dependencies might utilize this key as well, as [~lzljs3620320] 
mentioned. 
My intention is similar to what [~xymaqingxiang] suggested: setting the value to 
a tmp directory under the working directory of the Yarn container.

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-08 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17011475#comment-17011475
 ] 

Victor Wong commented on FLINK-15448:
-

[~trohrmann] I haven't done this before, but if you guys are OK with it, I'd 
like to give it a try with a Google Doc draft first.

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-08 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010590#comment-17010590
 ] 

Victor Wong commented on FLINK-15448:
-

[~zhuzh] different ResourceID subclasses could have constructors with 
deployment-specific arguments, e.g.

{code:java}
public YarnResourceID(Container container)
public KubernetesResourceID(KubernetesPod pod)
...
{code}

So they can have a custom `#toString` method with all the information needed.

A simple demo:

{code:java}
public class YarnResourceID extends ResourceID {
    // human-readable identifier, e.g. "container_xxx@host:port", used only for logging
    private final String resourceIdStr;

    public YarnResourceID(Container container) {
        // the plain container id remains the stable resource id
        super(container.getId().toString());

        // append the node id so logs reveal the host directly
        this.resourceIdStr = container.getId() + "@" + container.getNodeId();
    }

    @Override
    public String toString() {
        return resourceIdStr;
    }
}
{code}

As [~trohrmann] said,  "_Flink uses a lot of ids in its logs which are hard to 
decipher for the user_",  so a custom "ResourceID#toString" method with the 
information needed would improve the situation. WDYT?
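For illustration, with such a descriptive ResourceID the timeout message would 
carry the host directly (container id and host below are hypothetical):

{code:java}
// before: The heartbeat of TaskManager with id container_e02_0001_01_000003 timed out.
// after:  The heartbeat of TaskManager with id container_e02_0001_01_000003@host-1.example.com:8041 timed out.
{code}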


 

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-08 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17010485#comment-17010485
 ] 

Victor Wong commented on FLINK-15448:
-

I agree that an extended ResourceID would be a great help, but not via 
TaskManagerID/JobManagerID. Maybe we should distinguish resources by their 
underlying deployment service, e.g. 
YarnResourceID/KubernetesResourceID/MesosResourceID.

{code:java}
class YarnResourceID extends ResourceID {
  public YarnResourceID(Container container) {
    ...
  }
}
{code}


> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-07 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15448:

Labels:   (was: pull-request-available)

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-07 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15448:

Labels:   (was: pull-request-available)

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-02 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007267#comment-17007267
 ] 

Victor Wong commented on FLINK-15447:
-

[~yunta], [~lzljs3620320], sorry to ping you guys; I'd like to know what I 
should do next. Do I need to wait for more discussion to reach a consensus?

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15450) Add kafka topic information to Kafka source name on Flink UI

2020-01-02 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007265#comment-17007265
 ] 

Victor Wong commented on FLINK-15450:
-

Any suggestions on this? :)
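In the meantime, a user can get the same effect by naming the source explicitly 
after adding it; a minimal sketch (topic name, schema, and properties are 
illustrative):

{code:java}
// Workaround until the default name carries topic information:
// set an explicit operator name on the source.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

env.addSource(new FlinkKafkaConsumer<>("srcTopic0", new SimpleStringSchema(), props))
   .name("Source: srcTopic0")  // shown on the Flink UI instead of "Custom Source"
   .print();
{code}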

> Add kafka topic information to Kafka source name on Flink UI
> 
>
> Key: FLINK-15450
> URL: https://issues.apache.org/jira/browse/FLINK-15450
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Connectors / Kafka
>Reporter: Victor Wong
>Priority: Major
>
> If the user did not specify a custom name to the source, e.g. Kafka source, 
> Flink would use the default name "Custom Source", which was not intuitive 
> (Sink was the same).
> {code:java}
> Source: Custom Source -> Filter -> Map -> Sink: Unnamed
> {code}
> If we could add the Kafka topic information to the default Source/Sink name, 
> it would be very helpful to catch the consuming/publishing topic quickly, 
> like this:
> {code:java}
> Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
> {code}
> *Suggestion* (forgive me if it makes too many changes)
> 1. Add a `name` method to interface `Function`
> {code:java}
> public interface Function extends java.io.Serializable {
>   default String name() { return ""; }
> }
> {code}
> 2. Source/Sink/Other functions override this method depending on their needs.
> {code:java}
> class FlinkKafkaConsumerBase {
> String name() {
>   return this.topicsDescriptor.toString();
> }
> }
> {code}
> 3. Use Function#name if the returned value is not empty.
> {code:java}
> // StreamExecutionEnvironment
>   public  DataStreamSource addSource(SourceFunction 
> function) {
>   String sourceName = function.name();
>   if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
>   sourceName = "Custom Source";
>   }
>   return addSource(function, sourceName);
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-02 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007263#comment-17007263
 ] 

Victor Wong commented on FLINK-15448:
-

[~zhuzh], sorry for my late response; my initial intention is to improve the 
logs of the TM TimeoutException, since the lost container can no longer be found 
on the Flink UI, for example: 

{code:java}
// 1. org.apache.flink.runtime.jobmaster.JobMaster.TaskManagerHeartbeatListener#notifyHeartbeatTimeout
new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));

// 2. org.apache.flink.runtime.resourcemanager.ResourceManager.TaskManagerHeartbeatListener#notifyHeartbeatTimeout
log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
{code}

If it's not proper "_to add the host to each log line that contains a 
ResourceID_", improving these exception logs would also be a great help. 

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Log host informations for TaskManager failures.

2020-01-02 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006726#comment-17006726
 ] 

Victor Wong commented on FLINK-15448:
-

[~zhuzh], with a long-running job it's actually quite inconvenient to find the 
host information, because the log file can be very large, and if you have 
configured log file rotation, the host information of a container might be 
missing. In some circumstances, as mentioned by Xintong, "when you have lots of 
TM failures", it would be even more annoying.

About the redundancy concern: since the host information is mainly logged in 
case of an exception, I don't think it's a problem :)

> Log host informations for TaskManager failures.
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Minor
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Make "ResourceID#toString" more descriptive

2020-01-02 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006663#comment-17006663
 ] 

Victor Wong commented on FLINK-15448:
-

[~xintongsong], thanks for your patient explanation; _"providing more 
information at the places where the logs are generated"_ seems to be the best 
choice. Do you mind if I come up with a patch for this?

> Make "ResourceID#toString" more descriptive
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15450) Add kafka topic information to Kafka source name on Flink UI

2020-01-01 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15450:

Component/s: API / Core

> Add kafka topic information to Kafka source name on Flink UI
> 
>
> Key: FLINK-15450
> URL: https://issues.apache.org/jira/browse/FLINK-15450
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Connectors / Kafka
>Reporter: Victor Wong
>Priority: Major
>
> If the user did not specify a custom name to the source, e.g. Kafka source, 
> Flink would use the default name "Custom Source", which was not intuitive 
> (Sink was the same).
> {code:java}
> Source: Custom Source -> Filter -> Map -> Sink: Unnamed
> {code}
> If we could add the Kafka topic information to the default Source/Sink name, 
> it would be very helpful to catch the consuming/publishing topic quickly, 
> like this:
> {code:java}
> Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
> {code}
> *Suggestion* (forgive me if it makes too many changes)
> 1. Add a `name` method to interface `Function`
> {code:java}
> public interface Function extends java.io.Serializable {
>   default String name() { return ""; }
> }
> {code}
> 2. Source/Sink/Other functions override this method depending on their needs.
> {code:java}
> class FlinkKafkaConsumerBase {
> String name() {
>   return this.topicsDescriptor.toString();
> }
> }
> {code}
> 3. Use Function#name if the returned value is not empty.
> {code:java}
> // StreamExecutionEnvironment
>   public  DataStreamSource addSource(SourceFunction 
> function) {
>   String sourceName = function.name();
>   if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
>   sourceName = "Custom Source";
>   }
>   return addSource(function, sourceName);
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15448) Make "ResourceID#toString" more descriptive

2020-01-01 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006609#comment-17006609
 ] 

Victor Wong commented on FLINK-15448:
-

[~xintongsong], thanks for your reply, I think your concern is very reasonable, 
but I still have some questions.

_the right approach should be providing more information at the places where 
the logs are generated_
---
The information we need, like the host of the TM, is not always available or 
convenient to access in some places. Take `HeartbeatListener` for example: 

{code:java}
// org.apache.flink.runtime.jobmaster.JobMaster.TaskManagerHeartbeatListener#notifyHeartbeatTimeout
public void notifyHeartbeatTimeout(ResourceID resourceID) {
    validateRunsInMainThread();
    // I think it's not easy to construct a correct log message here.
    disconnectTaskManager(
        resourceID,
        new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
}
{code}

Besides, I think it's error-prone to expect everyone to remember to provide the 
exact information needed when logging.

What about initializing the Yarn ResourceID with both container and host 
information, i.e. `new ResourceID(container.getId().toString() + "@" + 
container.getNodeId())`? Any suggestions?

> Make "ResourceID#toString" more descriptive
> ---
>
> Key: FLINK-15448
> URL: https://issues.apache.org/jira/browse/FLINK-15448
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> With Flink on Yarn, sometimes we ran into an exception like this:
> {code:java}
> java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
> container_  timed out.
> {code}
> We'd like to find out the host of the lost TaskManager to log into it for 
> more details, we have to check the previous logs for the host information, 
> which is a little time-consuming.
> Maybe we can add more descriptive information to ResourceID of Yarn 
> containers, e.g. "container_xxx@host_name:port_number".
> Here's the demo:
> {code:java}
> class ResourceID {
>   final String resourceId;
>   final String details;
>   public ResourceID(String resourceId) {
> this.resourceId = resourceId;
> this.details = resourceId;
>   }
>   public ResourceID(String resourceId, String details) {
> this.resourceId = resourceId;
> this.details = details;
>   }
>   public String toString() {
> return details;
>   } 
> }
> // in flink-yarn
> private void startTaskExecutorInContainer(Container container) {
>   final String containerIdStr = container.getId().toString();
>   final String containerDetail = container.getId() + "@" + 
> container.getNodeId();  
>   final ResourceID resourceId = new ResourceID(containerIdStr, 
> containerDetail);
>   ...
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15449) Retain lost task managers on Flink UI

2020-01-01 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006573#comment-17006573
 ] 

Victor Wong commented on FLINK-15449:
-

Nice suggestion! We could add the constructed log URL (the Yarn container log 
URL) to the Flink UI.
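A minimal sketch of constructing such a URL, assuming the default NodeManager 
web layout (the path pattern and the `user` variable are illustrative and may 
differ per cluster):

{code:java}
// Hypothetical: build the NodeManager container-log URL for a Yarn container.
String logUrl = String.format("http://%s/node/containerlogs/%s/%s",
        container.getNodeHttpAddress(),  // e.g. host-1.example.com:8042
        container.getId(),               // e.g. container_e02_0001_01_000003
        user);                           // the user the application runs as
{code}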

> Retain lost task managers on Flink UI
> -
>
> Key: FLINK-15449
> URL: https://issues.apache.org/jira/browse/FLINK-15449
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> With Flink on Yarn, sometimes our TaskManager was killed because of OOM or 
> heartbeat timeout or whatever reasons, it's not convenient to check out the 
> logs of the lost TaskManger.
> Can we retain the lost task managers on Flink UI, and provide the log service 
> through Yarn (we can redirect the URL of log/stdout to Yarn container 
> log/stdout)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2020-01-01 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006558#comment-17006558
 ] 

Victor Wong commented on FLINK-15447:
-

[~yunta] [~klion26] Thanks for your reply :)

_What caused the "/tmp" directory full?_
---
It may not be caused by Flink applications, since other applications like 
MapReduce/Spark run on the same Yarn cluster too. 
I think it's not safe to use the "/tmp" directory since it's shared with other 
tasks. Each task, like the Flink TaskManager, could make use of its own working 
directory assigned by Yarn.

_Flink already set io.tmp.dirs as 'LOCAL_DIRS' in YARN_
---
This config cannot solve the problem completely, since third-party libraries 
used by users may still use the "java.io.tmpdir" property.

There is a similar discussion here: 
https://issues.apache.org/jira/browse/HADOOP-2735
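A minimal sketch of the intended change, assuming it is applied where flink-yarn 
assembles the container start command (the `javaOpts` list is illustrative):

{code:java}
// Hypothetical: add the property to the JVM options of the container
// launch command; Yarn expands PWD to the container's working
// directory, which is cleaned up automatically after the application.
String tmpDirOpt = "-Djava.io.tmpdir="
        + ApplicationConstants.Environment.PWD.$$() + "/tmp";
javaOpts.add(tmpDirOpt);  // results in: java -Djava.io.tmpdir=$PWD/tmp ...
{code}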

> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15450) Add kafka topic information to Kafka source name on Flink UI

2019-12-31 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15450:

Description: 
If the user does not specify a custom name for the source, e.g. a Kafka source, 
Flink uses the default name "Custom Source", which is not intuitive (the same 
applies to sinks).


{code:java}
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
{code}

If we could add the Kafka topic information to the default Source/Sink name, it 
would be very helpful for identifying the consumed/published topics quickly, 
like this:

{code:java}
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
{code}

*Suggestion* (forgive me if it makes too many changes)

1. Add a `name` method to interface `Function`

{code:java}
public interface Function extends java.io.Serializable {
default String name() { return ""; }
}
{code}

2. Source/Sink/Other functions override this method depending on their needs.

{code:java}
class FlinkKafkaConsumerBase {

String name() {
  return this.topicsDescriptor.toString();
}

}
{code}

3. Use Function#name if the returned value is not empty.

{code:java}
// StreamExecutionEnvironment
public  DataStreamSource addSource(SourceFunction 
function) {
String sourceName = function.name();
if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
sourceName = "Custom Source";
}
return addSource(function, sourceName);
}
{code}


  was:
If the user did not specify a custom name to the source, e.g. Kafka source, 
Flink would use the default name "Custom Source", which was not intuitive (Sink 
was the same).


{code:java}
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
{code}

If we could add the Kafka topic information to the default Source/Sink name, it 
would be very helpful to catch the consuming/publishing topic quickly, like 
this:

{code:java}
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
{code}

*Suggesion* (forgive me if it makes too much changes)

1. Add a `name` method to interface `Function`

{code:java}
public interface Function extends java.io.Serializable {
default String name() { return ""; }
}
{code}

2. Source/Sink/Other functions override this method depending on their needs.

{code:java}
class FlinkKafkaConsumerBase {

String name() {
  return this.topicsDescriptor.toString();
}

}
{code}

3. Use Function#name if the returned value is not empty.

{code:java}
// StreamExecutionEnvironment
public  DataStreamSource addSource(SourceFunction 
function) {
String sourceName = function.name();
if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
sourceName = "Custom Source";
}
return addSource(function, sourceName);
}
{code}



> Add kafka topic information to Kafka source name on Flink UI
> 
>
> Key: FLINK-15450
> URL: https://issues.apache.org/jira/browse/FLINK-15450
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Victor Wong
>Priority: Major
>
> If the user did not specify a custom name to the source, e.g. Kafka source, 
> Flink would use the default name "Custom Source", which was not intuitive 
> (Sink was the same).
> {code:java}
> Source: Custom Source -> Filter -> Map -> Sink: Unnamed
> {code}
> If we could add the Kafka topic information to the default Source/Sink name, 
> it would be very helpful to catch the consuming/publishing topic quickly, 
> like this:
> {code:java}
> Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
> {code}
> *Suggestion* (forgive me if it makes too many changes)
> 1. Add a `name` method to interface `Function`
> {code:java}
> public interface Function extends java.io.Serializable {
>   default String name() { return ""; }
> }
> {code}
> 2. Source/Sink/Other functions override this method depending on their needs.
> {code:java}
> class FlinkKafkaConsumerBase {
> String name() {
>   return this.topicsDescriptor.toString();
> }
> }
> {code}
> 3. Use Function#name if the returned value is not empty.
> {code:java}
> // StreamExecutionEnvironment
>   public  DataStreamSource addSource(SourceFunction 
> function) {
>   String sourceName = function.name();
>   if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
>   sourceName = "Custom Source";
>   }
>   return addSource(function, sourceName);
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15450) Add kafka topic information to Kafka source name on Flink UI

2019-12-31 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15450:

Summary: Add kafka topic information to Kafka source name on Flink UI  
(was: Add kafka topic information to Kafka source)

> Add kafka topic information to Kafka source name on Flink UI
> 
>
> Key: FLINK-15450
> URL: https://issues.apache.org/jira/browse/FLINK-15450
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Victor Wong
>Priority: Major
>
> If the user did not specify a custom name to the source, e.g. Kafka source, 
> Flink would use the default name "Custom Source", which was not intuitive 
> (Sink was the same).
> {code:java}
> Source: Custom Source -> Filter -> Map -> Sink: Unnamed
> {code}
> If we could add the Kafka topic information to the default Source/Sink name, 
> it would be very helpful to catch the consuming/publishing topic quickly, 
> like this:
> {code:java}
> Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
> {code}
> *Suggestion* (forgive me if it makes too many changes)
> 1. Add a `name` method to interface `Function`
> {code:java}
> public interface Function extends java.io.Serializable {
>   default String name() { return ""; }
> }
> {code}
> 2. Source/Sink/Other functions override this method depending on their needs.
> {code:java}
> class FlinkKafkaConsumerBase {
> String name() {
>   return this.topicsDescriptor.toString();
> }
> }
> {code}
> 3. Use Function#name if the returned value is not empty.
> {code:java}
> // StreamExecutionEnvironment
>   public  DataStreamSource addSource(SourceFunction 
> function) {
>   String sourceName = function.name();
>   if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
>   sourceName = "Custom Source";
>   }
>   return addSource(function, sourceName);
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15450) Add kafka topic information to Kafka source

2019-12-31 Thread Victor Wong (Jira)
Victor Wong created FLINK-15450:
---

 Summary: Add kafka topic information to Kafka source
 Key: FLINK-15450
 URL: https://issues.apache.org/jira/browse/FLINK-15450
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Victor Wong


If the user did not specify a custom name to the source, e.g. Kafka source, 
Flink would use the default name "Custom Source", which was not intuitive (Sink 
was the same).


{code:java}
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
{code}

If we could add the Kafka topic information to the default Source/Sink name, it 
would be very helpful to catch the consuming/publishing topic quickly, like 
this:

{code:java}
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
{code}

*Suggestion* (forgive me if it makes too many changes)

1. Add a `name` method to interface `Function`

{code:java}
public interface Function extends java.io.Serializable {
default String name() { return ""; }
}
{code}

2. Source/Sink/Other functions override this method depending on their needs.

{code:java}
class FlinkKafkaConsumerBase {

String name() {
  return this.topicsDescriptor.toString();
}

}
{code}

3. Use Function#name if the returned value is not empty.

{code:java}
// StreamExecutionEnvironment
public  DataStreamSource addSource(SourceFunction 
function) {
String sourceName = function.name();
if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
sourceName = "Custom Source";
}
return addSource(function, sourceName);
}
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15449) Retain lost task managers on Flink UI

2019-12-31 Thread Victor Wong (Jira)
Victor Wong created FLINK-15449:
---

 Summary: Retain lost task managers on Flink UI
 Key: FLINK-15449
 URL: https://issues.apache.org/jira/browse/FLINK-15449
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.9.1
Reporter: Victor Wong


With Flink on Yarn, sometimes our TaskManager is killed because of OOM, a 
heartbeat timeout, or other reasons, and it's not convenient to check the logs 
of the lost TaskManager.

Can we retain the lost task managers on the Flink UI, and provide the log 
service through Yarn (we can redirect the URL of log/stdout to the Yarn 
container log/stdout)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15448) Make "ResourceID#toString" more descriptive

2019-12-31 Thread Victor Wong (Jira)
Victor Wong created FLINK-15448:
---

 Summary: Make "ResourceID#toString" more descriptive
 Key: FLINK-15448
 URL: https://issues.apache.org/jira/browse/FLINK-15448
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.9.1
Reporter: Victor Wong


With Flink on Yarn, sometimes we ran into an exception like this:

{code:java}
java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 
container_  timed out.
{code}

We'd like to find out the host of the lost TaskManager so we can log into it for 
more details, but we have to check the previous logs for the host information, 
which is a little time-consuming.

Maybe we can add more descriptive information to ResourceID of Yarn containers, 
e.g. "container_xxx@host_name:port_number".

Here's the demo:


{code:java}
class ResourceID {
  // stable identifier, e.g. the Yarn container id
  final String resourceId;
  // human-readable details used for logging, e.g. "container_xxx@host:port"
  final String details;

  public ResourceID(String resourceId) {
    this(resourceId, resourceId);
  }

  public ResourceID(String resourceId, String details) {
    this.resourceId = resourceId;
    this.details = details;
  }

  public String toString() {
    return details;
  }
}

// in flink-yarn
private void startTaskExecutorInContainer(Container container) {
  final String containerIdStr = container.getId().toString();
  final String containerDetail = container.getId() + "@" + container.getNodeId();
  final ResourceID resourceId = new ResourceID(containerIdStr, containerDetail);
  ...
}
{code}







--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2019-12-31 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-15447:

Description: 
Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to 
the default value, which is "/tmp". 

 

Sometimes we ran into exceptions caused by a full "/tmp" directory, which would 
not be cleaned automatically after applications finished.

I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
something similar. "PWD" will be replaced with the true working 
directory of JM/TM by Yarn, which will be cleaned automatically.

 

  was:
Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to 
the default value, which is "/tmp". 

 

Sometimes we ran into exceptions caused by a full "/tmp" directory, which would 
not be cleaned automatically after applications finished.

I think we can set "java.io.tmpdir" to "\{{PWD}}/tmp" directory, or something 
similar. "\{{PWD}}" will be replaced with the true working directory of JM/TM 
by Yarn, which will be cleaned automatically.

 


> Change "java.io.tmpdir"  of JM/TM on Yarn to "{{PWD}}/tmp" 
> ---
>
> Key: FLINK-15447
> URL: https://issues.apache.org/jira/browse/FLINK-15447
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set 
> to the default value, which is "/tmp". 
>  
> Sometimes we ran into exceptions caused by a full "/tmp" directory, which 
> would not be cleaned automatically after applications finished.
> I think we can set "java.io.tmpdir" to "PWD/tmp" directory, or 
> something similar. "PWD" will be replaced with the true working 
> directory of JM/TM by Yarn, which will be cleaned automatically.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15447) Change "java.io.tmpdir" of JM/TM on Yarn to "{{PWD}}/tmp"

2019-12-31 Thread Victor Wong (Jira)
Victor Wong created FLINK-15447:
---

 Summary: Change "java.io.tmpdir"  of JM/TM on Yarn to 
"{{PWD}}/tmp" 
 Key: FLINK-15447
 URL: https://issues.apache.org/jira/browse/FLINK-15447
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.9.1
Reporter: Victor Wong


Currently, when running Flink on Yarn, the "java.io.tmpdir" property is set to 
the default value, which is "/tmp". 

 

Sometimes we ran into exceptions caused by a full "/tmp" directory, which would 
not be cleaned automatically after applications finished.

I think we can set "java.io.tmpdir" to "\{{PWD}}/tmp" directory, or something 
similar. "\{{PWD}}" will be replaced with the true working directory of JM/TM 
by Yarn, which will be cleaned automatically.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining

2019-11-29 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong closed FLINK-14835.
---
Resolution: Abandoned

> Make `org.apache.flink.configuration.Configuration` support method chaining
> ---
>
> Key: FLINK-14835
> URL: https://issues.apache.org/jira/browse/FLINK-14835
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> *Goal:*
> To make the following code examples work in production, which is very handy 
> for users to set a couple of configurations: 
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()        // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
> {code}
>  
> *Suggestion:*
> Currently, the return type of `setXXX` method is "void", we can make it 
> return `Configuration` itself to support method chaining.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining

2019-11-29 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16984971#comment-16984971
 ] 

Victor Wong commented on FLINK-14835:
-

That's great:)

> Make `org.apache.flink.configuration.Configuration` support method chaining
> ---
>
> Key: FLINK-14835
> URL: https://issues.apache.org/jira/browse/FLINK-14835
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> *Goal:*
> To make the following code examples work in production, which is very handy 
> for users to set a couple of configurations: 
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()        // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
> {code}
>  
> *Suggestion:*
> Currently, the return type of `setXXX` method is "void", we can make it 
> return `Configuration` itself to support method chaining.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure

2019-11-26 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983089#comment-16983089
 ] 

Victor Wong commented on FLINK-14653:
-

_"introduce a new exception which users can throw to indicate that the error is 
application related"_

Surely this can solve the problem, but I think it's a little error-prone for 
users, since they need to remember to catch certain exceptions, e.g. exceptions 
thrown when flushing events to an external store, and rethrow them as the new 
exception.

> Job-related errors in snapshotState do not result in job failure
> 
>
> Key: FLINK-14653
> URL: https://issues.apache.org/jira/browse/FLINK-14653
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Maximilian Michels
>Priority: Minor
>
> When users override {{snapshoteState}}, they might include logic there which 
> is crucial for the correctness of their application, e.g. finalizing a 
> transaction and buffering the results of that transaction, or flushing events 
> to an external store. Exceptions occurring should lead to failing the job.
> Currently, users must make sure to throw a {{Throwable}} because any 
> {{Exception}} will be caught by the task and reported as checkpointing error, 
> when it could be an application error.
> It would be helpful to update the documentation and introduce a special 
> exception that can be thrown for job-related failures, e.g. 
> {{ApplicationError}} or similar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure

2019-11-26 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982434#comment-16982434
 ] 

Victor Wong commented on FLINK-14653:
-

[~mxm], any progress on this?

I have some solutions; do you mind taking a look?

*Solution 1:*

Catch the exception from `CheckpointedFunction#snapshotState` and rethrow it as 
an *Error*, like the Beam patch did.

*Solution 2:*

Catch the exception from `CheckpointedFunction#snapshotState` and rethrow it as 
a new exception type, e.g. *SnapshotStateException*, then catch that exception 
later and do not mark the CheckpointFailureReason as CHECKPOINT_DECLINED, so the 
failure is not ignored even if the user has configured the job to tolerate 
checkpoint failures.
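For concreteness, a minimal sketch of Solution 1, assuming a wrapper around the 
user function at the point where the task invokes the snapshot (the helper name 
is illustrative, not actual Flink internals):

{code:java}
// Hypothetical wrapper: rethrow snapshotState failures as an Error so
// they bypass the checkpoint-failure handling and fail the job.
static void snapshotOrFail(CheckpointedFunction fn, FunctionSnapshotContext ctx) {
    try {
        fn.snapshotState(ctx);
    } catch (Exception e) {
        throw new Error("Application error in snapshotState", e);
    }
}
{code}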

> Job-related errors in snapshotState do not result in job failure
> 
>
> Key: FLINK-14653
> URL: https://issues.apache.org/jira/browse/FLINK-14653
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Maximilian Michels
>Priority: Minor
>
> When users override {{snapshoteState}}, they might include logic there which 
> is crucial for the correctness of their application, e.g. finalizing a 
> transaction and buffering the results of that transaction, or flushing events 
> to an external store. Exceptions occurring should lead to failing the job.
> Currently, users must make sure to throw a {{Throwable}} because any 
> {{Exception}} will be caught by the task and reported as checkpointing error, 
> when it could be an application error.
> It would be helpful to update the documentation and introduce a special 
> exception that can be thrown for job-related failures, e.g. 
> {{ApplicationError}} or similar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining

2019-11-26 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982317#comment-16982317
 ] 

Victor Wong commented on FLINK-14835:
-

[~jark], after changing the return types of the `Configuration` setters, `mvn 
clean verify` failed because of a japicmp error:

 
{code:java}
[ERROR] Failed to execute goal 
com.github.siom79.japicmp:japicmp-maven-plugin:0.11.0:cmp (default) on project 
flink-core: Breaking the build because there is at least one incompatibility: 
org.apache.flink.configuration.Configuration.setBoolean(java.lang.String,boolean):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setBytes(java.lang.String,byte[]):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setClass(java.lang.String,java.lang.Class):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setDouble(java.lang.String,double):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setFloat(java.lang.String,float):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setInteger(java.lang.String,int):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setLong(java.lang.String,long):METHOD_RETURN_TYPE_CHANGED,org.apache.flink.configuration.Configuration.setString(java.lang.String,java.lang.String):METHOD_RETURN_TYPE_CHANGED
 -> [Help 1]
{code}
It seems we have to give up this feature, right?
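
For context, the incompatibility comes from the return type being part of the JVM 
method descriptor; a minimal sketch of the kind of change japicmp rejects (the 
internal helper name is an assumption, not the actual patch):
{code:java}
// Before: callers are compiled against a descriptor that returns void.
public void setString(String key, String value) {
    setValueInternal(key, value); // hypothetical internal helper
}

// After: chaining works for newly compiled code, but anything compiled
// against the old descriptor fails at runtime with NoSuchMethodError,
// which is what japicmp's METHOD_RETURN_TYPE_CHANGED guards against.
public Configuration setString(String key, String value) {
    setValueInternal(key, value); // hypothetical internal helper
    return this;
}
{code}
If chaining is off the table, the documentation could instead bind the configuration 
to a local variable and invoke the setters as separate statements.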

> Make `org.apache.flink.configuration.Configuration` support method chaining
> ---
>
> Key: FLINK-14835
> URL: https://issues.apache.org/jira/browse/FLINK-14835
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> *Goal:*
> To make the following code example work in production, which would be very handy 
> for users who need to set several configuration options: 
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()   // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
> optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds 
> to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
> records can be buffered by each aggregate operator task
> {code}
>  
> *Suggestion:*
> Currently, the return type of each `setXXX` method is `void`; we could make these 
> methods return the `Configuration` instance itself to support method chaining.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples

2019-11-26 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982285#comment-16982285
 ] 

Victor Wong commented on FLINK-14817:
-

[~jark], the PR is ready; could you take a look when convenient?

> "Streaming Aggregation" document contains misleading code examples
> --
>
> Key: FLINK-14817
> URL: https://issues.apache.org/jira/browse/FLINK-14817
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the document of [Streaming Aggregation 
> |https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]
>  , there are some misleading code examples, e.g.
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()   // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
> optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds 
> to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
> records can be buffered by each aggregate operator task
> {code}
> It seems as if `Configuration` supported method chaining, but it does not, since 
> the return type of `Configuration#setString` is void.
>  
> So what about making `Configuration` support method chaining, or updating the 
> documentation?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples

2019-11-17 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16976257#comment-16976257
 ] 

Victor Wong commented on FLINK-14817:
-

[~jark], thanks for your quick reply. Please assign this issue to me; this is the 
issue discussing method chaining for Configuration: 
[FLINK-14835|https://issues.apache.org/jira/browse/FLINK-14835]

> "Streaming Aggregation" document contains misleading code examples
> --
>
> Key: FLINK-14817
> URL: https://issues.apache.org/jira/browse/FLINK-14817
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> In the document of [Streaming Aggregation 
> |https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]
>  , there are some misleading code examples, e.g.
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()   // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
> optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds 
> to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
> records can be buffered by each aggregate operator task
> {code}
> It seems as if `Configuration` supported method chaining, but it does not, since 
> the return type of `Configuration#setString` is void.
>  
> So what about making `Configuration` support method chaining, or updating the 
> documentation?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining

2019-11-17 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-14835:

Description: 
*Goal:*

To make the following code example work in production, which would be very handy 
for users who need to set several configuration options: 
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}
 

*Suggestion:*

Currently, the return type of each `setXXX` method is `void`; we could make these 
methods return the `Configuration` instance itself to support method chaining.

  was:
To make the following code example work in production, which would be very handy 
for users who need to set several configuration options.

 
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}


> Make `org.apache.flink.configuration.Configuration` support method chaining
> ---
>
> Key: FLINK-14835
> URL: https://issues.apache.org/jira/browse/FLINK-14835
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> *Goal:*
> To make the following code example work in production, which would be very handy 
> for users who need to set several configuration options: 
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()   // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
> optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds 
> to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
> records can be buffered by each aggregate operator task
> {code}
>  
> *Suggestion:*
> Currently, the return type of each `setXXX` method is `void`; we could make these 
> methods return the `Configuration` instance itself to support method chaining.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining

2019-11-17 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-14835:

Description: 
To make the following code example work in production, which would be very handy 
for users who need to set several configuration options.

 
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}

  was:
To make the following code example work in production, which would be very handy 
for users who need to set several configuration options.

 
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()        // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task


> Make `org.apache.flink.configuration.Configuration` support method chaining
> ---
>
> Key: FLINK-14835
> URL: https://issues.apache.org/jira/browse/FLINK-14835
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> To make the following code example work in production, which would be very handy 
> for users who need to set several configuration options.
>  
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()   // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
> optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds 
> to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
> records can be buffered by each aggregate operator task
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining

2019-11-17 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-14835:

External issue URL:   (was: 
https://issues.apache.org/jira/browse/FLINK-14817)

> Make `org.apache.flink.configuration.Configuration` support method chaining
> ---
>
> Key: FLINK-14835
> URL: https://issues.apache.org/jira/browse/FLINK-14835
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> To make the following code example work in production, which would be very handy 
> for users who need to set several configuration options.
>  
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()        // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14835) Make `org.apache.flink.configuration.Configuration` support method chaining

2019-11-17 Thread Victor Wong (Jira)
Victor Wong created FLINK-14835:
---

 Summary: Make `org.apache.flink.configuration.Configuration` 
support method chaining
 Key: FLINK-14835
 URL: https://issues.apache.org/jira/browse/FLINK-14835
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Affects Versions: 1.9.1
Reporter: Victor Wong


To make the following code example work in production, which would be very handy 
for users who need to set several configuration options.

 
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()        // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples

2019-11-15 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-14817:

Description: 
In the document of [Streaming Aggregation 
|https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]
 , there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}
It seems as if `Configuration` supported method chaining, but it does not, since 
the return type of `Configuration#setString` is void.

 

So what about making `Configuration` support method chaining, or updating the 
documentation?

 

  was:
In the document of [Streaming Aggregation | 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]
 there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}
It seems as if `Configuration` supported method chaining, but it does not, since 
the return type of `Configuration#setString` is void.

 

So what about making `Configuration` support method chaining, or updating the 
documentation?

 


> "Streaming Aggregation" document contains misleading code examples
> --
>
> Key: FLINK-14817
> URL: https://issues.apache.org/jira/browse/FLINK-14817
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> In the document of [Streaming Aggregation 
> |https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]
>  , there are some misleading code examples, e.g.
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()   // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
> optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds 
> to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
> records can be buffered by each aggregate operator task
> {code}
> It seems as if `Configuration` supported method chaining, but it does not, since 
> the return type of `Configuration#setString` is void.
>  
> So what about making `Configuration` support method chaining, or updating the 
> documentation?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples

2019-11-15 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-14817:

Description: 
In the document of [Streaming 
Aggregation|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]]
 there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}
It seems as if `Configuration` supported method chaining, but it does not, since 
the return type of `Configuration#setString` is void.

 

So what about making `Configuration` support method chaining, or updating the 
documentation?

 

  was:
In the document of [Streaming 
Aggregation|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html],]
 there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}
It seems as if `Configuration` supported method chaining, but it does not, since 
the return type of `Configuration#setString` is void.

 

So what about making `Configuration` support method chaining, or updating the 
documentation?

 


> "Streaming Aggregation" document contains misleading code examples
> --
>
> Key: FLINK-14817
> URL: https://issues.apache.org/jira/browse/FLINK-14817
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> In the document of [Streaming 
> Aggregation|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]]
>  there are some misleading code examples, e.g.
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()   // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
> optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds 
> to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
> records can be buffered by each aggregate operator task
> {code}
> It seems as if `Configuration` supported method chaining, but it does not, since 
> the return type of `Configuration#setString` is void.
>  
> So what about making `Configuration` support method chaining, or updating the 
> documentation?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples

2019-11-15 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-14817:

Description: 
In the document of [Streaming Aggregation | 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]
 there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}
It seems as if `Configuration` supported method chaining, but it does not, since 
the return type of `Configuration#setString` is void.

 

So what about making `Configuration` support method chaining, or updating the 
documentation?

 

  was:
In the document of [Streaming 
Aggregation|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]]
 there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}
It seems as if `Configuration` supported method chaining, but it does not, since 
the return type of `Configuration#setString` is void.

 

So what about making `Configuration` support method chaining, or updating the 
documentation?

 


> "Streaming Aggregation" document contains misleading code examples
> --
>
> Key: FLINK-14817
> URL: https://issues.apache.org/jira/browse/FLINK-14817
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> In the document of [Streaming Aggregation | 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html]
>  there are some misleading code examples, e.g.
> {code:java}
> // instantiate table environment
> TableEnvironment tEnv = ...
> tEnv.getConfig()   // access high-level configuration
>   .getConfiguration()   // set low-level key-value options
>   .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
> optimization
>   .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds 
> to buffer input records
>   .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
> records can be buffered by each aggregate operator task
> {code}
> It seems as if `Configuration` supported method chaining, but it does not, since 
> the return type of `Configuration#setString` is void.
>  
> So what about making `Configuration` support method chaining, or updating the 
> documentation?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14817) "Streaming Aggregation" document contains misleading code examples

2019-11-15 Thread Victor Wong (Jira)
Victor Wong created FLINK-14817:
---

 Summary: "Streaming Aggregation" document contains misleading code 
examples
 Key: FLINK-14817
 URL: https://issues.apache.org/jira/browse/FLINK-14817
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.9.1
Reporter: Victor Wong


In the document of [Streaming 
Aggregation|[https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html],]
 there are some misleading code examples, e.g.
{code:java}
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig()   // access high-level configuration
  .getConfiguration()   // set low-level key-value options
  .setString("table.exec.mini-batch.enabled", "true")  // enable mini-batch 
optimization
  .setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to 
buffer input records
  .setString("table.exec.mini-batch.size", "5000"); // the maximum number of 
records can be buffered by each aggregate operator task
{code}
It seems as if `Configuration` supported method chaining, but it does not, since 
the return type of `Configuration#setString` is void.

 

So what about making `Configuration` support method chaining, or updating the 
documentation?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure

2019-11-08 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16970104#comment-16970104
 ] 

Victor Wong commented on FLINK-14653:
-

Thanks for your explanation; now I think it makes sense.

> Job-related errors in snapshotState do not result in job failure
> 
>
> Key: FLINK-14653
> URL: https://issues.apache.org/jira/browse/FLINK-14653
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Maximilian Michels
>Priority: Minor
>
> When users override {{snapshotState}}, they might include logic there which 
> is crucial for the correctness of their application, e.g. finalizing a 
> transaction and buffering the results of that transaction, or flushing events 
> to an external store. Exceptions occurring should lead to failing the job.
> Currently, users must make sure to throw a {{Throwable}} because any 
> {{Exception}} will be caught by the task and reported as checkpointing error, 
> when it could be an application error.
> It would be helpful to update the documentation and introduce a special 
> exception that can be thrown for job-related failures, e.g. 
> {{ApplicationError}} or similar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure

2019-11-08 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969976#comment-16969976
 ] 

Victor Wong commented on FLINK-14653:
-

_"I'm ok with tolerating checkpointing failures, but not ok with sacrificing 
the correctness of my Flink job."_

Sorry, I don't get that. AFAIK, in case of a checkpointing failure the whole job 
will restart and restore from the latest checkpointed state, so what do you mean by 
"sacrificing the correctness": data loss or data duplication?

 

> Job-related errors in snapshotState do not result in job failure
> 
>
> Key: FLINK-14653
> URL: https://issues.apache.org/jira/browse/FLINK-14653
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Maximilian Michels
>Priority: Minor
>
> When users override {{snapshotState}}, they might include logic there which 
> is crucial for the correctness of their application, e.g. finalizing a 
> transaction and buffering the results of that transaction, or flushing events 
> to an external store. Exceptions occurring should lead to failing the job.
> Currently, users must make sure to throw a {{Throwable}} because any 
> {{Exception}} will be caught by the task and reported as checkpointing error, 
> when it could be an application error.
> It would be helpful to update the documentation and introduce a special 
> exception that can be thrown for job-related failures, e.g. 
> {{ApplicationError}} or similar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values

2019-11-08 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969921#comment-16969921
 ] 

Victor Wong commented on FLINK-14642:
-

[~sewen], here is the PR, [https://github.com/apache/flink/pull/10130]

I updated the TupleSerializer and CaseClassSerializer, but I'm not sure if we 
should make the same change to *OptionSerializer* and *RowSerializer*. Any 
suggestions?
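
For illustration, a hypothetical Java-style copy() with the same null guard (the 
concrete Tuple2 element types here are assumptions, not code from the PR):
{code:java}
// Illustrative only: return null records unchanged instead of
// dereferencing them, mirroring the proposed CaseClassSerializer guard.
@Override
public Tuple2<String, Integer> copy(Tuple2<String, Integer> from) {
    if (from == null) {
        return null; // tolerate nulls produced by user functions
    }
    // String and Integer are immutable, so a shallow copy is safe here.
    return new Tuple2<>(from.f0, from.f1);
}
{code}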

> Flink TupleSerializer and CaseClassSerializer should support copying NULL values
> 
>
> Key: FLINK-14642
> URL: https://issues.apache.org/jira/browse/FLINK-14642
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, TupleSerializer and CaseClassSerializer do not support serializing 
> NULL values, which I think is acceptable. But not supporting copying NULL values 
> will cause the following code to throw an exception, which I think does not 
> match users' expectations and is error-prone.
> *code:*
> {code:java}
> stream.map(xxx).filter(_ != null).xxx //the return type of the map function 
> is Tuple and it may return null{code}
>  
> *exception info:*
>  
> {code:java}
> Caused by: java.lang.NullPointerException 
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
>  
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> {code}
>  
> *suggestion:*
> Can we make the `copy` method of TupleSerializer/CaseClassSerializer 
> handle NULL values? e.g.
> {code:java}
> // org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
> def copy(from: T): T = {
>   // handle NULL values.
>   if(from == null) {
> return from
>   }
>   initArray()
>   var i = 0
>   while (i < arity) {
> fields(i) = 
> fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
> i += 1
>   }
>   createInstance(fields)
> }
> {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14653) Job-related errors in snapshotState do not result in job failure

2019-11-07 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969910#comment-16969910
 ] 

Victor Wong commented on FLINK-14653:
-

[~mxm], if `CheckpointConfig#setTolerableCheckpointFailureNumber` is set to 0 
(which is the default value), a checkpoint failure will result in job failure.
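
A minimal sketch of that default (assuming the Flink 1.9 CheckpointConfig API):
{code:java}
// Assumes org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds
// 0 is the default: the first checkpoint failure already fails the job.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
{code}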

 

> Job-related errors in snapshotState do not result in job failure
> 
>
> Key: FLINK-14653
> URL: https://issues.apache.org/jira/browse/FLINK-14653
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Maximilian Michels
>Priority: Minor
>
> When users override {{snapshotState}}, they might include logic there which 
> is crucial for the correctness of their application, e.g. finalizing a 
> transaction and buffering the results of that transaction, or flushing events 
> to an external store. Exceptions occurring should lead to failing the job.
> Currently, users must make sure to throw a {{Throwable}} because any 
> {{Exception}} will be caught by the task and reported as checkpointing error, 
> when it could be an application error.
> It would be helpful to update the documentation and introduce a special 
> exception that can be thrown for job-related failures, e.g. 
> {{ApplicationError}} or similar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn

2019-11-07 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong closed FLINK-14626.
---
Resolution: Invalid

> User jar packaged with hadoop dependencies may cause class conflict with 
> hadoop jars on yarn
> ---
>
> Key: FLINK-14626
> URL: https://issues.apache.org/jira/browse/FLINK-14626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, the yarn application classpath is placed after the Flink classpath 
> (which includes user jars), and this will cause conflicts if the user jar 
> accidentally includes hadoop dependencies.
> {code:java}
> // org.apache.flink.yarn.Utils#setupYarnClassPath
> public static void setupYarnClassPath(Configuration conf, Map<String, String> 
> appMasterEnv) {
>addToEnvironment(
>   appMasterEnv,
>   Environment.CLASSPATH.name(),
>   appMasterEnv.get(ENV_FLINK_CLASSPATH));
>String[] applicationClassPathEntries = conf.getStrings(
>   YarnConfiguration.YARN_APPLICATION_CLASSPATH,
>   YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
>for (String c : applicationClassPathEntries) {
>   addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
>}
> }
> {code}
> Maybe we should place the user jars after the yarn application classpath when 
> `org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is 
> set to LAST, like this: "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar".
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn

2019-11-07 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969388#comment-16969388
 ] 

Victor Wong edited comment on FLINK-14626 at 11/7/19 4:20 PM:
--

[~chesnay], thanks for your quick response. As an engineer providing 
company-wide stream processing capabilities for other teams, I have seen quite 
a few errors related to this issue. It's really easy to accidentally include 
Hadoop dependencies in the uber jar, especially when your job deals with Hadoop 
systems, e.g. HBase.

But I agree that it _would change existing behavior and could result in subtle 
bugs,_ so I will close this issue and find another way to solve the related 
problems.


was (Author: victor-wong):
[~chesnay], thanks for your quick response. As an engineer providing 
company-wide stream processing capabilities for other teams, I have seen quite 
a few errors related to this issue. It's really easy to accidentally include 
Hadoop dependencies in the uber jar, especially when your job deals with Hadoop 
systems, e.g. HBase.

But I agree that it _would change existing behavior and could result in subtle 
bugs,_ so I will close this PR and find another way to solve the related 
problems.

> User jar packaged with hadoop dependencies may cause class conflict with 
> hadoop jars on yarn
> ---
>
> Key: FLINK-14626
> URL: https://issues.apache.org/jira/browse/FLINK-14626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, the yarn application classpath is placed after the Flink classpath 
> (which includes user jars), and this will cause conflicts if the user jar 
> accidentally includes hadoop dependencies.
> {code:java}
> // org.apache.flink.yarn.Utils#setupYarnClassPath
> public static void setupYarnClassPath(Configuration conf, Map<String, String> 
> appMasterEnv) {
>addToEnvironment(
>   appMasterEnv,
>   Environment.CLASSPATH.name(),
>   appMasterEnv.get(ENV_FLINK_CLASSPATH));
>String[] applicationClassPathEntries = conf.getStrings(
>   YarnConfiguration.YARN_APPLICATION_CLASSPATH,
>   YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
>for (String c : applicationClassPathEntries) {
>   addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
>}
> }
> {code}
> Maybe we should place the user jars after the yarn application classpath when 
> `org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is 
> set to LAST, like this: "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar".
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn

2019-11-07 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969388#comment-16969388
 ] 

Victor Wong commented on FLINK-14626:
-

[~chesnay], thanks for your quick response. As an engineer providing 
company-wide stream processing capabilities for other teams, I have seen quite 
a few errors related to this issue. It's really easy to accidentally include 
Hadoop dependencies in the uber jar, especially when your job deals with Hadoop 
systems, e.g. HBase.

But I agree that it _would change existing behavior and could result in subtle 
bugs,_ so I will close this PR and find another way to solve the related 
problems.

> User jar packaged with hadoop dependencies may cause class conflict with 
> hadoop jars on yarn
> ---
>
> Key: FLINK-14626
> URL: https://issues.apache.org/jira/browse/FLINK-14626
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, the yarn application classpath is placed after the Flink classpath 
> (which includes user jars), and this will cause conflicts if the user jar 
> accidentally includes hadoop dependencies.
> {code:java}
> // org.apache.flink.yarn.Utils#setupYarnClassPath
> public static void setupYarnClassPath(Configuration conf, Map<String, String> 
> appMasterEnv) {
>addToEnvironment(
>   appMasterEnv,
>   Environment.CLASSPATH.name(),
>   appMasterEnv.get(ENV_FLINK_CLASSPATH));
>String[] applicationClassPathEntries = conf.getStrings(
>   YarnConfiguration.YARN_APPLICATION_CLASSPATH,
>   YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
>for (String c : applicationClassPathEntries) {
>   addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
>}
> }
> {code}
> Maybe we should place the user jars after the yarn application classpath when 
> `org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is 
> set to LAST, like this: "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar".
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values

2019-11-07 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969369#comment-16969369
 ] 

Victor Wong commented on FLINK-14642:
-

[~sewen], sure, I will ping you once the PR is ready :)

> Flink TupleSerializer and CaseClassSerializer should support copying NULL values
> 
>
> Key: FLINK-14642
> URL: https://issues.apache.org/jira/browse/FLINK-14642
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, TupleSerializer and CaseClassSerializer do not support serializing 
> NULL values, which I think is acceptable. But not supporting copying NULL values 
> will cause the following code to throw an exception, which I think does not 
> match users' expectations and is error-prone.
> *code:*
> {code:java}
> stream.map(xxx).filter(_ != null).xxx //the return type of the map function 
> is Tuple and it may return null{code}
>  
> *exception info:*
>  
> {code:java}
> Caused by: java.lang.NullPointerException 
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
>  
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> {code}
>  
> *suggestion:*
> Can we make the `copy` method of TupleSerializer/CaseClassSerializer 
> handle NULL values? e.g.
> {code:java}
> // org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
> def copy(from: T): T = {
>   // handle NULL values.
>   if(from == null) {
> return from
>   }
>   initArray()
>   var i = 0
>   while (i < arity) {
> fields(i) = 
> fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
> i += 1
>   }
>   createInstance(fields)
> }
> {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values

2019-11-06 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-14642:

Description: 
Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL 
values, which I think is acceptable. But not supporting copying NULL values will 
cause the following code to throw an exception, which I think does not match 
users' expectations and is error-prone.

*code:*
{code:java}
stream.map(xxx).filter(_ != null).xxx //the return type of the map function is 
Tuple and it may return null{code}
 

*exception info:*

 
{code:java}
Caused by: java.lang.NullPointerException 
  at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
 
  at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 
  at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}
 

*suggestion:*

Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle 
NULL values? e.g.
{code:java}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values.
  if(from == null) {
return from
  }
  initArray()
  var i = 0
  while (i < arity) {
fields(i) = 
fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
i += 1
  }
  createInstance(fields)
}
{code}
 

 

 

 

 

  was:
Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL 
values, which I think is acceptable. But not supporting copying NULL values 
will cause the following code to throw an exception, which I think does not 
match users' expectations.

*code:*

 
{code:java}
stream.map(xxx).filter(_ != null).xxx //the return type of the map function is 
Tuple and it may return null{code}
 

*exception info:*

 
{code:java}
Caused by: java.lang.NullPointerException 
  at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}
 

*suggestion:*

Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle 
NULL values? e.g.
{code:java}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values.
  if(from == null) {
return from
  }
  initArray()
  var i = 0
  while (i < arity) {
fields(i) = 
fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
i += 1
  }
  createInstance(fields)
}
{code}
 

 

 

 

 


> Flink TupleSerializer and CaseClassSerializer should support copying NULL values
> 
>
> Key: FLINK-14642
> URL: https://issues.apache.org/jira/browse/FLINK-14642
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.9.1
>Reporter: Victor Wong
>Priority: Major
>
> Currently, TupleSerializer and CaseClassSerializer do not support serializing 
> NULL values, which I think is acceptable. But not supporting copying NULL values 
> will cause the following code to throw an exception, which I think does not 
> match users' expectations and is error-prone.
> *code:*
> {code:java}
> stream.map(xxx).filter(_ != null).xxx //the return type of the map function 
> is Tuple and it may return null{code}
>  
> *exception info:*
>  
> {code:java}
> Caused by: java.lang.NullPointerException 
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
>  
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> {code}
>  
> *suggestion:*
> Can we make the `copy` method of TupleSerializer/CaseClassSerializer 
> handle NULL values? e.g.
> {code:java}
> // org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
> def copy(from: T): T = {
>   // handle NULL values.
>   if(from == null) {
> return from
>   }
>   initArray()
>   var i = 0
>   while (i < arity) {
> fields(i) = 
> fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
> i += 1
>   }
>   createInstance(fields)
> }
> {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14642) Flink TupleSerializer and CaseClassSerializer should support copying NULL values

2019-11-06 Thread Victor Wong (Jira)
Victor Wong created FLINK-14642:
---

 Summary: Flink TupleSerializer and CaseClassSerializer should 
support copying NULL values
 Key: FLINK-14642
 URL: https://issues.apache.org/jira/browse/FLINK-14642
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.9.1
Reporter: Victor Wong


Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL 
values, which I think is acceptable. But not supporting copying NULL values 
will cause the following code to throw an exception, which I think does not 
match users' expectations.

*code:*

 
{code:java}
stream.map(xxx).filter(_ != null).xxx //the return type of the map function is 
Tuple and it may return null{code}
 

*exception info:*

 
{code:java}
Caused by: java.lang.NullPointerException 
  at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}
 

*suggestion:*

Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle 
NULL values? e.g.
{code:java}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values.
  if(from == null) {
return from
  }
  initArray()
  var i = 0
  while (i < arity) {
fields(i) = 
fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
i += 1
  }
  createInstance(fields)
}
{code}
 

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-12130) Apply command line options to configuration before installing security modules

2019-11-06 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong closed FLINK-12130.
---
Resolution: Won't Fix

> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>
> Currently if the user configures Kerberos credentials through command line, 
> it won't work.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> Above command would cause security failure if you do not have a ticket cache 
> w/ kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-12648) Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()

2019-11-06 Thread Victor Wong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong closed FLINK-12648.
---
Resolution: Won't Fix

> Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
> -
>
> Key: FLINK-12648
> URL: https://issues.apache.org/jira/browse/FLINK-12648
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> I think there is some duplicated code in 
> _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ that also exists in 
> the Apache hadoop-common dependency.
> We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, 
> org.apache.hadoop.conf.Configuration)_ to remove the duplicated code.
>  
> Replace
> {code:java}
> // -- (2) get the Hadoop file system class for that scheme
> final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
> try {
>fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, 
> hadoopConfig);
> }
> catch (IOException e) {
>throw new UnsupportedFileSystemSchemeException(
>  "Hadoop File System abstraction does not support scheme '" + scheme 
> + "'. " +
>"Either no file system implementation exists for that scheme, 
> " +
>"or the relevant classes are missing from the classpath.", e);
> }
> // -- (3) instantiate the Hadoop file system
> LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", 
> scheme, fsClass.getName());
> final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
> // -- (4) create the proper URI to initialize the file system
> final URI initUri;
> if (fsUri.getAuthority() != null) {
>initUri = fsUri;
> }
> else {
>LOG.debug("URI {} does not specify file system authority, trying to load 
> default authority (fs.defaultFS)");
>String configEntry = hadoopConfig.get("fs.defaultFS", null);
>if (configEntry == null) {
>   // fs.default.name deprecated as of hadoop 2.2.0 - see
>   // 
> http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
>   configEntry = hadoopConfig.get("fs.default.name", null);
>}
>if (LOG.isDebugEnabled()) {
>   LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
>}
>if (configEntry == null) {
>   throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
> "Hadoop configuration did not contain an entry for the default 
> file system ('fs.defaultFS').");
>}
>else {
>   try {
>  initUri = URI.create(configEntry);
>   }
>   catch (IllegalArgumentException e) {
>  throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>"The configuration contains an invalid file system default 
> name " +
>"('fs.default.name' or 'fs.defaultFS'): " + configEntry);
>   }
>   if (initUri.getAuthority() == null) {
>  throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>"Hadoop configuration for default file system 
> ('fs.default.name' or 'fs.defaultFS') " +
>"contains no valid authority component (like hdfs namenode, S3 
> host, etc)");
>   }
>}
> }
> // -- (5) configure the Hadoop file system
> try {
>hadoopFs.initialize(initUri, hadoopConfig);
> }
> catch (UnknownHostException e) {
>String message = "The Hadoop file system's authority (" + 
> initUri.getAuthority() +
>  "), specified by either the file URI or the configuration, cannot be 
> resolved.";
>throw new IOException(message, e);
> }
> {code}
> with
> {code:java}
> final org.apache.hadoop.fs.FileSystem hadoopFs = 
> org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14626) User jar packaged with hadoop dependencies may cause class conflict with hadoop jars on yarn

2019-11-05 Thread Victor Wong (Jira)
Victor Wong created FLINK-14626:
---

 Summary: User jar packaged with hadoop dependencies may cause 
class conflict with hadoop jars on yarn
 Key: FLINK-14626
 URL: https://issues.apache.org/jira/browse/FLINK-14626
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.9.1
Reporter: Victor Wong


Currently, the yarn application classpath is placed after the Flink classpath 
(which includes user jars), and this will cause conflicts if the user jar 
accidentally includes hadoop dependencies.
{code:java}
// org.apache.flink.yarn.Utils#setupYarnClassPath
public static void setupYarnClassPath(Configuration conf, Map<String, String> 
appMasterEnv) {
   addToEnvironment(
  appMasterEnv,
  Environment.CLASSPATH.name(),
  appMasterEnv.get(ENV_FLINK_CLASSPATH));
   String[] applicationClassPathEntries = conf.getStrings(
  YarnConfiguration.YARN_APPLICATION_CLASSPATH,
  YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
   for (String c : applicationClassPathEntries) {
  addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
   }
}
{code}
Maybe we should place the user jars after the yarn application classpath when 
`org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion` is set 
to LAST, like this: "flink-xxx.jar:hadoop-xxx.jar:user-xxx.jar".

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13749) Make Flink client respect classloading policy

2019-11-05 Thread Victor Wong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967429#comment-16967429
 ] 

Victor Wong commented on FLINK-13749:
-

Any progress on this? If the assignee is busy, I'd love to help :)

> Make Flink client respect classloading policy
> -
>
> Key: FLINK-13749
> URL: https://issues.apache.org/jira/browse/FLINK-13749
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client, Runtime / REST
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Minor
>
> Currently, Flink client does not respect the classloading policy and uses 
> hardcoded parent-first classloader, while the other components like 
> jobmanager and taskmanager use child-first classloader by default and respect 
> the classloading options. This makes the client more likely to have 
> dependency conflicts, especially after we removed the convenient hadoop 
> binaries (so users need to add hadoop classpath in the client classpath).
> So I propose to make Flink client's (including cli and rest handler) 
> classloading behavior aligned with the other components.
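
For readers unfamiliar with the distinction, a generic child-first classloader looks 
roughly like this sketch (illustrative only, not Flink's actual implementation, which 
also exempts `java.*` and other always-parent-first patterns):
{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Child-first delegation: try this loader's own URLs before falling back
// to the parent, reversing the JDK's default parent-first order.
class ChildFirstClassLoader extends URLClassLoader {
    ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // child first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // parent as fallback
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
{code}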



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13586) Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 1.8.1

2019-08-06 Thread Victor Wong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16901096#comment-16901096
 ] 

Victor Wong commented on FLINK-13586:
-

I think it would be helpful to attach the [mail 
discussion|https://lists.apache.org/thread.html/56f137a4c0b90eb94a73b7a2ab95453282293cf9c81107e9bedc658a@%3Cuser.flink.apache.org%3E]
 here.  

A user ran into `java.lang.NoSuchMethodError` when trying to submit an 
application packaged with Flink 1.8.1 to a Flink 1.8.0 cluster.  
{code:java}
...
Caused by: java.lang.NoSuchMethodError: 
org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Lorg/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel;Z)V
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:494)
...{code}
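
For reference, the fallback suggested in the ticket could look like this minimal 
sketch (a delegating two-argument overload; an assumption, not the merged patch):
{code:java}
// Restores the pre-1.8.1 signature by delegating to the new method with
// RECURSIVE as the default cleaning level, so code compiled against 1.8.0
// no longer hits NoSuchMethodError.
public static void clean(Object func, boolean checkSerializable) {
    clean(func, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, checkSerializable);
}
{code}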

> Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 
> 1.8.1
> 
>
> Key: FLINK-13586
> URL: https://issues.apache.org/jira/browse/FLINK-13586
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.1
>Reporter: Gaël Renoux
>Priority: Major
>
> Method clean in org.apache.flink.api.java.ClosureCleaner received a new 
> parameter in Flink 1.8.1. This class is noted as internal, but is used in the 
> Kafka connectors (in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase).
> The Kafka connectors library is not provided by the server, and must be set 
> up as a dependency with compile scope (see 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#usage,
>  or the Maven project template). Any project using those connectors and built 
> with 1.8.0 cannot be deployed on a 1.8.1 Flink server, because it would 
> target the old method.
> => This method needs a fallback with the original two arguments (setting a 
> default value of RECURSIVE for the level argument); see the sketch below.
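
A minimal sketch of such a fallback overload, assuming the three-argument 
signature visible in the stack trace above (hypothetical code, not the merged fix):
{code:java}
// Backward-compatible overload inside ClosureCleaner: restore the pre-1.8.1
// two-argument signature and delegate to the new method with a RECURSIVE
// default level.
public static void clean(Object func, boolean checkSerializable) {
   clean(func, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, checkSerializable);
}
{code}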



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-13586) Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 1.8.1

2019-08-05 Thread Victor Wong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16900142#comment-16900142
 ] 

Victor Wong commented on FLINK-13586:
-

Hi Gaël, could you assign this ticket to me, so I can come up with a PR 
following the suggestion in the ticket.

> Method ClosureCleaner.clean broke backward compatibility between 1.8.0 and 
> 1.8.1
> 
>
> Key: FLINK-13586
> URL: https://issues.apache.org/jira/browse/FLINK-13586
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.1
>Reporter: Gaël Renoux
>Priority: Major
>
> Method clean in org.apache.flink.api.java.ClosureCleaner received a new 
> parameter in Flink 1.8.1. This class is noted as internal, but is used in the 
> Kafka connectors (in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase).
> The Kafka connectors library is not provided by the server, and must be set 
> up as a dependency with compile scope (see 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#usage,
>  or the Maven project template). Any project using those connectors and built 
> with 1.8.0 cannot be deployed on a 1.8.1 Flink server, because it would 
> target the old method.
> => This method needs a fallback with the original two arguments (setting a 
> default value of RECURSIVE for the level argument).



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-12646) Fix broken tests of RestClientTest

2019-06-07 Thread Victor Wong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859022#comment-16859022
 ] 

Victor Wong commented on FLINK-12646:
-

I'd love to, [https://github.com/apache/flink/pull/8663]

> Fix broken tests of RestClientTest
> --
>
> Key: FLINK-12646
> URL: https://issues.apache.org/jira/browse/FLINK-12646
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In
> {code:java}
> org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
> {code}
> , we use an "unroutableIp" with a value of "10.255.255.1" for the test.
> But sometimes this IP is reachable in a company's private network, which 
> is the case for me. As a result, this test failed with the following exception: 
>  
> {code:java}
> java.lang.AssertionError: Expected: an instance of 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
> <Connection refused: /10.255.255.1:80> is a 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
>  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.junit.Assert.assertThat(Assert.java:956) at 
> org.junit.Assert.assertThat(Assert.java:923) at 
> org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
>  ...
> {code}
>  
>  
> Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
> which is described as  _Reserved for future use_ in 
> [wikipedia|https://en.wikipedia.org/wiki/Reserved_IP_addresses] 
> Or change the assertion? 
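
For illustration, a minimal sketch of the first option (the field name follows 
the test; the address choice is the proposal above):
{code:java}
// 240.0.0.0/4 (class E) is "reserved for future use" and should never be
// routable, so the connect attempt times out instead of being refused by a
// host that happens to exist in some private network.
private static final String unroutableIp = "240.0.0.0";
{code}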



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12648) Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()

2019-05-28 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12648:

Summary: Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()  
(was: Load Hadoop file system via FileSystem.get())

> Load Hadoop file system via org.apache.hadoop.fs.FileSystem.get()
> -
>
> Key: FLINK-12648
> URL: https://issues.apache.org/jira/browse/FLINK-12648
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>
> I think some code in 
> _org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ duplicates code in 
> the apache hadoop-common dependency.
> We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, 
> org.apache.hadoop.conf.Configuration)_ to remove the duplicated code.
>  
> Replace
> {code:java}
> // -- (2) get the Hadoop file system class for that scheme
> final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
> try {
>fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, 
> hadoopConfig);
> }
> catch (IOException e) {
>throw new UnsupportedFileSystemSchemeException(
>  "Hadoop File System abstraction does not support scheme '" + scheme 
> + "'. " +
>"Either no file system implementation exists for that scheme, 
> " +
>"or the relevant classes are missing from the classpath.", e);
> }
> // -- (3) instantiate the Hadoop file system
> LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", 
> scheme, fsClass.getName());
> final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();
> // -- (4) create the proper URI to initialize the file system
> final URI initUri;
> if (fsUri.getAuthority() != null) {
>initUri = fsUri;
> }
> else {
>LOG.debug("URI {} does not specify file system authority, trying to load 
> default authority (fs.defaultFS)");
>String configEntry = hadoopConfig.get("fs.defaultFS", null);
>if (configEntry == null) {
>   // fs.default.name deprecated as of hadoop 2.2.0 - see
>   // 
> http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
>   configEntry = hadoopConfig.get("fs.default.name", null);
>}
>if (LOG.isDebugEnabled()) {
>   LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
>}
>if (configEntry == null) {
>   throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
> "Hadoop configuration did not contain an entry for the default 
> file system ('fs.defaultFS').");
>}
>else {
>   try {
>  initUri = URI.create(configEntry);
>   }
>   catch (IllegalArgumentException e) {
>  throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>"The configuration contains an invalid file system default 
> name " +
>"('fs.default.name' or 'fs.defaultFS'): " + configEntry);
>   }
>   if (initUri.getAuthority() == null) {
>  throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
>"Hadoop configuration for default file system 
> ('fs.default.name' or 'fs.defaultFS') " +
>"contains no valid authority component (like hdfs namenode, S3 
> host, etc)");
>   }
>}
> }
> // -- (5) configure the Hadoop file system
> try {
>hadoopFs.initialize(initUri, hadoopConfig);
> }
> catch (UnknownHostException e) {
>String message = "The Hadoop file system's authority (" + 
> initUri.getAuthority() +
>  "), specified by either the file URI or the configuration, cannot be 
> resolved.";
>throw new IOException(message, e);
> }
> {code}
> with
> {code:java}
> final org.apache.hadoop.fs.FileSystem hadoopFs = 
> org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12648) Load Hadoop file system via FileSystem.get()

2019-05-28 Thread Victor Wong (JIRA)
Victor Wong created FLINK-12648:
---

 Summary: Load Hadoop file system via FileSystem.get()
 Key: FLINK-12648
 URL: https://issues.apache.org/jira/browse/FLINK-12648
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Victor Wong
Assignee: Victor Wong


I think some code in 
_org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create_ duplicates code in 
the apache hadoop-common dependency.

We can use _org.apache.hadoop.fs.FileSystem#get(java.net.URI, 
org.apache.hadoop.conf.Configuration)_ to remove the duplicated code.

 

Replace
{code:java}
// -- (2) get the Hadoop file system class for that scheme

final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
try {
   fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, 
hadoopConfig);
}
catch (IOException e) {
   throw new UnsupportedFileSystemSchemeException(
 "Hadoop File System abstraction does not support scheme '" + scheme + 
"'. " +
   "Either no file system implementation exists for that scheme, " +
   "or the relevant classes are missing from the classpath.", e);
}

// -- (3) instantiate the Hadoop file system

LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", 
scheme, fsClass.getName());

final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

// -- (4) create the proper URI to initialize the file system

final URI initUri;
if (fsUri.getAuthority() != null) {
   initUri = fsUri;
}
else {
   LOG.debug("URI {} does not specify file system authority, trying to load 
default authority (fs.defaultFS)");

   String configEntry = hadoopConfig.get("fs.defaultFS", null);
   if (configEntry == null) {
  // fs.default.name deprecated as of hadoop 2.2.0 - see
  // 
http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
  configEntry = hadoopConfig.get("fs.default.name", null);
   }

   if (LOG.isDebugEnabled()) {
  LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
   }

   if (configEntry == null) {
  throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
"Hadoop configuration did not contain an entry for the default file 
system ('fs.defaultFS').");
   }
   else {
  try {
 initUri = URI.create(configEntry);
  }
  catch (IllegalArgumentException e) {
 throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
   "The configuration contains an invalid file system default name 
" +
   "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
  }

  if (initUri.getAuthority() == null) {
 throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
   "Hadoop configuration for default file system ('fs.default.name' 
or 'fs.defaultFS') " +
   "contains no valid authority component (like hdfs namenode, S3 
host, etc)");
  }
   }
}

// -- (5) configure the Hadoop file system

try {
   hadoopFs.initialize(initUri, hadoopConfig);
}
catch (UnknownHostException e) {
   String message = "The Hadoop file system's authority (" + 
initUri.getAuthority() +
 "), specified by either the file URI or the configuration, cannot be 
resolved.";

   throw new IOException(message, e);
}
{code}
with
{code:java}
final org.apache.hadoop.fs.FileSystem hadoopFs = 
org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
{code}
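
Note one behavioral difference worth weighing: unlike instantiating the class 
via newInstance(), FileSystem.get() serves cached instances keyed by scheme, 
authority, and user. If the factory needs a fresh instance per call, Hadoop's 
per-scheme cache switch can be used, e.g.:
{code:java}
// Hadoop caches instances returned by FileSystem.get(); to force a fresh
// instance per call, disable the cache for the scheme in question:
hadoopConfig.setBoolean("fs." + fsUri.getScheme() + ".impl.disable.cache", true);
final org.apache.hadoop.fs.FileSystem hadoopFs = 
org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
{code}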



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12646) Fix broken tests of RestClientTest

2019-05-28 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12646:

Description: 
In
{code:java}
org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
{code}
, we use an "unroutableIp" with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

 
{code:java}
java.lang.AssertionError: Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
<Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.junit.Assert.assertThat(Assert.java:956) at 
org.junit.Assert.assertThat(Assert.java:923) at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...
{code}
 

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as  _Reserved for future use_ in 
[wikipedia|https://en.wikipedia.org/wiki/Reserved_IP_addresses] 

Or change the assertion? 

  was:
In
{code:java}
org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
{code}
, we use an "unroutableIp" with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

 
{code:java}
java.lang.AssertionError: Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
<Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.junit.Assert.assertThat(Assert.java:956) at 
org.junit.Assert.assertThat(Assert.java:923) at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...
{code}
 

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as  _Reserved for future use_ in 
[wikipedia|[https://en.wikipedia.org/wiki/Reserved_IP_addresses]]

Or change the assertion? 


> Fix broken tests of RestClientTest
> --
>
> Key: FLINK-12646
> URL: https://issues.apache.org/jira/browse/FLINK-12646
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>
> In
> {code:java}
> org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
> {code}
> , we use an "unroutableIp" with a value of "10.255.255.1" for the test.
> But sometimes this IP is reachable in a company's private network, which 
> is the case for me. As a result, this test failed with the following exception: 
>  
> {code:java}
> java.lang.AssertionError: Expected: an instance of 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
> <Connection refused: /10.255.255.1:80> is a 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
>  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.junit.Assert.assertThat(Assert.java:956) at 
> org.junit.Assert.assertThat(Assert.java:923) at 
> org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
>  ...
> {code}
>  
>  
> Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
> which is described as  _Reserved for future use_ in 
> [wikipedia|https://en.wikipedia.org/wiki/Reserved_IP_addresses] 
> Or change the assertion? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12646) Fix broken tests of RestClientTest

2019-05-28 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12646:

Description: 
In
{code:java}
org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
{code}
, we use an "unroutableIp" with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

 
{code:java}
java.lang.AssertionError: Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
<Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.junit.Assert.assertThat(Assert.java:956) at 
org.junit.Assert.assertThat(Assert.java:923) at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...
{code}
 

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as  _Reserved for future use_ in 
[wikipedia|[https://en.wikipedia.org/wiki/Reserved_IP_addresses]]

Or change the assertion? 

  was:
In
{code:java}
org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
{code}
, we use an "unroutableIp" with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

 
{code:java}
java.lang.AssertionError: Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
<Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.junit.Assert.assertThat(Assert.java:956) at 
org.junit.Assert.assertThat(Assert.java:923) at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...
{code}
 

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as  _Reserved for future use_ in 
([https://en.wikipedia.org/wiki/Reserved_IP_addresses])

Or change the assertion? 


> Fix broken tests of RestClientTest
> --
>
> Key: FLINK-12646
> URL: https://issues.apache.org/jira/browse/FLINK-12646
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>
> In
> {code:java}
> org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
> {code}
> , we use an "unroutableIp" with a value of "10.255.255.1" for the test.
> But sometimes this IP is reachable in a company's private network, which 
> is the case for me. As a result, this test failed with the following exception: 
>  
> {code:java}
> java.lang.AssertionError: Expected: an instance of 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
> <Connection refused: /10.255.255.1:80> is a 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
>  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.junit.Assert.assertThat(Assert.java:956) at 
> org.junit.Assert.assertThat(Assert.java:923) at 
> org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
>  ...
> {code}
>  
>  
> Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
> which is described as  _Reserved for future use_ in 
> [wikipedia|[https://en.wikipedia.org/wiki/Reserved_IP_addresses]]
> Or change the assertion? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12646) Fix broken tests of RestClientTest

2019-05-28 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12646:

Description: 
In
{code:java}
org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
{code}
, we use an "unroutableIp" with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

 
{code:java}
java.lang.AssertionError: Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
<Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.junit.Assert.assertThat(Assert.java:956) at 
org.junit.Assert.assertThat(Assert.java:923) at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...
{code}
 

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as  _Reserved for future use_ in 
([https://en.wikipedia.org/wiki/Reserved_IP_addresses])

Or change the assertion? 

  was:
In
{code:java}
// code placeholder
{code}
`org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use an 
`unroutableIp` with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

```

java.lang.AssertionError: 
 Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException
 but: 
 <Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException

at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
 at org.junit.Assert.assertThat(Assert.java:956)
 at org.junit.Assert.assertThat(Assert.java:923)
 at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...

```

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as 

> Reserved for future use.

in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses])

Or change the assertion? 


> Fix broken tests of RestClientTest
> --
>
> Key: FLINK-12646
> URL: https://issues.apache.org/jira/browse/FLINK-12646
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>
> In
> {code:java}
> org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout
> {code}
> , we use an "unroutableIp" with a value of "10.255.255.1" for the test.
> But sometimes this IP is reachable in a company's private network, which 
> is the case for me. As a result, this test failed with the following exception: 
>  
> {code:java}
> java.lang.AssertionError: Expected: an instance of 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException but: 
> <Connection refused: /10.255.255.1:80> is a 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
>  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.junit.Assert.assertThat(Assert.java:956) at 
> org.junit.Assert.assertThat(Assert.java:923) at 
> org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
>  ...
> {code}
>  
>  
> Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
> which is described as  _Reserved for future use_ in 
> ([https://en.wikipedia.org/wiki/Reserved_IP_addresses])
> Or change the assertion? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12646) Fix broken tests of RestClientTest

2019-05-28 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12646:

Description: 
In
{code:java}
// code placeholder
{code}
`org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use an 
`unroutableIp` with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

```

java.lang.AssertionError: 
 Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException
 but: 
 <Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException

at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
 at org.junit.Assert.assertThat(Assert.java:956)
 at org.junit.Assert.assertThat(Assert.java:923)
 at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...

```

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as 

> Reserved for future use.

in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses])

Or change the assertion? 

  was:
In `org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use 
an `unroutableIp` with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

```

java.lang.AssertionError: 
Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException
 but: 
 <Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException

at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
 at org.junit.Assert.assertThat(Assert.java:956)
 at org.junit.Assert.assertThat(Assert.java:923)
 at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...

```

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as 

> Reserved for future use.

in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses])

Or change the assertion? 


> Fix broken tests of RestClientTest
> --
>
> Key: FLINK-12646
> URL: https://issues.apache.org/jira/browse/FLINK-12646
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>
> In
> {code:java}
> // code placeholder
> {code}
> `org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use 
> an `unroutableIp` with a value of "10.255.255.1" for the test.
> But sometimes this IP is reachable in a company's private network, which 
> is the case for me. As a result, this test failed with the following exception: 
> ```
> java.lang.AssertionError: 
>  Expected: an instance of 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException
>  but: 
> <Connection refused: /10.255.255.1:80> is a 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>  at org.junit.Assert.assertThat(Assert.java:956)
>  at org.junit.Assert.assertThat(Assert.java:923)
>  at 
> org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
>  ...
> ```
>  
> Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
> which is described as 
> > Reserved for future use.
> in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses])
> Or change the assertion? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12646) Fix broken tests of RestClientTest

2019-05-28 Thread Victor Wong (JIRA)
Victor Wong created FLINK-12646:
---

 Summary: Fix broken tests of RestClientTest
 Key: FLINK-12646
 URL: https://issues.apache.org/jira/browse/FLINK-12646
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Reporter: Victor Wong
Assignee: Victor Wong


In `org.apache.flink.runtime.rest.RestClientTest#testConnectionTimeout`, we use 
an `unroutableIp` with a value of "10.255.255.1" for the test.

But sometimes this IP is reachable in a company's private network, which is 
the case for me. As a result, this test failed with the following exception: 

```

java.lang.AssertionError: 
Expected: an instance of 
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException
 but: 
 <Connection refused: /10.255.255.1:80> is a 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException

at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
 at org.junit.Assert.assertThat(Assert.java:956)
 at org.junit.Assert.assertThat(Assert.java:923)
 at 
org.apache.flink.runtime.rest.RestClientTest.testConnectionTimeout(RestClientTest.java:76)
 ...

```

 

Can we change the `unroutableIp` to a reserved IP address, e.g. "240.0.0.0", 
which is described as 

> Reserved for future use.

in [wikipedia]([https://en.wikipedia.org/wiki/Reserved_IP_addresses])

Or change the assertion? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jars

2019-05-09 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12459:

Description: 
When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
the user jars should come after all flink libs in the runtime classpath, 
including "flink.jar".

But actually they do not:

org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
!image-2019-05-09-15-04-36-360.png!

I'm not sure whether this is the expected behavior, because if a user forgets to 
mark Flink dependencies as "provided", it causes conflicts in detached mode. 

 

  was:
When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
the user jars should come after all flink libs in the runtime classpath, 
including "flink.jar".

But actually they do not:

org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
!image-2019-05-09-15-04-36-360.png!

I'm not sure whether this is the expected behavior, because if a user forgets to 
mark Flink dependencies as "provided", it causes conflicts in detached mode. 

Can we optimize it like this: 
[https://github.com/apache/flink/commit/86f264d76b3cfd89346f8c5ab2b8a9e99600aaee]


> YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of 
> classpath between user jars and flink jars
> --
>
> Key: FLINK-12459
> URL: https://issues.apache.org/jira/browse/FLINK-12459
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2019-05-09-15-04-36-360.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
> the user jars should come after all flink libs in the runtime classpath, 
> including "flink.jar".
> But actually they do not:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
> !image-2019-05-09-15-04-36-360.png!
> I'm not sure whether this is the expected behavior, because if a user forgets 
> to mark Flink dependencies as "provided", it causes conflicts in detached mode 
> (see the command line example below). 
>  
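
For context, a hedged example of how a user sets this option on the command 
line (config key as in Flink 1.x's YarnConfigOptions; paths are placeholders):
{code:java}
// flink run -m yarn-cluster -yD yarn.per-job-cluster.include-user-jar=LAST /path/to/user.jar
{code}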



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jars

2019-05-09 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12459:

Description: 
When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
the user jars should come after all flink libs in the runtime classpath, 
including "flink.jar".

But actually they do not:

org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
!image-2019-05-09-15-04-36-360.png!

I'm not sure whether this is the expected behavior, because if a user forgets to 
mark Flink dependencies as "provided", it causes conflicts in detached mode. 

Can we optimize it like this: 
[https://github.com/apache/flink/commit/86f264d76b3cfd89346f8c5ab2b8a9e99600aaee]

  was:
When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
the user jars should come after all flink libs in the runtime classpath, 
including "flink.jar".

But actually they do not:

org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
!image-2019-05-09-15-04-36-360.png!

I'm not sure whether this is the expected behavior, because if a user forgets to 
mark Flink dependencies as "provided", it causes conflicts in detached mode. 


> YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of 
> classpath between user jars and flink jars
> --
>
> Key: FLINK-12459
> URL: https://issues.apache.org/jira/browse/FLINK-12459
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
> Attachments: image-2019-05-09-15-04-36-360.png
>
>
> When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
> the user jars should come after all flink libs in the runtime classpath, 
> including "flink.jar".
> But actually they do not:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
> !image-2019-05-09-15-04-36-360.png!
> I'm not sure whether this is the expected behavior, because if a user forgets 
> to mark Flink dependencies as "provided", it causes conflicts in detached mode. 
> Can we optimize it like this: 
> [https://github.com/apache/flink/commit/86f264d76b3cfd89346f8c5ab2b8a9e99600aaee]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12472) Support setting attemptFailuresValidityInterval of jobs on Yarn

2019-05-09 Thread Victor Wong (JIRA)
Victor Wong created FLINK-12472:
---

 Summary: Support setting attemptFailuresValidityInterval of jobs 
on Yarn
 Key: FLINK-12472
 URL: https://issues.apache.org/jira/browse/FLINK-12472
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Reporter: Victor Wong
Assignee: Victor Wong


According to the documentation of 
[Yarn|http://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.html],
 a yarn application can set an _attemptFailuresValidityInterval_ to reset its 
application attempt count.

 

"attemptFailuresValidityInterval. _The default value is -1. when 
attemptFailuresValidityInterval in milliseconds is set to > 0, the failure 
number will no take failures which happen out of the validityInterval into 
failure count. If failure count reaches to maxAppAttempts, the application will 
be failed."_

 

We can make use of this feature to make long-running Flink jobs on Yarn more 
resilient to occasional attempt failures, as sketched below.
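
A minimal sketch of how this could be wired into the application submission 
(Hadoop 2.6+ ApplicationSubmissionContext API; the surrounding deployment code 
is assumed):
{code:java}
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;

// appContext is the ApplicationSubmissionContext built during deployment.
// With a validity interval of 10 minutes, only attempt failures within the
// last 10 minutes count towards the max-attempts limit, so a job is not
// killed for failures that happened days apart.
void configureAppAttempts(ApplicationSubmissionContext appContext) {
   appContext.setMaxAppAttempts(2);
   appContext.setAttemptFailuresValidityInterval(10 * 60 * 1000L); // milliseconds
}
{code}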



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12468) Unregister application from the YARN Resource Manager with a valid appTrackingUrl

2019-05-09 Thread Victor Wong (JIRA)
Victor Wong created FLINK-12468:
---

 Summary: Unregister application from the YARN Resource Manager 
with a valid appTrackingUrl
 Key: FLINK-12468
 URL: https://issues.apache.org/jira/browse/FLINK-12468
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Reporter: Victor Wong
Assignee: Victor Wong
 Attachments: image-2019-05-09-18-06-06-954.png, 
image-2019-05-09-18-07-52-725.png

Currently, when a Flink job on yarn finishes, its tracking URL is not valid. As 
a result, we cannot jump to the Flink history server directly from Yarn.

!image-2019-05-09-18-06-06-954.png!

We can provide a valid appTrackingUrl when unregistering from Yarn, as sketched below.

_org.apache.flink.yarn.YarnResourceManager#internalDeregisterApplication_

 

_!image-2019-05-09-18-07-52-725.png!_
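
A minimal sketch of the change (AMRMClientAsync API; where the history server 
URL comes from is an assumption):
{code:java}
// Pass a valid tracking URL (e.g. the Flink history server address) instead
// of an empty string when unregistering, so Yarn's UI can link to it after
// the application has finished.
resourceManagerClient.unregisterApplicationMaster(
      FinalApplicationStatus.SUCCEEDED,
      diagnostics,
      historyServerUrl); // hypothetical: e.g. derived from "historyserver.web.address"
{code}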

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jars

2019-05-09 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12459:

Summary: YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the 
order of classpath between user jars and flink jars  (was: 
YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of 
classpath between user jars and flink jar)

> YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of 
> classpath between user jars and flink jars
> --
>
> Key: FLINK-12459
> URL: https://issues.apache.org/jira/browse/FLINK-12459
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
> Attachments: image-2019-05-09-15-04-36-360.png
>
>
> When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
> the user jars should come after all flink libs in the runtime classpath, 
> including "flink.jar".
> But actually they do not:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
> !image-2019-05-09-15-04-36-360.png!
> I'm not sure whether this is the expected behavior, because if a user forgets 
> to mark Flink dependencies as "provided", it causes conflicts in detached mode. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jar

2019-05-09 Thread Victor Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Wong updated FLINK-12459:

Description: 
When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
the user jars should come after all flink libs in the runtime classpath, 
including "flink.jar".

But actually they do not:

org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
!image-2019-05-09-15-04-36-360.png!

I'm not sure whether this is the expected behavior, because if a user forgets to 
mark Flink dependencies as "provided", it causes conflicts in detached mode. 

  was:
When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
the user jars should come after all flink libs, including "flink.jar".

But actually they do not:

!image-2019-05-09-15-04-36-360.png!

I'm not sure whether this is the expected behavior, because if a user forgets to 
mark Flink dependencies as "provided", it causes conflicts. 


> YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of 
> classpath between user jars and flink jar
> -
>
> Key: FLINK-12459
> URL: https://issues.apache.org/jira/browse/FLINK-12459
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Minor
> Attachments: image-2019-05-09-15-04-36-360.png
>
>
> When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
> the user jars should come after all flink libs in the runtime classpath, 
> including "flink.jar".
> But actually they do not:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor#startAppMaster 
> !image-2019-05-09-15-04-36-360.png!
> I'm not sure whether this is the expected behavior, because if a user forgets 
> to mark Flink dependencies as "provided", it causes conflicts in detached mode. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12459) YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should affect the order of classpath between user jars and flink jar

2019-05-09 Thread Victor Wong (JIRA)
Victor Wong created FLINK-12459:
---

 Summary: YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR should 
affect the order of classpath between user jars and flink jar
 Key: FLINK-12459
 URL: https://issues.apache.org/jira/browse/FLINK-12459
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client
Reporter: Victor Wong
Assignee: Victor Wong
 Attachments: image-2019-05-09-15-04-36-360.png

When setting YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR to "LAST", I think 
the user jars should come after all flink libs, including "flink.jar".

But actually they do not:

!image-2019-05-09-15-04-36-360.png!

I'm not sure whether this is the expected behavior, because if a user forgets to 
mark Flink dependencies as "provided", it causes conflicts. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12130) Apply command line options to configuration before installing security modules

2019-04-18 Thread Victor Wong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817050#comment-16817050
 ] 

Victor Wong edited comment on FLINK-12130 at 4/19/19 1:20 AM:
--

Hi [~aljoscha], I have created a PR with your advice, 

https://github.com/apache/flink/pull/8166

I tried to make the patch as simple as possible, but it seems like I "failed", 
because I had to change the behavior of 
`org.apache.flink.client.cli.CliFrontend#parseParameters` to return the correct 
`CommandLine` for different actions.

Maybe there is a better way; I'm looking forward to your advice.


was (Author: victor-wong):
Hi [~aljoscha], I have created a PR with your advice, 
[https://github.com/apache/flink/pull/8166|https://github.com/apache/flink/pull/8166.]

I tried to make the patch as simple as possible, but it seems like I "failed", 
because I had to change the behavior of 
`org.apache.flink.client.cli.CliFrontend#parseParameters` to return the correct 
`CommandLine` for different actions.

Maybe there is a better way; I'm looking forward to your advice.

> Apply command line options to configuration before installing security modules
> --
>
> Key: FLINK-12130
> URL: https://issues.apache.org/jira/browse/FLINK-12130
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Victor Wong
>Assignee: Victor Wong
>Priority: Major
>
> Currently if the user configures Kerberos credentials through command line, 
> it won't work.
> {code:java}
> // flink run -m yarn-cluster -yD 
> security.kerberos.login.keytab=/path/to/keytab -yD 
> security.kerberos.login.principal=xxx /path/to/test.jar
> {code}
> Above command would cause security failure if you do not have a ticket cache 
> w/ kinit.
> Maybe we could call 
> _org.apache.flink.client.cli.AbstractCustomCommandLine#applyCommandLineOptionsToConfiguration_
>   before _SecurityUtils.install(new 
> SecurityConfiguration(cli.configuration));_
> Here is a demo patch: 
> [https://github.com/jiasheng55/flink/commit/ef6880dba8a1f36849f5d1bb308405c421b29986]
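
For illustration, a minimal sketch of the proposed ordering (method names from 
the ticket; the surrounding CliFrontend context is assumed):
{code:java}
// Resolve -yD/-D command line overrides into the configuration first, then
// install security from the fully-resolved configuration, so that keytab and
// principal given on the command line are visible to the Kerberos module.
Configuration effectiveConfiguration =
      activeCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
SecurityUtils.install(new SecurityConfiguration(effectiveConfiguration));
{code}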



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

