[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...

2018-02-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5447


---


[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...

2018-02-12 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167746607
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -591,16 +591,28 @@ public void collect(StreamRecord record) {
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
-   // Enrich error message
-   ClassCastException replace = new 
ClassCastException(
-   String.format(
-   "%s. Failed to push OutputTag 
with id '%s' to operator. " +
-   "This can occur when multiple 
OutputTags with different types " +
-   "but identical names are being 
used.",
-   e.getMessage(),
-   outputTag.getId()));
-
-   throw new 
ExceptionInChainedOperatorException(replace);
+   ClassCastException replace;
+   if (outputTag != null) {
+   // Enrich error message
+   replace = new ClassCastException(
+   String.format(
+   "%s. Failed to push 
OutputTag with id '%s' to operator. " +
+   "This can occur 
when multiple OutputTags with different types " +
+   "but identical 
names are being used.",
+   e.getMessage(),
+   outputTag.getId()));
+
+   throw new 
ExceptionInChainedOperatorException(replace);
+   } else {
+   replace = new ClassCastException(
+   String.format(
+   "%s. Failed to push 
OutputTag with id '%s' to operator. " +
--- End diff --

Sorry for my misunderstanding. I'm unfamiliar with flink's business now. 
Thanks~


---


[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...

2018-02-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167633643
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -591,16 +591,28 @@ public void collect(StreamRecord record) {
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
-   // Enrich error message
-   ClassCastException replace = new 
ClassCastException(
-   String.format(
-   "%s. Failed to push OutputTag 
with id '%s' to operator. " +
-   "This can occur when multiple 
OutputTags with different types " +
-   "but identical names are being 
used.",
-   e.getMessage(),
-   outputTag.getId()));
-
-   throw new 
ExceptionInChainedOperatorException(replace);
+   ClassCastException replace;
+   if (outputTag != null) {
+   // Enrich error message
+   replace = new ClassCastException(
+   String.format(
+   "%s. Failed to push 
OutputTag with id '%s' to operator. " +
+   "This can occur 
when multiple OutputTags with different types " +
+   "but identical 
names are being used.",
+   e.getMessage(),
+   outputTag.getId()));
+
+   throw new 
ExceptionInChainedOperatorException(replace);
+   } else {
+   replace = new ClassCastException(
+   String.format(
+   "%s. Failed to push 
OutputTag with id '%s' to operator. " +
--- End diff --

Please properly read my comments. Jut replace the else block with `throw e`.

If the OutputTag is null there's no point in modifying the error message.


---


[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...

2018-02-10 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167395827
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -591,16 +591,18 @@ public void collect(StreamRecord record) {
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
-   // Enrich error message
-   ClassCastException replace = new 
ClassCastException(
-   String.format(
-   "%s. Failed to push OutputTag 
with id '%s' to operator. " +
-   "This can occur when multiple 
OutputTags with different types " +
-   "but identical names are being 
used.",
-   e.getMessage(),
-   outputTag.getId()));
-
-   throw new 
ExceptionInChainedOperatorException(replace);
+   if (outputTag != null) {
--- End diff --

Thanks @zentol A good method!


---


[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...

2018-02-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167395295
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -591,16 +591,18 @@ public void collect(StreamRecord record) {
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
-   // Enrich error message
-   ClassCastException replace = new 
ClassCastException(
-   String.format(
-   "%s. Failed to push OutputTag 
with id '%s' to operator. " +
-   "This can occur when multiple 
OutputTags with different types " +
-   "but identical names are being 
used.",
-   e.getMessage(),
-   outputTag.getId()));
-
-   throw new 
ExceptionInChainedOperatorException(replace);
+   if (outputTag != null) {
--- End diff --

Neither. just add an else block that re-throws the original exception.


---


[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...

2018-02-10 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167393568
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -591,16 +591,18 @@ public void collect(StreamRecord record) {
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
-   // Enrich error message
-   ClassCastException replace = new 
ClassCastException(
-   String.format(
-   "%s. Failed to push OutputTag 
with id '%s' to operator. " +
-   "This can occur when multiple 
OutputTags with different types " +
-   "but identical names are being 
used.",
-   e.getMessage(),
-   outputTag.getId()));
-
-   throw new 
ExceptionInChainedOperatorException(replace);
+   if (outputTag != null) {
--- End diff --

Okay. So, should we try catch NPE or just output the null id ?


---


[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...

2018-02-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167392891
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -591,16 +591,18 @@ public void collect(StreamRecord record) {
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
-   // Enrich error message
-   ClassCastException replace = new 
ClassCastException(
-   String.format(
-   "%s. Failed to push OutputTag 
with id '%s' to operator. " +
-   "This can occur when multiple 
OutputTags with different types " +
-   "but identical names are being 
used.",
-   e.getMessage(),
-   outputTag.getId()));
-
-   throw new 
ExceptionInChainedOperatorException(replace);
+   if (outputTag != null) {
--- End diff --

you are now completely swallowing the exception if the outputtag is null 
which is unacceptable.


---


[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...

2018-02-09 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5447

[FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE

## What is the purpose of the change
Fix the NPE when outputTag is null.


## Brief change log
Add when outputTag is not null, then do the catch block work.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): ( no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (/ no)
  - If yes, how is the feature documented? ( not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8423

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5447


commit 65ac580ca5a3b4c78c954ff08eef9bedcb1e9713
Author: zhangminglei 
Date:   2018-02-10T02:32:43Z

[FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE




---