[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5447 ---
[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5447#discussion_r167746607 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -591,16 +591,28 @@ public void collect(StreamRecord record) { operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); + ClassCastException replace; + if (outputTag != null) { + // Enrich error message + replace = new ClassCastException( + String.format( + "%s. Failed to push OutputTag with id '%s' to operator. " + + "This can occur when multiple OutputTags with different types " + + "but identical names are being used.", + e.getMessage(), + outputTag.getId())); + + throw new ExceptionInChainedOperatorException(replace); + } else { + replace = new ClassCastException( + String.format( + "%s. Failed to push OutputTag with id '%s' to operator. " + --- End diff -- Sorry for my misunderstanding. I'm unfamiliar with flink's business now. Thanks~ ---
[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5447#discussion_r167633643 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -591,16 +591,28 @@ public void collect(StreamRecord record) { operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); + ClassCastException replace; + if (outputTag != null) { + // Enrich error message + replace = new ClassCastException( + String.format( + "%s. Failed to push OutputTag with id '%s' to operator. " + + "This can occur when multiple OutputTags with different types " + + "but identical names are being used.", + e.getMessage(), + outputTag.getId())); + + throw new ExceptionInChainedOperatorException(replace); + } else { + replace = new ClassCastException( + String.format( + "%s. Failed to push OutputTag with id '%s' to operator. " + --- End diff -- Please properly read my comments. Jut replace the else block with `throw e`. If the OutputTag is null there's no point in modifying the error message. ---
[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5447#discussion_r167395827 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -591,16 +591,18 @@ public void collect(StreamRecord record) { operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); + if (outputTag != null) { --- End diff -- Thanks @zentol A good method! ---
[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5447#discussion_r167395295 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -591,16 +591,18 @@ public void collect(StreamRecord record) { operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); + if (outputTag != null) { --- End diff -- Neither. just add an else block that re-throws the original exception. ---
[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5447#discussion_r167393568 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -591,16 +591,18 @@ public void collect(StreamRecord record) { operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); + if (outputTag != null) { --- End diff -- Okay. So, should we try catch NPE or just output the null id ? ---
[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5447#discussion_r167392891 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -591,16 +591,18 @@ public void collect(StreamRecord record) { operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); + if (outputTag != null) { --- End diff -- you are now completely swallowing the exception if the outputtag is null which is unacceptable. ---
[GitHub] flink pull request #5447: [FLINK-8423] OperatorChain#pushToOperator catch bl...
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/5447 [FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE ## What is the purpose of the change Fix the NPE when outputTag is null. ## Brief change log Add when outputTag is not null, then do the catch block work. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): ( no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no ) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (/ no) - If yes, how is the feature documented? ( not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-8423 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5447.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5447 commit 65ac580ca5a3b4c78c954ff08eef9bedcb1e9713 Author: zhangminglei Date: 2018-02-10T02:32:43Z [FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE ---