[jira] [Commented] (FLINK-7293) Support custom order by in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106877#comment-16106877 ] Dian Fu commented on FLINK-7293: As {{event-time}}/{{processing-time}} has a higher priority than a custom {{order by}}, we cannot first apply the custom sort and then pass the result to the CEP library. {quote} This is the same case as in DataStream, which does not have a sort function. {quote} Actually there are some differences. For example, there is no sort function in DataStream at all, so all the sort logic can be implemented in the Table API. In contrast, there is already sort logic in the CEP library (event time), which prevents us from implementing the sort in the Table API alone. Thoughts? > Support custom order by in PatternStream > > > Key: FLINK-7293 > URL: https://issues.apache.org/jira/browse/FLINK-7293 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, when {{ProcessingTime}} is configured, the events are fed to the NFA > in the order of their arrival time, and when {{EventTime}} is configured, the > events are fed to the NFA in the order of the event time. It should also allow > a custom {{order by}} so that users can define the order of the events beyond > the above factors. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7293) Support custom order by in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106872#comment-16106872 ] Dawid Wysakowicz commented on FLINK-7293: - My personal feeling is that it is not needed in the CEP library itself, and to me the described behaviour is unclear. If it is required by the Table API, I would implement it on the Table API side. The way it can be done is to first apply that custom sort and after that pass the result to the CEP library. This is the same case as in DataStream, which does not have a {{sort}} function. > Support custom order by in PatternStream > > > Key: FLINK-7293 > URL: https://issues.apache.org/jira/browse/FLINK-7293 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, when {{ProcessingTime}} is configured, the events are fed to the NFA > in the order of their arrival time, and when {{EventTime}} is configured, the > events are fed to the NFA in the order of the event time. It should also allow > a custom {{order by}} so that users can define the order of the events beyond > the above factors. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
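To make the approach under discussion concrete, below is a minimal sketch (not from the ticket) of what "first apply that custom sort and after that pass the result to the CEP library" could look like: a ProcessFunction on a keyed stream that buffers events until the watermark passes and then emits them ordered by event time with a secondary sort key. {{Event}} and its {{getTimestamp()}}/{{getSecondaryKey()}} long accessors are assumed placeholder types; Dian Fu's reply above explains why this alone does not cover the event-time ordering that happens inside CEP.

{code}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Buffers events until the watermark passes their timestamp, then emits them
// sorted by (event time, secondary key). Apply on a keyed stream with
// timestamps/watermarks assigned, before handing the output to CEP.pattern(...).
public class SortBeforeCep extends ProcessFunction<Event, Event> {

    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("sort-buffer", Event.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        buffer.add(value);
        // fire once the watermark passes this event's timestamp
        ctx.timerService().registerEventTimeTimer(ctx.timestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event e : buffer.get()) {
            (e.getTimestamp() <= timestamp ? ready : pending).add(e);
        }
        // custom "order by": event time first, then a secondary field
        ready.sort(Comparator.comparingLong(Event::getTimestamp)
                .thenComparingLong(Event::getSecondaryKey));
        for (Event e : ready) {
            out.collect(e);
        }
        // keep only the not-yet-ready events buffered
        buffer.clear();
        for (Event e : pending) {
            buffer.add(e);
        }
    }
}
{code}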
[jira] [Updated] (FLINK-7292) Fix EMPTY MATCH bug in CEP.
[ https://issues.apache.org/jira/browse/FLINK-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz updated FLINK-7292: Issue Type: Sub-task (was: New Feature) Parent: FLINK-6935 > Fix EMPTY MATCH bug in CEP. > --- > > Key: FLINK-7292 > URL: https://issues.apache.org/jira/browse/FLINK-7292 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: zhangxiaoyu > > Currently, with the pattern {quote}a?{quote} and the event {quote}a1{quote}, > the result pattern is only {quote}a1{quote}, without the empty match. > We wish the empty match were also returned, and I am working on this issue now. > My approach is to check whether an empty match exists only when the first event > comes (at the StartState): try to traverse the PROCEED edges with the > trueFunction condition from the StartState and see if it can arrive at the > FinalState; if so, add an empty list to the result. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
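As an illustration of the check described in the ticket, here is a small self-contained model (simplified names, not the actual NFA classes): treat the PROCEED edges that carry the always-true condition as a graph and test whether the FinalState is reachable from the StartState; if it is, an empty list would be added to the results.

{code}
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, standalone model of the empty-match check: can the final state
// be reached from the start state by following only PROCEED edges whose
// condition is the always-true function?
final class EmptyMatchCheck {

    static boolean emptyMatchPossible(
            String startState,
            String finalState,
            Map<String, Set<String>> trueProceedEdges) {
        Deque<String> stack = new ArrayDeque<>();
        Set<String> visited = new HashSet<>();
        stack.push(startState);
        while (!stack.isEmpty()) {
            String state = stack.pop();
            if (state.equals(finalState)) {
                return true; // the caller would add Collections.emptyList() to the result
            }
            if (visited.add(state)) {
                for (String next : trueProceedEdges.getOrDefault(state, Collections.emptySet())) {
                    stack.push(next);
                }
            }
        }
        return false;
    }
}
{code}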
[jira] [Commented] (FLINK-7294) mesos.resourcemanager.framework.role not working
[ https://issues.apache.org/jira/browse/FLINK-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106860#comment-16106860 ] Bhumika Bayani commented on FLINK-7294: --- My Mesos version is 1.0.0. Also, I am using Docker images to start the Flink cluster on Mesos, rather than installing the Flink binaries on the Mesos agents directly. Does that make any difference? Could someone please take a look and respond to this with priority? > mesos.resourcemanager.framework.role not working > > > Key: FLINK-7294 > URL: https://issues.apache.org/jira/browse/FLINK-7294 > Project: Flink > Issue Type: Bug > Components: Mesos >Affects Versions: 1.3.1 >Reporter: Bhumika Bayani >Priority: Critical > > I am using the setting mentioned above in flink-conf.yaml > e.g. > mesos.resourcemanager.framework.role: mesos_role_tasks > I see a Flink scheduler registered in the mesos/frameworks tab with the above > role, but the scheduler fails to launch any tasks in spite of getting > resource offers from Mesos agents with the correct role. > The error seen is: > 2017-07-28 13:23:00,683 INFO > org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager - > Mesos task taskmanager-03768 failed, with a TaskManager in launch or > registration. State: TASK_ERROR Reason: REASON_TASK_INVALID (Task uses more > resources cpus(*):1; mem(*):1024; ports(*):[4006-4007] than available > cpus(mesos_role_tasks):7.4; mem(mesos_role_tasks):45876; > ports(mesos_role_tasks):[4002-4129, 4131-4380, 4382-4809, 4811-4957, > 4959-4966, 4968-4979, 4981-5049, 31000-31196, 31198-31431, 31433-31607, > 31609-32000]; ephemeral_storage(mesos_role_tasks):37662; > efs_storage(mesos_role_tasks):8.79609e+12; disk(mesos_role_tasks):5115) > The request is made for resources with the * role. We do not have Mesos running > anywhere with the * role; thus, the task managers never come up. > Am I missing any configuration? > I am using Flink version 1.3.1 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7300) End-to-end tests are unstable on Travis
Tzu-Li (Gordon) Tai created FLINK-7300: -- Summary: End-to-end tests are unstable on Travis Key: FLINK-7300 URL: https://issues.apache.org/jira/browse/FLINK-7300 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.4.0 Reporter: Tzu-Li (Gordon) Tai Fix For: 1.4.0 It seems like the end-to-end tests are unstable, causing the {{misc}} build profile to fail sporadically. Incorrectly matched output: https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30&X-Amz-Date=20170731T060526Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8 Another failure example with a different cause than the above, also in the end-to-end tests: https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30&X-Amz-Date=20170731T060007Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6588) Rename NumberOfFullRestarts metric
[ https://issues.apache.org/jira/browse/FLINK-6588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106833#comment-16106833 ] ASF GitHub Bot commented on FLINK-6588: --- Github user zjureel closed the pull request at: https://github.com/apache/flink/pull/4292 > Rename NumberOfFullRestarts metric > -- > > Key: FLINK-6588 > URL: https://issues.apache.org/jira/browse/FLINK-6588 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Fang Yong > > The metric for the number of full restarts is currently called > {{fullRestarts}}. For clarity and consistency purposes I propose to rename it > to {{numFullRestarts}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4292: [FLINK-6588] Rename NumberOfFullRestarts metric
Github user zjureel closed the pull request at: https://github.com/apache/flink/pull/4292 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-7270) Add support for dynamic properties to cluster entry point
[ https://issues.apache.org/jira/browse/FLINK-7270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fang Yong reassigned FLINK-7270: Assignee: Fang Yong > Add support for dynamic properties to cluster entry point > - > > Key: FLINK-7270 > URL: https://issues.apache.org/jira/browse/FLINK-7270 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.3.1 >Reporter: Till Rohrmann >Assignee: Fang Yong >Priority: Minor > Labels: flip-6 > > We should respect dynamic properties when starting the {{ClusterEntrypoint}}. > This basically means extracting them from the passed command line arguments > and then adding them to the loaded {{Configuration}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7299) Write GenericRecord using AvroOutputFormat
[ https://issues.apache.org/jira/browse/FLINK-7299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106809#comment-16106809 ] ASF GitHub Bot commented on FLINK-7299: --- GitHub user soniclavier opened a pull request: https://github.com/apache/flink/pull/4422 [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat ## What is the purpose of the change Allow writing Avro GenericRecord using AvroOutputFormat. ## Brief change log - Added a condition in AvroOutputFormat to check whether avroValue is an instance of GenericRecord and, if so, create a GenericDatumWriter. ## Verifying this change This change added tests and can be verified as follows: - Added a unit test, testGenericRecord(), in AvroOutputFormatTest to write GenericRecords. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: yes - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature?: no (not a major feature) - If yes, how is the feature documented? not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/soniclavier/flink FLINK-7299-GenericRecord-in-AvroOutputFormat Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4422.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4422 commit 1c71ca43bcd5d4733e581f80637b531ba447e9dc Author: Vishnu Viswanath Date: 2017-07-31T04:58:28Z [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat > Write GenericRecord using AvroOutputFormat > -- > > Key: FLINK-7299 > URL: https://issues.apache.org/jira/browse/FLINK-7299 > Project: Flink > Issue Type: Improvement > Components: Batch Connectors and Input/Output Formats >Reporter: Vishnu Viswanath >Assignee: Vishnu Viswanath >Priority: Minor > > Allow AvroOutputFormat to write GenericRecords -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroO...
GitHub user soniclavier opened a pull request: https://github.com/apache/flink/pull/4422 [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat ## What is the purpose of the change Allow writing Avro GenericRecord using AvroOutputFormat. ## Brief change log - Added a condition in AvroOutputFormat to check whether avroValue is an instance of GenericRecord and, if so, create a GenericDatumWriter. ## Verifying this change This change added tests and can be verified as follows: - Added a unit test, testGenericRecord(), in AvroOutputFormatTest to write GenericRecords. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: yes - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature?: no (not a major feature) - If yes, how is the feature documented? not applicable You can merge this pull request into a Git repository by running: $ git pull https://github.com/soniclavier/flink FLINK-7299-GenericRecord-in-AvroOutputFormat Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4422.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4422 commit 1c71ca43bcd5d4733e581f80637b531ba447e9dc Author: Vishnu Viswanath Date: 2017-07-31T04:58:28Z [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
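For reference, a sketch of the writer-selection logic the PR describes; the helper class and method names, and the exact fallback order, are illustrative, not the merged code:

{code}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;

// Picks a DatumWriter based on the record type: GenericRecords need a
// GenericDatumWriter plus an explicitly supplied schema, specific records
// use their generated classes, and everything else falls back to reflection.
final class AvroWriterFactory {

    @SuppressWarnings("unchecked")
    static <E> DatumWriter<E> createWriter(Class<E> type, Schema schema) {
        if (GenericRecord.class.isAssignableFrom(type)) {
            // GenericRecord carries no compile-time schema, so one must be provided
            return (DatumWriter<E>) new GenericDatumWriter<GenericRecord>(schema);
        } else if (SpecificRecordBase.class.isAssignableFrom(type)) {
            return new SpecificDatumWriter<>(type);
        } else {
            return new ReflectDatumWriter<>(type);
        }
    }
}
{code}

Note the design implication: because GenericRecord does not carry a static schema, the caller of AvroOutputFormat has to supply the Avro Schema explicitly when writing generic records.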
[jira] [Created] (FLINK-7299) Write GenericRecord using AvroOutputFormat
Vishnu Viswanath created FLINK-7299: --- Summary: Write GenericRecord using AvroOutputFormat Key: FLINK-7299 URL: https://issues.apache.org/jira/browse/FLINK-7299 Project: Flink Issue Type: Improvement Components: Batch Connectors and Input/Output Formats Reporter: Vishnu Viswanath Assignee: Vishnu Viswanath Priority: Minor Allow AvroOutputFormat to write GenericRecords -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7298) Records can be cleared when all data in state is invalid
[ https://issues.apache.org/jira/browse/FLINK-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106797#comment-16106797 ] ASF GitHub Bot commented on FLINK-7298: --- Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/4421 Also, I think we need to remove `!validTimestamp` from `while (keyIter.hasNext && !validTimestamp) {`, because new records may be inserted at the head of rowMapState; in that case, expired data would always remain in the rowMapState. > Records can be cleared when all data in state is invalid > > > Key: FLINK-7298 > URL: https://issues.apache.org/jira/browse/FLINK-7298 > Project: Flink > Issue Type: Improvement >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove > records from state when there are no valid records. Instead, we can clear them > all at once. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4421: [FLINK-7298] [table] Records can be cleared all at once w...
Github user hequn8128 commented on the issue: https://github.com/apache/flink/pull/4421 Also, I think we need to remove `!validTimestamp` from `while (keyIter.hasNext && !validTimestamp) {`, because new records may be inserted at the head of rowMapState; in that case, expired data would always remain in the rowMapState. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
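Rendered as a hedged Java sketch (the code under discussion is Scala; names mirror the snippet above but this is not the actual implementation), the suggestion amounts to scanning the whole key set instead of stopping at the first valid timestamp:

{code}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Collect every expired key instead of breaking off once a valid timestamp
// is seen, so expired entries sitting behind newer ones are still removed.
final class ExpirationScan {

    static List<Long> collectExpiredKeys(Iterator<Long> keyIter, long expirationTime) {
        List<Long> expired = new ArrayList<>();
        while (keyIter.hasNext()) { // no "&& !validTimestamp" short-circuit
            long recordTime = keyIter.next();
            if (recordTime < expirationTime) {
                expired.add(recordTime); // remove these from rowMapState afterwards
            }
        }
        return expired;
    }
}
{code}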
[jira] [Commented] (FLINK-7298) Records can be cleared when all data in state is invalid
[ https://issues.apache.org/jira/browse/FLINK-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106794#comment-16106794 ] ASF GitHub Bot commented on FLINK-7298: --- GitHub user hequn8128 opened a pull request: https://github.com/apache/flink/pull/4421 [FLINK-7298] [table] Records can be cleared all at once when all data in state is invalid In `ProcTimeWindowInnerJoin`.`expireOutTimeRow`, we do not need to remove records one by one from state when there are no valid records. Instead, we can clear them all at once. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hequn8128/flink 7298 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4421.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4421 commit 0d2641c13fa5e9aafca16c0fbdc87fab4abc05ec Author: 军长 Date: 2017-07-31T03:14:06Z [FLINK-7298] [table] Records can be cleared when all data in state is invalid > Records can be cleared when all data in state is invalid > > > Key: FLINK-7298 > URL: https://issues.apache.org/jira/browse/FLINK-7298 > Project: Flink > Issue Type: Improvement >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove > records from state when there are no valid records. Instead, we can clear them > all at once. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4421: [FLINK-7298] [table] Records can be cleared all at...
GitHub user hequn8128 opened a pull request: https://github.com/apache/flink/pull/4421 [FLINK-7298] [table] Records can be cleared all at once when all data in state is invalid In `ProcTimeWindowInnerJoin`.`expireOutTimeRow`, we do not need to remove records one by one from state when there are no valid records. Instead, we can clear them all at once. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hequn8128/flink 7298 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4421.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4421 commit 0d2641c13fa5e9aafca16c0fbdc87fab4abc05ec Author: 军长 Date: 2017-07-31T03:14:06Z [FLINK-7298] [table] Records can be cleared when all data in state is invalid --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
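A minimal sketch of the optimization itself, assuming a Flink MapState<Long, ?> named rowMapState and a validTimestamp flag computed by the expiration scan (names follow the PR description, not the actual code):

{code}
import org.apache.flink.api.common.state.MapState;

import java.util.List;

final class ExpireHelper {

    // If nothing in the state is still valid, drop everything in one call;
    // otherwise remove only the expired entries.
    static void expire(MapState<Long, ?> rowMapState,
                       boolean validTimestamp,
                       List<Long> expiredTimes) throws Exception {
        if (!validTimestamp) {
            rowMapState.clear();
        } else {
            for (Long time : expiredTimes) {
                rowMapState.remove(time);
            }
        }
    }
}
{code}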
[GitHub] flink issue #4415: [FLINK-7269] Refactor passing of dynamic properties
Github user zjureel commented on the issue: https://github.com/apache/flink/pull/4415 @tillrohrmann Thank you for your suggestion. I have updated the PR template and added a test case, thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7269) Refactor passing of dynamic properties
[ https://issues.apache.org/jira/browse/FLINK-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106783#comment-16106783 ] ASF GitHub Bot commented on FLINK-7269: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/4415 @tillrohrmann Thank you for your suggestion. I have updated the PR template and added a test case, thanks > Refactor passing of dynamic properties > -- > > Key: FLINK-7269 > URL: https://issues.apache.org/jira/browse/FLINK-7269 > Project: Flink > Issue Type: Improvement > Components: Configuration >Affects Versions: 1.3.1 >Reporter: Till Rohrmann >Assignee: Fang Yong > > In order to set dynamic properties when loading the {{Configuration}} via > {{GlobalConfiguration.loadConfiguration}}, we currently set a static field in > {{GlobalConfiguration}} which is read whenever we load the {{Configuration}}. > I think this is not a good pattern, and I propose to remove this functionality. > Instead we should explicitly add the dynamic properties to the loaded > {{Configuration}} at the start of the application. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106770#comment-16106770 ] ASF GitHub Bot commented on FLINK-7169: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130264936 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && --- End diff -- Should use NFAStateNameHandler.getOriginalNameFromInternal() to compare state names. > Support AFTER MATCH SKIP function in CEP library API > > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Yueting Chen >Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support the AFTER MATCH SKIP function in the CEP API. > There are four options in AFTER MATCH SKIP, listed as follows: > 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the > first row of the current match. > 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row > after the last row of the current match. > 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row > that is mapped to the row pattern variable RPV. > 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row > that is mapped to the row pattern variable RPV. > I think we can introduce a new function in the `CEP` class, which takes a new > parameter as an AfterMatchSkipStrategy. > The new API may look like this > {code} > public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> > pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) > {code} > We can also make `SKIP TO NEXT ROW` the default option, because that's > how the CEP library currently behaves. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106772#comment-16106772 ] ASF GitHub Bot commented on FLINK-7169: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130264164 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { --- End diff -- Should also consider the situation **Proceed to Final state**. > Support AFTER MATCH SKIP function in CEP library API > > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Yueting Chen >Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support the AFTER MATCH SKIP function in the CEP API. > There are four options in AFTER MATCH SKIP, listed as follows: > 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the > first row of the current match. > 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row > after the last row of the current match. > 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row > that is mapped to the row pattern variable RPV. > 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row > that is mapped to the row pattern variable RPV. > I think we can introduce a new function in the `CEP` class, which takes a new > parameter as an AfterMatchSkipStrategy. > The new API may look like this > {code} > public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> > pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) > {code} > We can also make `SKIP TO NEXT ROW` the default option, because that's > how the CEP library currently behaves. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106769#comment-16106769 ] ASF GitHub Bot commented on FLINK-7169: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130266394 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + // feed current matched event to the state. + Collection> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++); + resultingComputationStates.addAll(computationStates); + } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + break; + case SKIP_TO_LAST: + if (currentState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + // feed current matched event to the state. + Collection> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++); + resultingComputationStates.addAll(computationStates); + } + break; + } break; } } - if (computationState.isStartState()) { - int totalBranches = calculateIncreasingSelfState( - outgoingEdges.getTotalIgnoreBranches(), - outgoingEdges.getTotalTakeBranches()); - - DeweyNumber startVersion = computationState.getVersion().increase(totalBranches); - ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion); - resultingComputationSt
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106771#comment-16106771 ] ASF GitHub Bot commented on FLINK-7169: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130265354 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { --- End diff -- Why is the callLevel needed? > Support AFTER MATCH SKIP function in CEP library API > > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Yueting Chen >Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support the AFTER MATCH SKIP function in the CEP API. > There are four options in AFTER MATCH SKIP, listed as follows: > 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the > first row of the current match. > 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row > after the last row of the current match. > 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row > that is mapped to the row pattern variable RPV. > 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row > that is mapped to the row pattern variable RPV. > I think we can introduce a new function in the `CEP` class, which takes a new > parameter as an AfterMatchSkipStrategy. > The new API may look like this > {code} > public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> > pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) > {code} > We can also make `SKIP TO NEXT ROW` the default option, because that's > how the CEP library currently behaves. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130264164 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { --- End diff -- Should also consider the situation **Proceed to Final state**. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130266394 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + // feed current matched event to the state. + Collection> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++); + resultingComputationStates.addAll(computationStates); + } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + break; + case SKIP_TO_LAST: + if (currentState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + // feed current matched event to the state. + Collection> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++); + resultingComputationStates.addAll(computationStates); + } + break; + } break; } } - if (computationState.isStartState()) { - int totalBranches = calculateIncreasingSelfState( - outgoingEdges.getTotalIgnoreBranches(), - outgoingEdges.getTotalTakeBranches()); - - DeweyNumber startVersion = computationState.getVersion().increase(totalBranches); - ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion); - resultingComputationStates.add(startState); + if (computationState.isStartState() && + skipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT) { --- End diff -- If the skip strategy is @**SKIP_PAST
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130264936 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && --- End diff -- Should use NFAStateNameHandler.getOriginalNameFromInternal() to compare state names. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130265354 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { --- End diff -- Why is the callLevel needed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
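For orientation, a hedged usage sketch of the API shape this PR implements. The AfterMatchSkipStrategy type and its skipPastLastEvent() factory are assumed from the proposal under review, not a released Flink API (hence no import shown for it), and Event is a placeholder type:

{code}
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;

public class SkipStrategyExample {

    // Resume matching after the last row of the current match, per the
    // SKIP PAST LAST ROW semantics described in the ticket.
    public static PatternStream<Event> matchSkippingPastLastRow(DataStream<Event> input) {
        Pattern<Event, ?> pattern = Pattern.<Event>begin("a").oneOrMore();
        return CEP.pattern(input, pattern, AfterMatchSkipStrategy.skipPastLastEvent());
    }
}
{code}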
[jira] [Updated] (FLINK-7298) Records can be cleared when all data in state is invalid
[ https://issues.apache.org/jira/browse/FLINK-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng updated FLINK-7298: --- Description: In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove records from state when there are no valid records. Instead, we can clear them all at once. (was: In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove records from state when there are no valid records. Instead, we can clear them all at once.) > Records can be cleared when all data in state is invalid > > > Key: FLINK-7298 > URL: https://issues.apache.org/jira/browse/FLINK-7298 > Project: Flink > Issue Type: Improvement >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove > records from state when there are no valid records. Instead, we can clear them > all at once. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7298) Records can be cleared when all data in state is invalid
[ https://issues.apache.org/jira/browse/FLINK-7298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-7298: -- Assignee: Hequn Cheng > Records can be cleared when all data in state is invalid > > > Key: FLINK-7298 > URL: https://issues.apache.org/jira/browse/FLINK-7298 > Project: Flink > Issue Type: Improvement >Reporter: Hequn Cheng >Assignee: Hequn Cheng > > In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove > records from state when there are no valid records. Instead, we can clear > them all at once. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7298) Records can be cleared when all data in state is invalid
Hequn Cheng created FLINK-7298: -- Summary: Records can be cleared when all data in state is invalid Key: FLINK-7298 URL: https://issues.apache.org/jira/browse/FLINK-7298 Project: Flink Issue Type: Improvement Reporter: Hequn Cheng In {{ProcTimeWindowInnerJoin}}.{{expireOutTimeRow}}, we do not need to remove records from state when there are no valid records. Instead, we can clear them all at once. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106728#comment-16106728 ] ASF GitHub Bot commented on FLINK-6493: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4328 Thanks, @tedyu. It is a good idea, and the code has been updated based on your suggestion. Please check it out again. > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to an NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
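A sketch of the null-safe rewrite using java.util.Objects.equals, which handles null on either side; here, `other` stands for the cast `(Snapshot) obj` from the snippet above:

{code}
import java.util.Objects;

// Objects.equals(a, b) returns true when both are null and never
// dereferences a null left-hand side, unlike the original short-circuit,
// which only covered the both-null case before calling .equals().
boolean metaInfoEqual =
        Objects.equals(partitionStateSerializer, other.getPartitionStateSerializer())
                && Objects.equals(partitionStateSerializerConfigSnapshot,
                        other.getPartitionStateSerializerConfigSnapshot());
{code}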
[GitHub] flink issue #4328: [FLINK-6493] Fix ineffective null check in RegisteredOper...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4328 Thanks, @tedyu. It is a good idea, and the code has been updated based on your suggestion. Please check it out again. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7239) GroupWindow and OverWindow aggregates do not check correctly for updating input
[ https://issues.apache.org/jira/browse/FLINK-7239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106724#comment-16106724 ] Hequn Cheng commented on FLINK-7239: Hi [~fhueske], the RetractionRules will make sure that {{DataStreamGroupWindowAggregate}} and {{DataStreamOverAggregate}} cannot receive purely update messages (updates without retraction). If they did, IMO we would need to fix the RetractionRules, not the other way around. As for the {{DataStreamWindowJoin}}, we need to set needsUpdatesAsRetraction to true, considering that it cannot handle purely update messages, and check that the input does not contain retraction messages. :) > GroupWindow and OverWindow aggregates do not check correctly for updating > input > --- > > Key: FLINK-7239 > URL: https://issues.apache.org/jira/browse/FLINK-7239 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1, 1.4.0 >Reporter: Fabian Hueske > > The streaming table operators for group-window and over-window aggregates > only support append-only input, i.e., no updates. > However, the {{DataStreamGroupWindowAggregate}} and {{DataStreamOverAggregate}} > RelNodes only check that the input does not contain retraction messages, which > is only one possible encoding of updates. > We should fix the RelNodes to check whether the input is append-only using the > {{UpdatingPlanChecker}}, similar to {{DataStreamWindowJoin}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106672#comment-16106672 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4353 BTW, one alternative I was once considering for the scale down case is merging state handles that are backed by different physical files in one logical state handle, using something based on `MultiStreamStateHandle`. That would require minor changes in how the backends currently iterate the handles and some calculation of virtual offsets near the `StateAssignmentOperation`, mapping the old physical file offsets to the new logical offsets in the stream that gives a consecutive logical view over the files. Then, the whole code would never again deal with this detail. Wonder if this is worth the effort? > Introduce state management by OperatorID in TaskManager > --- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Flink-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings. However, this explicit mapping was > not introduced in the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4353 BTW, one alternative I was once considering for the scale down case is merging state handles that are backed by different physical files in one logical state handle, using something based on `MultiStreamStateHandle`. That would require minor changes in how the backends currently iterate the handles and some calculation of virtual offsets near the `StateAssignmentOperation`, mapping the old physical file offsets to the new logical offsets in the stream that gives a consecutive logical view over the files. Then, the whole code would never again deal with this detail. Wonder if this is worth the effort? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7213) Introduce state management by OperatorID in TaskManager
[ https://issues.apache.org/jira/browse/FLINK-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106669#comment-16106669 ] ASF GitHub Bot commented on FLINK-7213: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4353 I think that idea is problematic because in the rescaling case, all the collections can have different sizes. For example there can be 5 managed keyed state handles and 7 managed operator state handles and zero state handles for the raw state. Then how would you split that up between the `OperatorSubtaskStates` in your set? Like this, `OperatorSubtaskState` contains the complete state for an operator subtask which I think is a good thing. Also maybe at some point there *might* be a reason to report more than one handle already on snapshotting. > Introduce state management by OperatorID in TaskManager > --- > > Key: FLINK-7213 > URL: https://issues.apache.org/jira/browse/FLINK-7213 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > > Flink-5892 introduced the job manager / checkpoint coordinator part of > managing state on the operator level instead of the task level by introducing > explicit operator_id -> state mappings. However, this explicit mapping was > not introduced in the task manager side, so the explicit mapping is still > converted into a mapping that suits the implicit operator chain order. > We should also introduce explicit operator ids to state management on the > task manager. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4353: [FLINK-7213] Introduce state management by OperatorID in ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4353 I think that idea is problematic because in the rescaling case, all the collections can have different sizes. For example there can be 5 managed keyed state handles and 7 managed operator state handles and zero state handles for the raw state. Then how would you split that up between the `OperatorSubtaskStates` in your set? Like this, `OperatorSubtaskState` contains the complete state for an operator subtask which I think is a good thing. Also maybe at some point there *might* be a reason to report more than one handle already on snapshotting. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-4534: -- Description: Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState(): {code} for (BucketState<T> bucketState : state.bucketStates.values()) { {code} and the following in close(): {code} for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { closeCurrentPartFile(entry.getValue()); {code} w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting at line 752: {code} Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); LOG.debug("Moving pending files to final location on restore."); for (Long pastCheckpointId : pastCheckpointIds) { {code} was: Iteration over state.bucketStates is protected by synchronization in other methods, except for the following in restoreState(): {code} for (BucketState<T> bucketState : state.bucketStates.values()) { {code} and the following in close(): {code} for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { closeCurrentPartFile(entry.getValue()); {code} w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue starting at line 752: {code} Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); LOG.debug("Moving pending files to final location on restore."); for (Long pastCheckpointId : pastCheckpointIds) { {code} > Lack of synchronization in BucketingSink#restoreState() > --- > > Key: FLINK-4534 > URL: https://issues.apache.org/jira/browse/FLINK-4534 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu > > Iteration over state.bucketStates is protected by synchronization in other > methods, except for the following in restoreState(): > {code} > for (BucketState<T> bucketState : state.bucketStates.values()) { > {code} > and the following in close(): > {code} > for (Map.Entry<String, BucketState<T>> entry : > state.bucketStates.entrySet()) { > closeCurrentPartFile(entry.getValue()); > {code} > w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue > starting at line 752: > {code} > Set<Long> pastCheckpointIds = > bucketState.pendingFilesPerCheckpoint.keySet(); > LOG.debug("Moving pending files to final location on restore."); > for (Long pastCheckpointId : pastCheckpointIds) { > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
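A sketch of the straightforward fix for the restoreState() case: take the same monitor the other methods hold while mutating the map before iterating (field names as in the snippets above; the loop body is elided):

{code}
// Guard the iteration so bucketStates cannot be mutated mid-loop.
synchronized (state.bucketStates) {
    for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
        // ... per-bucket restore/close logic, now under the lock ...
    }
}
{code}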
[jira] [Updated] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5486: -- Description: Here is related code: {code} handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); synchronized (bucketState.pendingFilesPerCheckpoint) { bucketState.pendingFilesPerCheckpoint.clear(); } {code} The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside the synchronization block. Otherwise during the processing of handlePendingFilesForPreviousCheckpoints(), some entries of the map may be cleared. was: Here is related code: {code} handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); synchronized (bucketState.pendingFilesPerCheckpoint) { bucketState.pendingFilesPerCheckpoint.clear(); } {code} The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside the synchronization block. Otherwise during the processing of handlePendingFilesForPreviousCheckpoints(), some entries of the map may be cleared. > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.2, 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
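In code, the fix the ticket describes is simply to move the call inside the synchronized block, so the processing and the clear() happen atomically with respect to other holders of the lock:

{code}
synchronized (bucketState.pendingFilesPerCheckpoint) {
    // handle and clear under one monitor: no entries can be cleared
    // while handlePendingFilesForPreviousCheckpoints() is still running
    handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
    bucketState.pendingFilesPerCheckpoint.clear();
}
{code}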
[jira] [Resolved] (FLINK-7115) test instability in MiniClusterITCase.runJobWithMultipleJobManagers
[ https://issues.apache.org/jira/browse/FLINK-7115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7115. -- Resolution: Fixed Fix Version/s: 1.4.0 Closing this issue for the moment because it should have been fixed by FLINK-7279. If not, then we have to reopen this issue. > test instability in MiniClusterITCase.runJobWithMultipleJobManagers > --- > > Key: FLINK-7115 > URL: https://issues.apache.org/jira/browse/FLINK-7115 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber > Labels: test-stability > Fix For: 1.4.0 > > > In a test run with unrelated changes, I had one case of > {{MiniClusterITCase}} hanging: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/250775808/log.txt?X-Amz-Expires=30&X-Amz-Date=20170706T151556Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170706/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=5b7c512137c7cbd82dcb77a98aeadc3d761b7055bea6d8f07ad6b786dc196f37 > {code} > == > Maven produced no output for 300 seconds. > == > == > The following Java processes are running (JPS) > == > 12852 Jps > 9166 surefirebooter1705381973603203163.jar > 4966 Launcher > == > Printing stack trace of Java process 12865 > == > 12865: No such process > == > Printing stack trace of Java process 9166 > == > 2017-07-06 15:05:52 > Full thread dump OpenJDK 64-Bit Server VM (24.131-b00 mixed mode): > "Attach Listener" daemon prio=10 tid=0x7fc520b1a000 nid=0x3266 waiting on > condition [0x] >java.lang.Thread.State: RUNNABLE > "flink-akka.actor.default-dispatcher-9" daemon prio=10 tid=0x7fc520b0e800 > nid=0x23fd waiting on condition [0x7fc51abcb000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007a0ca2c78> (a > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) > at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > "flink-akka.actor.default-dispatcher-8" daemon prio=10 tid=0x7fc520bb9800 > nid=0x23fc waiting on condition [0x7fc51aaca000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007a0ca2c78> (a > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) > at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > "Flink-MetricRegistry-1" prio=10 tid=0x7fc520ba7800 nid=0x23f9 waiting on > condition [0x7fc51a4c4000] >java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007a09699c8> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1092) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > "flink-akka.actor.default-dispatcher-7" daemon prio=10 tid=0x7fc520b9d800 > nid=0x23f7 waiting on condition [0x7fc51a6c6000] >java.lang.Thread.State: WAITING (parking) >
[GitHub] flink pull request #4416: [FLINK-7279][minicluster] fix a deadlock between T...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4416 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-7279) MiniCluster can deadlock at shut down
[ https://issues.apache.org/jira/browse/FLINK-7279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7279. -- Resolution: Fixed Fix Version/s: 1.4.0 Fixed via 49acd09ec19b5da3f6e9861d658728ec1306e3d5 > MiniCluster can deadlock at shut down > - > > Key: FLINK-7279 > URL: https://issues.apache.org/jira/browse/FLINK-7279 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Nico Kruber > Labels: flip-6 > Fix For: 1.4.0 > > > The {{MiniCluster}} can deadlock if the fatal error handler is called > while the {{MiniCluster}} shuts down. The reason is that the shutdown > happens under a lock which the fatal error handler requires as well. If > the {{MiniCluster}} then tries to shut down the underlying RPC service, which > waits for all actors to terminate, the shutdown will never complete because one actor > is still waiting for the lock. > One solution would be to ignore fatal error handler calls while the > {{MiniCluster}} is shutting down. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/257811319/log.txt -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7279) MiniCluster can deadlock at shut down
[ https://issues.apache.org/jira/browse/FLINK-7279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106586#comment-16106586 ] ASF GitHub Bot commented on FLINK-7279: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4416 > MiniCluster can deadlock at shut down > - > > Key: FLINK-7279 > URL: https://issues.apache.org/jira/browse/FLINK-7279 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Nico Kruber > Labels: flip-6 > Fix For: 1.4.0 > > > The {{MiniCluster}} can deadlock if the fatal error handler is called > while the {{MiniCluster}} shuts down. The reason is that the shutdown > happens under a lock which the fatal error handler requires as well. If > the {{MiniCluster}} then tries to shut down the underlying RPC service, which > waits for all actors to terminate, the shutdown will never complete because one actor > is still waiting for the lock. > One solution would be to ignore fatal error handler calls while the > {{MiniCluster}} is shutting down. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/257811319/log.txt -- This message was sent by Atlassian JIRA (v6.4.14#64029)
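The fix sketched below illustrates the solution proposed in FLINK-7279 above, i.e. ignoring fatal-error callbacks once shutdown has begun. The class and member names are hypothetical and only stand in for the real {{MiniCluster}} internals:
{code:java}
// Minimal sketch, assuming the shutdown path and the fatal error handler
// contend for the same lock: a volatile flag set before the lock is taken
// lets the handler bail out instead of deadlocking.
class ShutdownAwareFatalErrorHandler {

	private final Object lock = new Object();

	// set before the shutdown path acquires the lock
	private volatile boolean shuttingDown;

	void shutDown() {
		shuttingDown = true;
		synchronized (lock) {
			// stop the RPC service and wait for all actors to terminate
		}
	}

	void onFatalError(Throwable cause) {
		if (shuttingDown) {
			return; // ignoring the callback avoids the deadlock described above
		}
		synchronized (lock) {
			// regular fatal-error handling
		}
	}
}
{code}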
[jira] [Commented] (FLINK-7080) Deploy Yarn cluster with job
[ https://issues.apache.org/jira/browse/FLINK-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106581#comment-16106581 ] ASF GitHub Bot commented on FLINK-7080: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4284 > Deploy Yarn cluster with job > > > Key: FLINK-7080 > URL: https://issues.apache.org/jira/browse/FLINK-7080 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, YARN >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > In order to start a yarn per-job cluster, we have to start a Yarn application > running Flink which includes the job to be executed. One way to do it is to > upload the serialized form of the {{JobGraph}} as a file. The Yarn entry > point can then read the {{JobGraph}} from this file and start the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7080) Deploy Yarn cluster with job
[ https://issues.apache.org/jira/browse/FLINK-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7080. Resolution: Done Fix Version/s: 1.4.0 Added via a954ea113bc29a4480af579387c6e9b81bd76f85 > Deploy Yarn cluster with job > > > Key: FLINK-7080 > URL: https://issues.apache.org/jira/browse/FLINK-7080 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, YARN >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > In order to start a yarn per-job cluster, we have to start a Yarn application > running Flink which includes the job to be executed. One way to do it is to > upload the serialized form of the {{JobGraph}} as a file. The Yarn entry > point can then read the {{JobGraph}} from this file and start the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4284: [FLINK-7080] [yarn] Add Yarn per job mode deployme...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4284 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7080) Deploy Yarn cluster with job
[ https://issues.apache.org/jira/browse/FLINK-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106578#comment-16106578 ] ASF GitHub Bot commented on FLINK-7080: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4284 Travis passed. Merging this PR. > Deploy Yarn cluster with job > > > Key: FLINK-7080 > URL: https://issues.apache.org/jira/browse/FLINK-7080 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, YARN >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to start a yarn per-job cluster, we have to start a Yarn application > running Flink which includes the job to be executed. One way to do it is to > upload the serialized form of the {{JobGraph}} as a file. The Yarn entry > point can then read the {{JobGraph}} from this file and start the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4284: [FLINK-7080] [yarn] Add Yarn per job mode deployment
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4284 Travis passed. Merging this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
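The per-job deployment described in FLINK-7080 above boils down to shipping a serialized {{JobGraph}} with the YARN application. A minimal sketch, assuming plain Java serialization; the file name "job.graph" and the surrounding class are assumptions for illustration:
{code:java}
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;

import org.apache.flink.runtime.jobgraph.JobGraph;

// Sketch: the client writes the JobGraph to a file that is shipped with
// the YARN application; the entry point reads it back and starts the
// JobManager with it.
public final class JobGraphFileWriter {

	static void writeJobGraph(JobGraph jobGraph) throws Exception {
		try (ObjectOutputStream out =
				new ObjectOutputStream(new FileOutputStream("job.graph"))) {
			out.writeObject(jobGraph); // JobGraph is java.io.Serializable
		}
	}
}
{code}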
[GitHub] flink issue #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4380 Hi @rtudoran, I had the impression that your main motivation was to fill the syntactic gaps of SQL on streams and not so much to support certain kinds of use cases. The semantics of a SQL query are given by the SQL specification and not up for discussion. If you want to support a certain behavior, we need to see how this can be expressed in standard-compliant SQL and then think about the implementation. The semantics of `ORDER BY rowtime ASC FETCH 10 ROWS ONLY` are given and will return the ten *first* (first because they have the lowest timestamp) result rows. However, I think what you are trying to achieve is represented by `ORDER BY rowtime DESC FETCH 10 ROWS ONLY`. This query returns the ten *last* (last because they have the highest timestamp) rows of the result. Obviously, we need retraction to handle this case, because the ten last rows will change over time. `ORDER BY rowtime DESC OFFSET 10` is also not very useful, because it holds back the 10 last result rows. However, we could support it to fill the gap in the syntax and execute the query instead of saying "Sorry, can't do that". So the question now is, how do we proceed with this PR? Do we want to add the `ORDER BY rowtime ASC OFFSET / FETCH` functionality as simple additions to the existing operators (as I said, I think it can be done with very few lines if we do some refactoring) to fill the syntactic gap or not? Regardless of whether we add this or not, we should work on the `ORDER BY rowtime DESC OFFSET / FETCH` case. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106399#comment-16106399 ] Xingcan Cui commented on FLINK-7245: Thanks for the answer and the suggestion, [~fhueske]. I'll keep the focus on it. > Enhance the operators to support holding back watermarks > > > Key: FLINK-7245 > URL: https://issues.apache.org/jira/browse/FLINK-7245 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > Currently the watermarks are applied and emitted by the > {{AbstractStreamOperator}} instantly. > {code:java} > public void processWatermark(Watermark mark) throws Exception { > if (timeServiceManager != null) { > timeServiceManager.advanceWatermark(mark); > } > output.emitWatermark(mark); > } > {code} > Some calculation results (with timestamp fields) triggered by these > watermarks (e.g., join or aggregate results) may be regarded as delayed by > the downstream operators since their timestamps must be less than or equal to > the corresponding triggers. > This issue aims to add another "working mode", which supports holding back > watermarks, to current operators. These watermarks should be blocked and > stored by the operators until all the corresponding new generated results are > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106397#comment-16106397 ] Fabian Hueske commented on FLINK-7245: -- Hi [~xccui], thanks for the questions. I think you got it mostly right. I would propose to continue this discussion on a separate JIRA about changing the handling of time indicators. This JIRA is about holding back watermarks which is not only useful for the Table API / SQL. I will open a JIRA to change the time indicator handling soon. Thanks, Fabian > Enhance the operators to support holding back watermarks > > > Key: FLINK-7245 > URL: https://issues.apache.org/jira/browse/FLINK-7245 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > Currently the watermarks are applied and emitted by the > {{AbstractStreamOperator}} instantly. > {code:java} > public void processWatermark(Watermark mark) throws Exception { > if (timeServiceManager != null) { > timeServiceManager.advanceWatermark(mark); > } > output.emitWatermark(mark); > } > {code} > Some calculation results (with timestamp fields) triggered by these > watermarks (e.g., join or aggregate results) may be regarded as delayed by > the downstream operators since their timestamps must be less than or equal to > the corresponding triggers. > This issue aims to add another "working mode", which supports holding back > watermarks, to current operators. These watermarks should be blocked and > stored by the operators until all the corresponding new generated results are > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
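A minimal sketch of the "holding back" mode requested in FLINK-7245, written against the {{processWatermark()}} snippet quoted above. The flush hook is hypothetical; a real operator would have to decide itself when all results triggered by the watermark have been emitted:
{code:java}
// Sketch: advance the timers as before, but buffer the watermark instead
// of emitting it immediately.
private Watermark heldBackWatermark;

@Override
public void processWatermark(Watermark mark) throws Exception {
	if (timeServiceManager != null) {
		timeServiceManager.advanceWatermark(mark);
	}
	heldBackWatermark = mark; // block and store instead of emitting
}

// hypothetical hook, called once all results triggered by the watermark
// (e.g. join or aggregate outputs) have been emitted downstream
private void flushHeldBackWatermark() {
	if (heldBackWatermark != null) {
		output.emitWatermark(heldBackWatermark);
		heldBackWatermark = null;
	}
}
{code}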
[jira] [Commented] (FLINK-7179) ProjectableTableSource interface isn't compatible with BoundedOutOfOrdernessTimestampExtractor
[ https://issues.apache.org/jira/browse/FLINK-7179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106389#comment-16106389 ] Fabian Hueske commented on FLINK-7179: -- I think a {{ProjectableTableSource}} knows that it provides a virtual time indicator column. It should be able to identify that the field index points to a virtual attribute. So, I think it should be possible to handle this case correctly, but I agree that this is not easy and requires knowledge of how things work internally. We are currently discussing changing the handling and representation of time indicators. In a nutshell, we are thinking of treating them as regular Row fields and not as virtual columns anymore. See this [thread|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] for some background info. I am not sure what we can do about the situation right now or if it is worth investing a lot of time in it if the time indicator representation is changed anyway. What do you think [~suez1224], [~hpeter]? > ProjectableTableSource interface isn't compatible with > BoundedOutOfOrdernessTimestampExtractor > > > Key: FLINK-7179 > URL: https://issues.apache.org/jira/browse/FLINK-7179 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang > > In the implementation of windows in streaming SQL, > BoundedOutOfOrdernessTimestampExtractor is designed to extract the row time from > each row. It assumes the ts field is in the data stream by default. On the > other hand, ProjectableTableSource is designed to help projection push-down. > If there is no row-time-related field in a query, the extractor can't > function properly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7291) Incorrect return value of getArity() in ListTypeInfo and MapTypeInfo
[ https://issues.apache.org/jira/browse/FLINK-7291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106367#comment-16106367 ] Fabian Hueske commented on FLINK-7291: -- I think you are right that {{getArity()}} was introduced for composite types (not sure why it wasn't added to CompositeType). So, this is probably not an issue, because the method is (hopefully) only called for composite types. However, IIRC the method should return the number of record fields a type has (without nesting). All primitive types and array types implement the method by returning 1. So, we should go for consistency here, IMO. The {{getTotalFields()}} method is a bigger issue because it is recursively called by composite types on their field types (composite or not) to compute the number of nested fields (e.g., in the constructor of PojoTypeInfo or TupleTypeInfoBase). > Incorrect return value of getArity() in ListTypeInfo and MapTypeInfo > > > Key: FLINK-7291 > URL: https://issues.apache.org/jira/browse/FLINK-7291 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.3.1, 1.4.0 >Reporter: Fabian Hueske > > The implementation of {{getArity()}} in {{ListTypeInfo}} and {{MapTypeInfo}} > returns 0 but should return 1. > The implementation of {{MapTypeInfo.getTotalFields()}} is also incorrect and > must return 1 instead of 2. > The JavaDocs of {{TypeInformation.getArity()}} and > {{TypeInformation.getTotalFields()}} should be extended as well to avoid > future confusion about these methods. The method returns the arity of the > current type: for atomic types, this is 1; for composite types, it is the > number of non-nested fields (i.e., the number of fields on the first nesting level). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
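The consistency fix discussed above reduces to returning 1 from both methods. A sketch against {{ListTypeInfo}} ({{MapTypeInfo}} would be analogous):
{code:java}
// A list, like a primitive or array type, counts as a single,
// non-nested field of the enclosing composite type.
@Override
public int getArity() {
	return 1; // one field on the first nesting level
}

@Override
public int getTotalFields() {
	return 1; // contributes exactly one field to the recursive count
}
{code}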
[jira] [Comment Edited] (FLINK-6333) Utilize Bloomfilters in RocksDb
[ https://issues.apache.org/jira/browse/FLINK-6333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095490#comment-16095490 ] Ted Yu edited comment on FLINK-6333 at 7/30/17 7:33 AM: Looks like rocksdb issue 1964 can be closed . was (Author: yuzhih...@gmail.com): Looks like rocksdb issue 1964 can be closed. > Utilize Bloomfilters in RocksDb > --- > > Key: FLINK-6333 > URL: https://issues.apache.org/jira/browse/FLINK-6333 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ted Yu > > Bloom Filters would speed up RocksDb lookups. > When we upgrade to RocksDb 5.2.1+, we would be able to do: > {code} > new BlockBasedTableConfig() > .setBlockCacheSize(blockCacheSize) > .setBlockSize(blockSize) > .setFilter(new BloomFilter()) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
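For context, the snippet from the FLINK-6333 description plugs into the RocksDB options roughly as follows. This is a sketch against the RocksDB 5.2.1+ Java API; the cache and block sizes are placeholder values:
{code:java}
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Options;

// Sketch: enable Bloom filters through the block-based table config.
public final class BloomFilterOptions {

	static Options buildOptions() {
		return new Options().setTableFormatConfig(
			new BlockBasedTableConfig()
				.setBlockCacheSize(64 * 1024 * 1024) // placeholder: 64 MB block cache
				.setBlockSize(16 * 1024)             // placeholder: 16 KB blocks
				.setFilter(new BloomFilter()));
	}
}
{code}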
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106355#comment-16106355 ] ASF GitHub Bot commented on FLINK-6493: --- Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r130235370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +175,21 @@ public boolean equals(Object obj) { return false; } + Snapshot snapshot; + + if (obj instanceof Snapshot) { + snapshot = (Snapshot)obj; + } else { --- End diff -- nit: if you place the condition for else first, declaration and assignment to snapshot can be done in one line. > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4328: [FLINK-6493] Fix ineffective null check in Registe...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r130235370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +175,21 @@ public boolean equals(Object obj) { return false; } + Snapshot snapshot; + + if (obj instanceof Snapshot) { + snapshot = (Snapshot)obj; + } else { --- End diff -- nit: if you place the condition for else first, declaration and assignment to snapshot can be done in one line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
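Concretely, the nit above amounts to the following restructuring (a sketch; it assumes the {{else}} branch cut off in the diff simply returns {{false}}):
{code:java}
// Testing the negated condition first lets the declaration and the cast
// collapse into a single line.
if (!(obj instanceof Snapshot)) {
	return false;
}
Snapshot snapshot = (Snapshot) obj;
{code}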
[jira] [Updated] (FLINK-6838) RescalingITCase fails in master branch
[ https://issues.apache.org/jira/browse/FLINK-6838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-6838: -- Description: {code} Tests in error: RescalingITCase.testSavepointRescalingInKeyedState[1] » JobExecution Job execu... RescalingITCase.testSavepointRescalingWithKeyedAndNonPartitionedState[1] » JobExecution {code} Both failed with a similar cause: {code} testSavepointRescalingInKeyedState[1](org.apache.flink.test.checkpointing.RescalingITCase) Time elapsed: 4.813 sec <<< ERROR! org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator Flat Map -> Sink: Unnamed (1/2). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:967) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Flat Map -> Sink: Unnamed (1/2). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:967) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument. 
at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument. at org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:66) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.openCheckpointStream(RocksDBKeyedStateBackend.java:495) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.openIOHandle(RocksDBKeyedStateBackend.java:394) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.openIOHandle(RocksDBKeyedStateBackend.java:390) at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:67) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) {code} was: {code} Tests in error: RescalingITCase.testSavepointRes