[jira] [Assigned] (FLINK-4459) Introduce SlotProvider for Scheduler
[ https://issues.apache.org/jira/browse/FLINK-4459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reassigned FLINK-4459: - Assignee: Kurt Young > Introduce SlotProvider for Scheduler > > > Key: FLINK-4459 > URL: https://issues.apache.org/jira/browse/FLINK-4459 > Project: Flink > Issue Type: Improvement > Components: Scheduler >Reporter: Till Rohrmann >Assignee: Kurt Young > > Currently the {{Scheduler}} maintains a queue of available instances which it > scans if it needs a new slot. If it finds a suitable instance (having free > slots available) it will allocate a slot from it. > This slot allocation logic can be factored out and made available via a > {{SlotProvider}} interface. The {{SlotProvider}} has methods to allocate a > slot given a set of location preferences. Slots should be returned as > {{Futures}}, because in the future the slot allocation might happen > asynchronously (Flip-6). > In the first version, the {{SlotProvider}} implementation will simply > encapsulate the existing slot allocation logic extracted from the > {{Scheduler}}. When a slot is requested it will return a completed or failed > future since the allocation happens synchronously. > The refactoring will have the advantage of simplifying the {{Scheduler}} class > and paving the way for upcoming refactorings (Flip-6). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
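The interface the ticket describes can be sketched roughly as below. This is an illustrative sketch only, not Flink's actual API: the names `SlotProvider`, `allocateSlot`, and `SynchronousSlotProvider` follow the ticket's wording, and the location-preference and slot types are stand-ins.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the proposed SlotProvider: slots come back as futures so the
// allocation can later become asynchronous (Flip-6), while the first version
// simply returns an already completed (or failed) future.
public class SlotProviderSketch {

    interface Slot {} // stand-in for a task slot

    /** Allocates a slot given a set of location preferences. */
    interface SlotProvider {
        CompletableFuture<Slot> allocateSlot(Collection<String> locationPreferences);
    }

    /** First version: wraps the existing synchronous allocation logic. */
    static class SynchronousSlotProvider implements SlotProvider {
        @Override
        public CompletableFuture<Slot> allocateSlot(Collection<String> locationPreferences) {
            try {
                // Stand-in for scanning the instance queue for a free slot.
                Slot slot = new Slot() {};
                return CompletableFuture.completedFuture(slot);
            } catch (Exception e) {
                CompletableFuture<Slot> failed = new CompletableFuture<>();
                failed.completeExceptionally(e);
                return failed;
            }
        }
    }

    public static boolean demo() {
        SlotProvider provider = new SynchronousSlotProvider();
        CompletableFuture<Slot> future = provider.allocateSlot(List.of("host-1"));
        // The synchronous implementation always hands out a completed future.
        return future.isDone();
    }
}
```

A caller written against this interface would not need to change when the implementation later becomes asynchronous; only the completion timing of the future changes.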
[jira] [Comment Edited] (FLINK-4478) Implement heartbeat logic
[ https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438437#comment-15438437 ] zhangjing edited comment on FLINK-4478 at 8/26/16 4:04 AM: --- Hi [~till.rohrmann], I saw your document and new branch. Good design. was (Author: jinyu.zj): Good abstraction. > Implement heartbeat logic > - > > Key: FLINK-4478 > URL: https://issues.apache.org/jira/browse/FLINK-4478 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.1.0 >Reporter: Till Rohrmann > Fix For: 1.2.0 > > > With the Flip-6 refactoring, we'll need a dedicated heartbeat > component. The heartbeat component is used to check the liveness of the > distributed components among each other. Furthermore, heartbeats are used to > regularly transmit status updates to another component. For example, the > TaskManager informs the ResourceManager with each heartbeat about the current > slot allocation. > The heartbeat is initiated from one component. This component sends a > heartbeat request to another component, which answers with a heartbeat > response. Thus, one can differentiate between a sending and a receiving side. > Apart from the triggering of the heartbeat request, the logic of treating > heartbeats, marking components dead and payload delivery are the same and > should be reusable by different distributed components (JM, TM, RM). > Different models for the heartbeat reporting are conceivable. First of all, > the heartbeat request could be sent as an ask operation where the heartbeat > response is returned as a future on the sending side. Alternatively, the > sending side could request a heartbeat response by sending a tell message. > The heartbeat response is then delivered by an RPC back to the heartbeat > sender. The latter model has the advantage that a heartbeat response is not > tightly coupled to a heartbeat request. 
Such a tight coupling could cause > heartbeat responses to be ignored after the future has timed out even > though they might still contain valuable information (the receiver is still > alive). > Furthermore, different strategies for the heartbeat triggering and marking > heartbeat targets as dead are conceivable. For example, we could periodically > (with a fixed period) trigger a heartbeat request and mark all targets as > dead if we didn't receive a heartbeat response in a given time period. > Furthermore, we could adapt the heartbeat interval and heartbeat timeouts > with respect to the latency of previous heartbeat responses. This would > reflect the current load and network conditions better. > For the first version, I would propose to use a fixed-period heartbeat with a > maximum heartbeat timeout before a target is marked dead. Furthermore, I > would propose to use tell messages (fire and forget) to request and report > heartbeats because they are the more flexible model imho. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
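The fixed-period model the ticket proposes can be sketched as a small monitor that records the last response time and declares a target dead when the timeout elapses. This is an illustrative sketch, not Flink's eventual heartbeat API; the class and method names are made up, and a clock value is passed in explicitly so the logic is deterministic.

```java
// Hedged sketch of the proposed "tell"-based heartbeat: responses are
// reported whenever they arrive, decoupled from any request future.
public class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    private long lastResponseTime;

    public HeartbeatMonitorSketch(long timeoutMillis, long now) {
        this.timeoutMillis = timeoutMillis;
        this.lastResponseTime = now;
    }

    /** Called when a heartbeat response (a tell message, not an ask future) arrives. */
    public void reportHeartbeat(long now) {
        // Even a "late" response proves the receiver is alive -- the advantage
        // of not tying responses to a request future that may have timed out.
        lastResponseTime = Math.max(lastResponseTime, now);
    }

    /** Called on each fixed-period trigger to decide whether the target is dead. */
    public boolean isDead(long now) {
        return now - lastResponseTime > timeoutMillis;
    }
}
```

The adaptive strategies the ticket mentions (interval/timeout scaled by observed response latency) would only change how `timeoutMillis` is chosen, not this core check.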
[jira] [Commented] (FLINK-4478) Implement heartbeat logic
[ https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438437#comment-15438437 ] zhangjing commented on FLINK-4478: -- Good abstraction. > Implement heartbeat logic > - > > Key: FLINK-4478 > URL: https://issues.apache.org/jira/browse/FLINK-4478 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.1.0 >Reporter: Till Rohrmann > Fix For: 1.2.0 > > > With the Flip-6 refactoring, we'll need a dedicated heartbeat > component. The heartbeat component is used to check the liveness of the > distributed components among each other. Furthermore, heartbeats are used to > regularly transmit status updates to another component. For example, the > TaskManager informs the ResourceManager with each heartbeat about the current > slot allocation. > The heartbeat is initiated from one component. This component sends a > heartbeat request to another component, which answers with a heartbeat > response. Thus, one can differentiate between a sending and a receiving side. > Apart from the triggering of the heartbeat request, the logic of treating > heartbeats, marking components dead and payload delivery are the same and > should be reusable by different distributed components (JM, TM, RM). > Different models for the heartbeat reporting are conceivable. First of all, > the heartbeat request could be sent as an ask operation where the heartbeat > response is returned as a future on the sending side. Alternatively, the > sending side could request a heartbeat response by sending a tell message. > The heartbeat response is then delivered by an RPC back to the heartbeat > sender. The latter model has the advantage that a heartbeat response is not > tightly coupled to a heartbeat request. 
Such a tight coupling could cause > heartbeat responses to be ignored after the future has timed out even > though they might still contain valuable information (the receiver is still > alive). > Furthermore, different strategies for the heartbeat triggering and marking > heartbeat targets as dead are conceivable. For example, we could periodically > (with a fixed period) trigger a heartbeat request and mark all targets as > dead if we didn't receive a heartbeat response in a given time period. > Furthermore, we could adapt the heartbeat interval and heartbeat timeouts > with respect to the latency of previous heartbeat responses. This would > reflect the current load and network conditions better. > For the first version, I would propose to use a fixed-period heartbeat with a > maximum heartbeat timeout before a target is marked dead. Furthermore, I > would propose to use tell messages (fire and forget) to request and report > heartbeats because they are the more flexible model imho. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438412#comment-15438412 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- [~StephanEwen] I thought of working on this. If you can walk me through the future improvements to memory management, I can take this up. I am also fine if [~ggevay], who is ready to volunteer, works on this. Let me know what you think [~StephanEwen]. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
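The fix the report implies (pooling released segments instead of handing them to the GC) can be sketched as below. This is an illustrative sketch, not Flink's `MemoryManager`: the class name and the fresh-allocation counter are made up to make the reuse visible.

```java
import java.util.ArrayDeque;

// Sketch of segment pooling: a released segment goes back into a deque and
// is handed out again on the next allocate(), so an iterative job's second
// superstep reuses the first superstep's memory instead of stressing the GC.
public class SegmentPoolSketch {
    private final ArrayDeque<byte[]> pool = new ArrayDeque<>();
    private final int segmentSize;
    private int freshAllocations; // counts new byte[] allocations, for illustration

    public SegmentPoolSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    public byte[] allocate() {
        byte[] segment = pool.poll();
        if (segment == null) {
            freshAllocations++;
            segment = new byte[segmentSize]; // only hit when the pool is cold
        }
        return segment;
    }

    public void release(byte[] segment) {
        pool.push(segment);
    }

    public int freshAllocations() {
        return freshAllocations;
    }
}
```

With `taskmanager.memory.preallocate: false` and no pool, every superstep pays the `new byte[segmentSize]` path; with the pool, only the first superstep does.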
[jira] [Commented] (FLINK-4462) Add RUN_TIME retention policy for all the flink annotations
[ https://issues.apache.org/jira/browse/FLINK-4462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438402#comment-15438402 ] ASF GitHub Bot commented on FLINK-4462: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2412 @StephanEwen I meant the library only. I did not mean that it is added in code. I am not sure which APIs in that library to make use of. > Add RUN_TIME retention policy for all the flink annotations > --- > > Key: FLINK-4462 > URL: https://issues.apache.org/jira/browse/FLINK-4462 > Project: Flink > Issue Type: Improvement >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > It is better to add the RUNTIME retention policy to flink annotations, so that > utilities/tests can be added to ensure that the classes/interfaces are all > tagged with proper annotations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
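The motivation here is that only `RUNTIME`-retained annotations are visible to reflection, which is what such a utility/test would use. A minimal illustration (the `@PublicApi` annotation and class names below are made up, standing in for Flink annotations like `@Public`):

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// With RUNTIME retention, a test can verify via reflection that a class is
// tagged. With the default CLASS retention, getAnnotation(...) returns null
// at runtime even though the annotation is present in the class file.
public class RetentionSketch {

    @Retention(RetentionPolicy.RUNTIME)
    public @interface PublicApi {} // stand-in for a Flink stability annotation

    @PublicApi
    public static class SomeApi {}

    public static class UntaggedApi {}

    /** The kind of check a tagging-audit test could perform. */
    public static boolean isTagged(Class<?> clazz) {
        return clazz.getAnnotation(PublicApi.class) != null;
    }
}
```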
[GitHub] flink issue #2412: FLINK-4462 Add RUN_TIME retention policy for all the flin...
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/2412 @StephanEwen I meant the library only. I did not mean that it is added in code. I am not sure which APIs in that library to make use of. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4502) Cassandra connector documentation has misleading consistency guarantees
Elias Levy created FLINK-4502: - Summary: Cassandra connector documentation has misleading consistency guarantees Key: FLINK-4502 URL: https://issues.apache.org/jira/browse/FLINK-4502 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.1.0 Reporter: Elias Levy The Cassandra connector documentation states that "enableWriteAheadLog() is an optional method, that allows exactly-once processing for non-deterministic algorithms." This claim appears to be false. From what I gather, the write-ahead log feature of the connector works as follows: - The sink is replaced with a stateful operator that writes incoming messages to the state backend based on the checkpoint they belong to. - When the operator is notified that a Flink checkpoint has been completed, then for each set of checkpoints older than and including the committed one it: * reads its messages from the state backend * writes them to Cassandra * records that it has committed them to Cassandra for the specific checkpoint and operator instance * and erases them from the state backend. This process attempts to avoid resubmitting queries to Cassandra that would otherwise occur when recovering a job from a checkpoint and having messages replayed. Alas, this does not guarantee exactly-once semantics at the sink. The writes to Cassandra that occur when the operator is notified that a checkpoint is completed are not atomic, and they are potentially non-idempotent. If the job dies while writing to Cassandra or before committing the checkpoint via the committer, queries will be replayed when the job recovers. Thus the documentation appears to be incorrect in stating that this provides exactly-once semantics. There also seems to be an issue in GenericWriteAheadSink's notifyOfCompletedCheckpoint which may result in incorrect output. 
If sendValues returns false because a write failed, instead of bailing, it simply moves on to the next checkpoint to commit if there is one, keeping the previous one around to try again later. But that can result in newer data being overwritten with older data when the previous checkpoint is retried. Although given that CassandraCommitter implements isCheckpointCommitted as checkpointID <= this.lastCommittedCheckpointID, it actually means that when it goes back to try the uncommitted older checkpoint it will consider it committed, even though some of its data may not have been written out, and the data will be discarded. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
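The `isCheckpointCommitted` flaw described above can be made concrete with a toy committer. This is an illustrative sketch, not the real `CassandraCommitter`; only the quoted high-water-mark comparison is taken from the report, the rest of the names are made up.

```java
import java.util.HashSet;
import java.util.Set;

// Demonstrates the reported flaw: a plain high-water-mark check treats an
// older checkpoint whose write failed (and was skipped) as committed once
// any newer checkpoint commits, so its data is silently discarded on retry.
public class HighWaterMarkCommitterSketch {
    private long lastCommittedCheckpointId = -1;
    private final Set<Long> actuallyWritten = new HashSet<>();

    public void commit(long checkpointId) {
        actuallyWritten.add(checkpointId);
        lastCommittedCheckpointId = Math.max(lastCommittedCheckpointId, checkpointId);
    }

    /** The check the report quotes: checkpointID <= lastCommittedCheckpointID. */
    public boolean isCheckpointCommitted(long checkpointId) {
        return checkpointId <= lastCommittedCheckpointId;
    }

    public boolean wasActuallyWritten(long checkpointId) {
        return actuallyWritten.contains(checkpointId);
    }
}
```

If checkpoint 6's write fails and checkpoint 7 then commits, the retry of checkpoint 6 sees `isCheckpointCommitted(6) == true` and drops its data, which is the incorrect-output scenario the report describes.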
[jira] [Closed] (FLINK-4501) Cassandra sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy closed FLINK-4501. - Resolution: Duplicate The HTTP connection got reset and the issue was submitted twice. > Cassandra sink can lose messages > > > Key: FLINK-4501 > URL: https://issues.apache.org/jira/browse/FLINK-4501 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector >Affects Versions: 1.1.0 >Reporter: Elias Levy > > The problem is the same as I pointed out with the Kafka producer sink > (FLINK-4027). The CassandraTupleSink's send() and CassandraPojoSink's send() > both send data asynchronously to Cassandra and record whether an error occurs > via a future callback. But CassandraSinkBase does not implement > Checkpointed, so it can't stop a checkpoint from happening even though there are > Cassandra queries in flight from the checkpoint that may fail. If they do > fail, they would subsequently not be replayed when the job recovered, and > would thus be lost. > In addition, > CassandraSinkBase's close should check whether there is a pending exception > and throw it, rather than silently close. It should also wait for any > pending async queries to complete and check their status before closing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4500) Cassandra sink can lose messages
Elias Levy created FLINK-4500: - Summary: Cassandra sink can lose messages Key: FLINK-4500 URL: https://issues.apache.org/jira/browse/FLINK-4500 Project: Flink Issue Type: Bug Components: Cassandra Connector Affects Versions: 1.1.0 Reporter: Elias Levy The problem is the same as I pointed out with the Kafka producer sink (FLINK-4027). The CassandraTupleSink's send() and CassandraPojoSink's send() both send data asynchronously to Cassandra and record whether an error occurs via a future callback. But CassandraSinkBase does not implement Checkpointed, so it can't stop a checkpoint from happening even though there are Cassandra queries in flight from the checkpoint that may fail. If they do fail, they would subsequently not be replayed when the job recovered, and would thus be lost. In addition, CassandraSinkBase's close should check whether there is a pending exception and throw it, rather than silently close. It should also wait for any pending async queries to complete and check their status before closing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
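The close-time behavior the report asks for can be sketched as below. This is an illustrative sketch with made-up names, not `CassandraSinkBase`: it tracks in-flight async writes and the first recorded failure, waits for pending writes, and surfaces the failure in `close()` instead of closing silently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of an async sink that does NOT lose failures: every write result is
// tracked, and close() both drains in-flight writes and rethrows the first
// pending exception.
public class AsyncSinkSketch {
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    /** Register an asynchronous write; errors are recorded by the callback. */
    public void send(CompletableFuture<Void> writeResult) {
        pending.incrementAndGet();
        writeResult.whenComplete((ok, err) -> {
            if (err != null) {
                firstFailure.compareAndSet(null, err);
            }
            pending.decrementAndGet();
        });
    }

    /** Wait for in-flight queries, then throw rather than close silently. */
    public void close() throws Exception {
        while (pending.get() > 0) {
            Thread.sleep(1); // a real implementation would bound this wait
        }
        Throwable t = firstFailure.get();
        if (t != null) {
            throw new Exception("async write failed", t);
        }
    }
}
```

The same pattern would also let a `Checkpointed`-style hook block a checkpoint while `pending > 0`, addressing the first half of the report.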
[jira] [Created] (FLINK-4501) Cassandra sink can lose messages
Elias Levy created FLINK-4501: - Summary: Cassandra sink can lose messages Key: FLINK-4501 URL: https://issues.apache.org/jira/browse/FLINK-4501 Project: Flink Issue Type: Bug Components: Cassandra Connector Affects Versions: 1.1.0 Reporter: Elias Levy The problem is the same as I pointed out with the Kafka producer sink (FLINK-4027). The CassandraTupleSink's send() and CassandraPojoSink's send() both send data asynchronously to Cassandra and record whether an error occurs via a future callback. But CassandraSinkBase does not implement Checkpointed, so it can't stop a checkpoint from happening even though there are Cassandra queries in flight from the checkpoint that may fail. If they do fail, they would subsequently not be replayed when the job recovered, and would thus be lost. In addition, CassandraSinkBase's close should check whether there is a pending exception and throw it, rather than silently close. It should also wait for any pending async queries to complete and check their status before closing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin
[ https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437818#comment-15437818 ] ASF GitHub Bot commented on FLINK-4499: --- Github user smarthi commented on the issue: https://github.com/apache/flink/pull/2422 Set up a default findbugs-exclude.xml; please feel free to customize it as you deem fit for Flink. > Introduce findbugs maven plugin > --- > > Key: FLINK-4499 > URL: https://issues.apache.org/jira/browse/FLINK-4499 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > As suggested by Stephan in FLINK-4482, this issue is to add > findbugs-maven-plugin into the build process so that we can detect lack of > proper locking and other defects automatically. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin
[ https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437815#comment-15437815 ] ASF GitHub Bot commented on FLINK-4499: --- GitHub user smarthi opened a pull request: https://github.com/apache/flink/pull/2422 FLINK-4499: Introduce findbugs maven plugin Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/smarthi/flink FLINK-4499 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2422.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2422 > Introduce findbugs maven plugin > --- > > Key: FLINK-4499 > URL: https://issues.apache.org/jira/browse/FLINK-4499 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > As suggested by Stephan in FLINK-4482, this issue is to add > findbugs-maven-plugin into 
the build process so that we can detect lack of > proper locking and other defects automatically. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
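For reference, wiring findbugs-maven-plugin with an exclude filter typically looks like the fragment below. This is a hedged sketch, not the PR's actual pom change: the version number and the `tools/maven/findbugs-exclude.xml` path are illustrative assumptions.

```xml
<!-- Illustrative pom.xml fragment; version and exclude path are assumptions. -->
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>findbugs-maven-plugin</artifactId>
  <version>3.0.4</version>
  <configuration>
    <excludeFilterFile>tools/maven/findbugs-exclude.xml</excludeFilterFile>
    <failOnError>true</failOnError>
  </configuration>
</plugin>
```

The exclude file is where project-specific false positives (the "customize as you deem fit" part of the comment above) would be suppressed.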
[GitHub] flink issue #2422: FLINK-4499: Introduce findbugs maven plugin
Github user smarthi commented on the issue: https://github.com/apache/flink/pull/2422 Set up a default findbugs-exclude.xml; please feel free to customize it as you deem fit for Flink. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2422: FLINK-4499: Introduce findbugs maven plugin
GitHub user smarthi opened a pull request: https://github.com/apache/flink/pull/2422 FLINK-4499: Introduce findbugs maven plugin Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/smarthi/flink FLINK-4499 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2422.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2422 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437693#comment-15437693 ] ASF GitHub Bot commented on FLINK-3950: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2374 @zentol I've updated Meter's interface. Could you give this PR another review? Sorry for the force push, I had to rebase on top of `master` to resolve merge conflict. > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2374: [FLINK-3950] Add Meter Metric Type
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2374 @zentol I've updated Meter's interface. Could you give this PR another review? Sorry for the force push, I had to rebase on top of `master` to resolve merge conflict. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4440) Make API for edge/vertex creation less verbose
[ https://issues.apache.org/jira/browse/FLINK-4440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437533#comment-15437533 ] Greg Hogan commented on FLINK-4440: --- I think it is good to be explicit when assigning the vertex and edge values. Adding a second (and third?) means of creating a vertex simply adds to a user's cognitive load. The cost is small but I think the benefit is very small. More importantly, we should strive to solve the underlying problem of requiring one but only one value. > Make API for edge/vertex creation less verbose > -- > > Key: FLINK-4440 > URL: https://issues.apache.org/jira/browse/FLINK-4440 > Project: Flink > Issue Type: Improvement > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk >Priority: Trivial > > It would be better if one could create vertices/edges like this: > {code:java} > Vertex<Long, NullValue> v = Vertex.create(42); > Edge<Long, NullValue> e = Edge.create(5, 6); > {code} > Instead of this: > {code:java} > Vertex<Long, NullValue> v = new Vertex<Long, NullValue>(42, NullValue.getInstance()); > Edge<Long, NullValue> e = new Edge<Long, NullValue>(5, 6, NullValue.getInstance()); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437475#comment-15437475 ] ASF GitHub Bot commented on FLINK-2055: --- Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi, I have updated the PR based on @ramkrish86 's comments. 1. rename MutationActionList to MutationActions, which has a public method createMutations that returns HBase Mutations 2. make MutationAction a private inner class 3. DeleteColumn action can now delete the latest version of the specified column 4. check if the rowkey is null when creating Mutations 5. hand over the combination of Delete logic to the HBase server I also added a check for the case where the table doesn't exist in HBase, in which case a table-not-found exception will be thrown. > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Erli Ding > > As per : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user delding commented on the issue: https://github.com/apache/flink/pull/2332 Hi, I have updated the PR based on @ramkrish86 's comments. 1. rename MutationActionList to MutationActions, which has a public method createMutations that returns HBase Mutations 2. make MutationAction a private inner class 3. DeleteColumn action can now delete the latest version of the specified column 4. check if the rowkey is null when creating Mutations 5. hand over the combination of Delete logic to the HBase server I also added a check for the case where the table doesn't exist in HBase, in which case a table-not-found exception will be thrown. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4459) Introduce SlotProvider for Scheduler
[ https://issues.apache.org/jira/browse/FLINK-4459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437459#comment-15437459 ] Stephan Ewen commented on FLINK-4459: - Yes, this is needed. In the master branch, the current {{Scheduler}} should implement the {{SlotProvider}} interface. It would basically only hand out futures of slots, and then the {{ExecutionGraph}} code could check that the futures are always already completed. In the FLIP-6 branch, we can adapt the scheduler to a different {{SlotProvider}}. I think it makes sense there as well to keep the logic about {{SlotSharingGroup}} and {{CoLocationConstraint}} out of the {{SlotPool}} and in a separate class (called {{SlotProvider}} or {{Scheduler}} or whatever). > Introduce SlotProvider for Scheduler > > > Key: FLINK-4459 > URL: https://issues.apache.org/jira/browse/FLINK-4459 > Project: Flink > Issue Type: Improvement > Components: Scheduler >Reporter: Till Rohrmann > > Currently the {{Scheduler}} maintains a queue of available instances which it > scans if it needs a new slot. If it finds a suitable instance (having free > slots available) it will allocate a slot from it. > This slot allocation logic can be factored out and made available via a > {{SlotProvider}} interface. The {{SlotProvider}} has methods to allocate a > slot given a set of location preferences. Slots should be returned as > {{Futures}}, because in the future the slot allocation might happen > asynchronously (Flip-6). > In the first version, the {{SlotProvider}} implementation will simply > encapsulate the existing slot allocation logic extracted from the > {{Scheduler}}. When a slot is requested it will return a completed or failed > future since the allocation happens synchronously. > The refactoring will have the advantage of simplifying the {{Scheduler}} class > and paving the way for upcoming refactorings (Flip-6). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3679: -- Component/s: Kafka Connector > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
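The flatMap-style variant the ticket asks for can be sketched as below. This is an illustrative interface with made-up names, not Flink's actual `DeserializationSchema`: the point is that one input maps to zero or more outputs, so bad records become an empty collection instead of an `IOException` that fails the job.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Sketch of a collection-returning deserialization schema: deserialize()
// yields zero elements for unparseable input and could yield several for a
// batched input record.
public class CollectionSchemaSketch {

    interface CollectionDeserializationSchema<T> {
        List<T> deserialize(byte[] message);
    }

    /** Example schema: parses an integer, silently dropping bad records. */
    static final CollectionDeserializationSchema<Integer> INT_SCHEMA = bytes -> {
        try {
            return List.of(Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        } catch (NumberFormatException e) {
            // Bad data is a reality: return the empty set instead of
            // throwing and forcing the whole job to fail and replay.
            return Collections.emptyList();
        }
    };
}
```

A source connector would then call `collector.collect(...)` once per returned element, which also covers the one-to-many case the ticket mentions.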
[jira] [Commented] (FLINK-4490) Decouple Slot and Instance
[ https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437453#comment-15437453 ] Stephan Ewen commented on FLINK-4490: - For the FLIP-6 work, we need that, agreed. Here is an idea of how we can make this work, hopefully without a lot of additional effort: (1) We introduce an abstract {{Slot}} class that defines the methods that the {{ExecutionGraph}} and {{JobManager}} need, like - fail() - release() - return() - getTaskManagerGateway() The {{SlotProvider}} will expose the new {{Slot}}. (2) We implement a version of that slot that internally contains the current master's slot, to keep this compatible in the master, and we change the ExecutionGraph to work on the new {{Slot}}. The scheduler wraps the old slot in the new slot. (3) We can then implement the FLIP-6 specific variant of the new slot. > Decouple Slot and Instance > -- > > Key: FLINK-4490 > URL: https://issues.apache.org/jira/browse/FLINK-4490 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Reporter: Kurt Young > > Currently, {{Slot}} and {{Instance}} hold each other. For {{Instance}} > holding {{Slot}}, it makes sense because it reflects how many resources it > can provide and how many are in use. > But it's not very necessary for {{Slot}} to hold the {{Instance}} it > belongs to. It only needs to hold some connection information and a gateway to > talk to. Another downside of {{Slot}} holding {{Instance}} is that > {{Instance}} actually contains some allocation/de-allocation logic, and it will > be difficult if we want to do some allocation refactoring without {{Slot}} > noticing. > We should abstract the connection information of {{Instance}} and let {{Slot}} > hold it. (Actually we have {{InstanceConnectionInfo}} now, but it lacks the > instance's akka gateway; maybe we can just add the akka gateway to the > {{InstanceConnectionInfo}}.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
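Stephan's three-step plan can be sketched roughly as below. This is an illustrative sketch with made-up names, not Flink code: the gateway type is a placeholder, and the comment's `return()` would need a different Java name (e.g. `releaseBack()`), since `return` is a reserved word.

```java
// Sketch of the proposed abstract Slot (step 1) plus a legacy-wrapping
// adapter (step 2): the ExecutionGraph would program against this class
// while the old master slot lives behind the adapter.
public abstract class SlotSketch {

    public enum State { ALLOCATED, RELEASED, FAILED }

    private State state = State.ALLOCATED;

    /** Placeholder return type; the real gateway type is Flink-specific. */
    public abstract Object getTaskManagerGateway();

    public void fail(Throwable cause) {
        state = State.FAILED;
    }

    public void release() {
        state = State.RELEASED;
    }

    public State getState() {
        return state;
    }

    /** Step 2: adapter wrapping the current master's slot behind the new class. */
    public static SlotSketch wrapLegacy(Object legacySlot) {
        return new SlotSketch() {
            @Override
            public Object getTaskManagerGateway() {
                return legacySlot; // stand-in: delegate to the old slot's instance
            }
        };
    }
}
```

Step 3 (the FLIP-6 variant) would be another concrete subclass backed by the `SlotPool` instead of an `Instance`.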
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437410#comment-15437410 ] Gabor Gevay commented on FLINK-3322: Yes, I volunteer to help :) > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
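The direction discussed on the mailing list is to pool released segments instead of leaving them all to the GC. A minimal standalone sketch of that pooling idea (class and method names are hypothetical, not Flink's actual MemoryManager):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: released segments go back into a queue and are
// reused on the next superstep, instead of being dropped for the GC.
class SegmentPool {
    private final ConcurrentLinkedQueue<byte[]> pool = new ConcurrentLinkedQueue<>();
    private final int segmentSize;

    SegmentPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Reuses a pooled segment if one is available, otherwise allocates. */
    byte[] allocate() {
        byte[] segment = pool.poll();
        return segment != null ? segment : new byte[segmentSize];
    }

    /** Returns a released segment to the pool instead of discarding it. */
    void release(byte[] segment) {
        pool.offer(segment);
    }

    int pooledCount() {
        return pool.size();
    }
}
```

With such a pool, iterative jobs that release and reallocate all managed memory at every superstep stop generating garbage proportional to the heap size.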
[jira] [Commented] (FLINK-4278) Unclosed FSDataOutputStream in multiple files in the project
[ https://issues.apache.org/jira/browse/FLINK-4278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437396#comment-15437396 ] ASF GitHub Bot commented on FLINK-4278: --- Github user nssalian commented on the issue: https://github.com/apache/flink/pull/2403 @StephanEwen, let me close this and open a new one after some testing. Will post a new PR this week. > Unclosed FSDataOutputStream in multiple files in the project > > > Key: FLINK-4278 > URL: https://issues.apache.org/jira/browse/FLINK-4278 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.3 >Reporter: Neelesh Srinivas Salian >Assignee: Neelesh Srinivas Salian > > After FLINK-4259, I did a check and found that the following files don't > close the FSDataOutputStream. > The following are the files and the corresponding methods missing close(): > 1) FSDataOutputStream.java - adding a close method in the abstract class > 2) FSStateBackend flush() and write() - closing the FSDataOutputStream > 3) StringWriter.java write() > 4) FileSystemStateStore putState() - closing the FSDataOutputStream > 5) HadoopDataOutputStream.java - not sure whether this needs closing > 6) FileSystemStateStorageHelper.java store() - needs closing for both outStream > and the ObjectOutputStream > The options to consider would be to either close() explicitly or use IOUtils.closeQuietly(). > Any thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
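Both options mentioned in the ticket can be sketched as follows; the class and method names here are illustrative, not the actual Flink code:

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;

class StreamClosingExample {

    // Option 1: try-with-resources guarantees close() even when an
    // exception is thrown while writing.
    static void writeState(byte[] data, OutputStream out) throws IOException {
        try (OutputStream stream = out) {
            stream.write(data);
            stream.flush();
        } // stream.close() is invoked here automatically
    }

    // Option 2: a closeQuietly-style helper (as the ticket suggests with
    // IOUtils.closeQuietly) for cleanup paths where a failure to close
    // should not mask the original error.
    static void closeQuietly(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException ignored) {
                // intentionally swallowed
            }
        }
    }
}
```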
[jira] [Commented] (FLINK-4278) Unclosed FSDataOutputStream in multiple files in the project
[ https://issues.apache.org/jira/browse/FLINK-4278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437395#comment-15437395 ] ASF GitHub Bot commented on FLINK-4278: --- Github user nssalian closed the pull request at: https://github.com/apache/flink/pull/2403 > Unclosed FSDataOutputStream in multiple files in the project > > > Key: FLINK-4278 > URL: https://issues.apache.org/jira/browse/FLINK-4278 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.3 >Reporter: Neelesh Srinivas Salian >Assignee: Neelesh Srinivas Salian > > After FLINK-4259, I did a check and found that the following files don't have > the closing of the FSDataOutputStream. > The following are the files and the corresponding methods missing the close() > 1) FSDataOutputStream.java adding a close method in the abstract class > 2) FSStateBackend flush() and write() - closing the FSDataOutputStream > 3) StringWriter.java write() > 4) FileSystemStateStore putState() - closing the FSDataOutputStream > 5) HadoopDataOutputStream.java not too sure if this needs closing. > 6) FileSystemStateStorageHelper.java store() need closing for both outStream > and the ObjectOutputStream > The options to think would be to either close or use IOUtils.closeQuietly() > Any thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4100) RocksDBStateBackend#close() can throw NPE
[ https://issues.apache.org/jira/browse/FLINK-4100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437392#comment-15437392 ] Neelesh Srinivas Salian commented on FLINK-4100: Sounds good. Thanks [~aljoscha] > RocksDBStateBackend#close() can throw NPE > - > > Key: FLINK-4100 > URL: https://issues.apache.org/jira/browse/FLINK-4100 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Tests >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Priority: Trivial > > When running the RocksDBStateBackendTest on Windows I ran into an NPE. The > tests are aborted in the @Before checkOperatingSystem method (which is > correct behaviour), but the test still calls dispose() in @After teardown(). > This led to an NPE since the lock object used is null; it was not > initialized since initializeForJob() was never called and there is no null > check. > {code} > testCopyDefaultValue(org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest) > Time elapsed: 0 sec <<< ERROR! 
> java.lang.NullPointerException: null > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.dispose(RocksDBStateBackend.java:318) > at > org.apache.flink.runtime.state.StateBackendTestBase.teardown(StateBackendTestBase.java:71) > at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
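The missing null check described above can be sketched as follows; the class and field names are hypothetical stand-ins, not the actual RocksDBStateBackend:

```java
// Hypothetical sketch of the fix: dispose() must tolerate being called
// before initializeForJob(), when the lock object is still null.
class DisposableBackend {

    private Object dbCleanupLock; // set in initializeForJob(), null before

    void initializeForJob() {
        dbCleanupLock = new Object();
    }

    /** Returns true if resources were disposed, false if never initialized. */
    boolean dispose() {
        if (dbCleanupLock == null) {
            // never initialized: nothing to clean up, and no NPE
            return false;
        }
        synchronized (dbCleanupLock) {
            // ... release native resources here ...
            return true;
        }
    }
}
```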
[jira] [Commented] (FLINK-4245) Metric naming improvements
[ https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437391#comment-15437391 ] ASF GitHub Bot commented on FLINK-4245: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2418 Looks like we could just pop the variables map into the constructor and get the key names for free, that's neat. I wouldn't worry too much about disregarding the naming scheme; in contrast to other reporters, additional keys (as I understand it) don't make it more difficult to work with a given metric. What we could do is add a configuration option `exposeAllKeys` or something that, if set, pops the map into the constructor; otherwise it uses the configured naming scheme. > Metric naming improvements > -- > > Key: FLINK-4245 > URL: https://issues.apache.org/jira/browse/FLINK-4245 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Stephan Ewen > > A metric currently has two parts to it: > - The name of that particular metric > - The "scope" (or namespace), defined by the group that contains the metric. > A metric group actually always implicitly has a map of naming "tags", like: > - taskmanager_host : > - taskmanager_id : > - task_name : "map() -> filter()" > We derive the scope from that map, following the defined scope formats. > For JMX (and some users that use JMX), it would be natural to expose that map > of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we > can expose a metric like: > - domain: "taskmanager.task.operator.io" > - name: "numRecordsIn" > - tags: { "hostname" -> "localhost", "operator_name" -> "map() at > X.java:123", ... } > For many other reporters, the formatted scope makes a lot of sense, since > they think only in terms of (scope, metric-name). > We may even have the formatted scope in JMX as well (in the domain), if we > want to go that route. > [~jgrier] and [~Zentol] - what do you think about that? 
> [~mdaxini] Does that match your use of the metrics? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
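The tag map described above maps naturally onto JMX key properties. A sketch using the standard `javax.management` API, with the domain and tag names taken from the comment; the values and the helper class are illustrative:

```java
import java.util.Hashtable;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch of exposing the tag map as JMX key properties, as described in
// the FLINK-4245 comment. Not Flink's actual JMXReporter.
class JmxNameExample {

    static ObjectName metricName() {
        Hashtable<String, String> tags = new Hashtable<>();
        tags.put("hostname", "localhost");
        // values with special characters must be quoted for ObjectName
        tags.put("operator_name", ObjectName.quote("map() at X.java:123"));
        tags.put("metric", "numRecordsIn");
        try {
            return new ObjectName("taskmanager.task.operator.io", tags);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

JMX clients can then filter on individual key properties instead of parsing a formatted scope string.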
[jira] [Commented] (FLINK-4482) numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock
[ https://issues.apache.org/jira/browse/FLINK-4482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437383#comment-15437383 ] ASF GitHub Bot commented on FLINK-4482: --- GitHub user tedyu opened a pull request: https://github.com/apache/flink/pull/2421 FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2421.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2421 commit 2021f4790715ecb762dbb23438bf0b2b2755845e Author: tedyuDate: 2016-08-25T18:11:44Z FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock > numUnsuccessfulCheckpointsTriggers is accessed without 
holding triggerLock > -- > > Key: FLINK-4482 > URL: https://issues.apache.org/jira/browse/FLINK-4482 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > In CheckpointCoordinator#stopCheckpointScheduler() : > {code} > synchronized (lock) { > ... > numUnsuccessfulCheckpointsTriggers = 0; > {code} > triggerLock is not held in the above operation. > See comment for triggerLock earlier in triggerCheckpoint(): > {code} > // we lock with a special lock to make sure that trigger requests do not > overtake each other. > // this is not done with the coordinator-wide lock, because the > 'checkpointIdCounter' > // may issue blocking operations. Using a different lock than teh > coordinator-wide lock, > // we avoid blocking the processing of 'acknowledge/decline' messages > during that time. > synchronized (triggerLock) { > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437322#comment-15437322 ] ASF GitHub Bot commented on FLINK-3874: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2244 Hey @fhueske I've renamed tests as you suggested. > Add a Kafka TableSink with JSON serialization > - > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437314#comment-15437314 ] Stephan Ewen commented on FLINK-3322: - Would you volunteer to help with this? If yes, I'd be happy to walk you through some approach that would be well aligned with thoughts for future improvements to memory management. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts
[ https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437310#comment-15437310 ] Stephan Ewen commented on FLINK-3030: - Thanks, that is a great issue to solve! > Enhance Dashboard to show Execution Attempts > > > Key: FLINK-3030 > URL: https://issues.apache.org/jira/browse/FLINK-3030 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk > Fix For: 1.0.0 > > > Currently, the web dashboard shows only the latest execution attempt. We > should make all execution attempts and their accumulators available for > inspection. > The REST monitoring API supports this, so it should be a change only to the > frontend part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4440) Make API for edge/vertex creation less verbose
[ https://issues.apache.org/jira/browse/FLINK-4440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437307#comment-15437307 ] Vasia Kalavri commented on FLINK-4440: -- I don't have a strong opinion against adding the {{create}} methods. Why do you think there is a cost to adding them [~greghogan]? > Make API for edge/vertex creation less verbose > -- > > Key: FLINK-4440 > URL: https://issues.apache.org/jira/browse/FLINK-4440 > Project: Flink > Issue Type: Improvement > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk >Priority: Trivial > > It would be better if one could create vertex/edges like this: > {code:java} > Vertex<Long, NullValue> v = Vertex.create(42); > Edge<Long, NullValue> e = Edge.create(5, 6); > {code} > Instead of this: > {code:java} > Vertex<Long, NullValue> v = new Vertex<Long, NullValue>(42, > NullValue.getInstance()); > Edge<Long, NullValue> e = new Edge<Long, > NullValue>(5, 6, NullValue.getInstance()); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
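The proposed factories could be implemented roughly as below. This is a standalone sketch with minimal stand-in classes, not Gelly's actual `Vertex`/`Edge` (which extend Flink's tuple types); the point is that a static `create` can default the value to `NullValue`:

```java
// Minimal stand-in for Flink's NullValue singleton.
final class NullValue {
    private static final NullValue INSTANCE = new NullValue();
    static NullValue getInstance() { return INSTANCE; }
    private NullValue() {}
}

class Vertex<K, V> {
    final K id;
    final V value;
    Vertex(K id, V value) { this.id = id; this.value = value; }

    // Proposed shorthand: the value defaults to NullValue.
    static <K> Vertex<K, NullValue> create(K id) {
        return new Vertex<>(id, NullValue.getInstance());
    }
}

class Edge<K, V> {
    final K source;
    final K target;
    final V value;
    Edge(K source, K target, V value) {
        this.source = source;
        this.target = target;
        this.value = value;
    }

    static <K> Edge<K, NullValue> create(K source, K target) {
        return new Edge<>(source, target, NullValue.getInstance());
    }
}
```

Type inference on the static method is what removes the verbose `new Vertex<Long, NullValue>(...)` spelling at call sites.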
[jira] [Created] (FLINK-4499) Introduce findbugs maven plugin
Ted Yu created FLINK-4499: - Summary: Introduce findbugs maven plugin Key: FLINK-4499 URL: https://issues.apache.org/jira/browse/FLINK-4499 Project: Flink Issue Type: Improvement Reporter: Ted Yu As suggested by Stephan in FLINK-4482, this issue is to add findbugs-maven-plugin into the build process so that we can detect lack of proper locking and other defects automatically. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
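A typical configuration for this plugin might look like the following pom.xml fragment. The coordinates are the standard findbugs-maven-plugin ones; the version and settings shown are illustrative, not a decision made in the ticket:

```xml
<!-- Illustrative fragment for the <build><plugins> section. -->
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>findbugs-maven-plugin</artifactId>
  <version>3.0.4</version>
  <configuration>
    <effort>Max</effort>
    <failOnError>true</failOnError>
  </configuration>
  <executions>
    <execution>
      <goals>
        <goal>check</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

Binding the `check` goal makes the build fail when findbugs reports defects such as inconsistent locking.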
[GitHub] flink pull request #2002: Support for bz2 compression in flink-core
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2002#discussion_r76288290 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java --- @@ -23,13 +23,12 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collection; -import java.util.zip.InflaterInputStream; /** * Creates a new instance of a certain subclass of {@link java.util.zip.InflaterInputStream}. */ @Internal -public interface InflaterInputStreamFactory { +public interface InflaterInputStreamFactory { --- End diff -- I think Greg raised a good point, the classname and comments should be adjusted as well. How about `DecompressingStreamFactory`, or `DecoratorStreamFactory` (if you want to keep it generic)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4265) Add a NoOpOperator
[ https://issues.apache.org/jira/browse/FLINK-4265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437279#comment-15437279 ] ASF GitHub Bot commented on FLINK-4265: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2294 This is okay to merge from my side. Definitely seems easier than the `Delegate` trick. > Add a NoOpOperator > -- > > Key: FLINK-4265 > URL: https://issues.apache.org/jira/browse/FLINK-4265 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > One recent feature of Gelly is algorithms which detect duplicated or similar > computation which can be shared. My initial implementation could only reuse a > {{DataSet}} result. Before committing to Flink this was updated to use a > javassist {{ProxyFactory}} allowing configuration to be merged and results to > be replaced. There were some issues, as identified in FLINK-4257. With a > {{NoOpOperator}} we can remove the use of {{ProxyFactory}} and resolve the > identified issues. > This ticket adds a {{NoOpOperator}} which is unwound in > {{OperatorTranslation.translate}}. The {{NoOpOperator}} contains a > {{DataSet}} which is accessed by a getter and setter. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4482) numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock
[ https://issues.apache.org/jira/browse/FLINK-4482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437274#comment-15437274 ] Stephan Ewen commented on FLINK-4482: - I think the statements can be adjusted to be properly locked. How about adding some {{GuardedBy}} annotations and adding the maven "findbugs" plugin to check such situations automatically? > numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock > -- > > Key: FLINK-4482 > URL: https://issues.apache.org/jira/browse/FLINK-4482 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > In CheckpointCoordinator#stopCheckpointScheduler() : > {code} > synchronized (lock) { > ... > numUnsuccessfulCheckpointsTriggers = 0; > {code} > triggerLock is not held in the above operation. > See comment for triggerLock earlier in triggerCheckpoint(): > {code} > // we lock with a special lock to make sure that trigger requests do not > overtake each other. > // this is not done with the coordinator-wide lock, because the > 'checkpointIdCounter' > // may issue blocking operations. Using a different lock than teh > coordinator-wide lock, > // we avoid blocking the processing of 'acknowledge/decline' messages > during that time. > synchronized (triggerLock) { > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
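Stephan's suggestion could look roughly like this. The sketch declares a minimal `GuardedBy` annotation inline so it is self-contained (findbugs/JSR-305 ship a real one in `javax.annotation.concurrent`), and the class is a simplified stand-in, not the actual CheckpointCoordinator:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Minimal stand-in for javax.annotation.concurrent.GuardedBy, declared
// inline so the sketch compiles without the JSR-305 dependency.
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.FIELD, ElementType.METHOD})
@interface GuardedBy {
    String value();
}

// Simplified sketch: the counter is documented as guarded by triggerLock
// and is only touched while holding that lock, which tools like findbugs
// can then verify automatically.
class TriggerLockExample {
    private final Object triggerLock = new Object();

    @GuardedBy("triggerLock")
    private int numUnsuccessfulCheckpointsTriggers;

    void recordFailedTrigger() {
        synchronized (triggerLock) {
            numUnsuccessfulCheckpointsTriggers++;
        }
    }

    void stopCheckpointScheduler() {
        synchronized (triggerLock) {
            numUnsuccessfulCheckpointsTriggers = 0;
        }
    }

    int failedTriggers() {
        synchronized (triggerLock) {
            return numUnsuccessfulCheckpointsTriggers;
        }
    }
}
```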
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437243#comment-15437243 ] ASF GitHub Bot commented on FLINK-3874: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2244 Thank you for helping with this PR @fhueske It's ok, I'll update the PR in an hour or so. > Add a Kafka TableSink with JSON serialization > - > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437238#comment-15437238 ] ASF GitHub Bot commented on FLINK-4437: --- Github user tedyu closed the pull request at: https://github.com/apache/flink/pull/2409 > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > Fix For: 1.2.0 > > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
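The race described above disappears when the check against `lastTriggeredCheckpoint` and the update of it happen atomically under one lock, so two threads cannot both pass the minimum-pause check with a stale value. A hypothetical sketch, not the actual CheckpointCoordinator:

```java
// Hypothetical sketch of guarding the read-check-update sequence with a
// single lock to prevent lock evasion and lost updates.
class CheckpointTriggerGate {
    private final Object lock = new Object();
    // start far in the past so the first trigger always passes the check;
    // MIN_VALUE / 2 avoids overflow when minPause is added
    private long lastTriggeredCheckpoint = Long.MIN_VALUE / 2;
    private final long minPauseBetweenCheckpoints;

    CheckpointTriggerGate(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    /** Records the trigger and returns true only if the minimum pause has passed. */
    boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false; // too soon after the previous checkpoint
            }
            lastTriggeredCheckpoint = timestamp;
            return true;
        }
    }
}
```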
[GitHub] flink issue #2420: [hotfix] Fix invalid url link in documentation
Github user haoch commented on the issue: https://github.com/apache/flink/pull/2420 @fhueske sure and thanks for the comment. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2420: [hotfix] Fix invalid url link in documentation
Github user haoch closed the pull request at: https://github.com/apache/flink/pull/2420 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437224#comment-15437224 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2369 Thanks Robert for addressing my comments :) Overall, I like the new hybrid producer approach. However, I'm still curious whether or not it is possible / reasonable to drop the `FlinkKafkaProducer010Configuration` return type of invocation (b), and let both invocation methods return `FlinkKafkaProducer010` instead. So, ``` FlinkKafkaProducer010 kafka = new FlinkKafkaProducer010(...) // or FlinkKafkaProducer010 kafka = FlinkKafkaProducer010.writeToKafkaWithTimestamps(...) for timestamp support // setter config methods directly done on the FlinkKafkaProducer010 instance regardless of (a) or (b) kafka.setLogFailuresOnly(true) kafka.setFlushOnCheckpoint(true) kafka.setWriteTimestampToKafka(true) // would not have effect if original invocation method (a) was used ``` But we'll need to be bit hacky in `invokeInternal(element, elementTimestamp)`, something like only letting the given `timestamp` to `ProducerRecord` be non-null if `writeTimestampToKafka && elementTimestamp != Long.MIN_VALUE`. What do you think? > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher compressed > messages, assign offset to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437223#comment-15437223 ] ASF GitHub Bot commented on FLINK-4437: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2409 @tedyu Should we close this pull request? > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > Fix For: 1.2.0 > > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.
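The issue above describes a classic check-then-act race: two threads can both pass the minimum-pause check before either one records its trigger. The following is a minimal, self-contained illustration of the hazard and of the usual fix (performing the check and the update atomically under one lock). The class and field names are illustrative, not Flink's actual `CheckpointCoordinator` internals.

```java
// Sketch of the FLINK-4437 hazard, reduced to a tiny throttle class.
public class CheckpointThrottle {
    private final long minPauseBetweenCheckpoints;
    private long lastTriggeredCheckpoint; // correlated state that can see lost updates
    private final Object lock = new Object();

    CheckpointThrottle(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    // Unsafe variant: two threads may both evaluate the condition before
    // either updates lastTriggeredCheckpoint, so both "win" and one update
    // to the correlated fields is effectively lost.
    boolean tryTriggerUnsafe(long timestamp) {
        if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
            return false;
        }
        lastTriggeredCheckpoint = timestamp;
        return true;
    }

    // Safe variant: check and update happen atomically under one lock,
    // so at most one thread succeeds for a given pause window.
    boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false;
            }
            lastTriggeredCheckpoint = timestamp;
            return true;
        }
    }

    public static void main(String[] args) {
        CheckpointThrottle throttle = new CheckpointThrottle(500);
        System.out.println(throttle.tryTrigger(1000L)); // first trigger succeeds
        System.out.println(throttle.tryTrigger(1300L)); // rejected: inside the pause window
        System.out.println(throttle.tryTrigger(1600L)); // succeeds: window elapsed
    }
}
```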
[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2409 @tedyu Should we close this pull request?
[GitHub] flink issue #2420: [hotfix] Fix invalid url link in documentation
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2420 Thanks for the fix @haoch! However, @smarthi opened #2416 to fix the issue as well. Would you mind closing your PR? There are certainly more things in the documentation that can be improved. Please have a look into JIRA or proof-read some pages if you would like to contribute. Thanks, Fabian
[jira] [Updated] (FLINK-4498) Better Cassandra sink documentation
[ https://issues.apache.org/jira/browse/FLINK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated FLINK-4498: -- Description: The Cassandra sink documentation is somewhat muddled and could be improved. For instance, the fact that it only supports tuples and POJOs that use DataStax Mapper annotations is only mentioned in passing, and it is not clear that the reference to tuples only applies to Flink Java tuples and not Scala tuples. The documentation also does not mention that setQuery() is only necessary for tuple streams. The explanation of the write ahead log could use some cleaning up to clarify when it is appropriate to use, ideally with an example. Maybe this would be best as a blog post to expand on the type of non-deterministic streams this applies to. It would also be useful to mention that tuple elements will be mapped to Cassandra columns using the Datastax Java driver's default encoders, which are somewhat limited (e.g. to write to a blob column the type in the tuple must be a java.nio.ByteBuffer and not just a byte[]). was: The Cassandra sink documentation is somewhat muddled and could be improved. For instance, the fact that it only supports tuples and POJOs that use DataStax Mapper annotations is only mentioned in passing, and it is not clear that the reference to tuples only applies to Flink Java tuples and not Scala tuples. The documentation also does not mention that setQuery() is only necessary for tuple streams. It would be good to have an example of a POJO stream with the DataStax annotations. The explanation of the write ahead log could use some cleaning up to clarify when it is appropriate to use, ideally with an example. Maybe this would be best as a blog post to expand on the type of non-deterministic streams this applies to. It would also be useful to mention that tuple elements will be mapped to Cassandra columns using the Datastax Java driver's default encoders, which are somewhat limited (e.g.
to write to a blob column the type in the tuple must be a java.nio.ByteBuffer and not just a byte[]). > Better Cassandra sink documentation > --- > > Key: FLINK-4498 > URL: https://issues.apache.org/jira/browse/FLINK-4498 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.1.0 >Reporter: Elias Levy > > The Cassandra sink documentation is somewhat muddled and could be improved. > For instance, the fact that it only supports tuples and POJOs that use > DataStax Mapper annotations is only mentioned in passing, and it is not clear > that the reference to tuples only applies to Flink Java tuples and not Scala > tuples. > The documentation also does not mention that setQuery() is only necessary for > tuple streams. > The explanation of the write ahead log could use some cleaning up to clarify > when it is appropriate to use, ideally with an example. Maybe this would be > best as a blog post to expand on the type of non-deterministic streams this > applies to. > It would also be useful to mention that tuple elements will be mapped to > Cassandra columns using the Datastax Java driver's default encoders, which > are somewhat limited (e.g. to write to a blob column the type in the tuple > must be a java.nio.ByteBuffer and not just a byte[]).
[jira] [Created] (FLINK-4498) Better Cassandra sink documentation
Elias Levy created FLINK-4498: - Summary: Better Cassandra sink documentation Key: FLINK-4498 URL: https://issues.apache.org/jira/browse/FLINK-4498 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.1.0 Reporter: Elias Levy The Cassandra sink documentation is somewhat muddled and could be improved. For instance, the fact that it only supports tuples and POJOs that use DataStax Mapper annotations is only mentioned in passing, and it is not clear that the reference to tuples only applies to Flink Java tuples and not Scala tuples. The documentation also does not mention that setQuery() is only necessary for tuple streams. It would be good to have an example of a POJO stream with the DataStax annotations. The explanation of the write ahead log could use some cleaning up to clarify when it is appropriate to use, ideally with an example. Maybe this would be best as a blog post to expand on the type of non-deterministic streams this applies to. It would also be useful to mention that tuple elements will be mapped to Cassandra columns using the Datastax Java driver's default encoders, which are somewhat limited (e.g. to write to a blob column the type in the tuple must be a java.nio.ByteBuffer and not just a byte[]).
[GitHub] flink issue #2418: [FLINK-4245] JMXReporter exposes all defined variables
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2418 I think that works to expose the names. I was wondering, though, if we can have a more JMX-natural integration. The JMX `ObjectName` has the constructor `ObjectName(String domain, Hashtable table)`. Rather than having `key1=taskmanager,key2=,key3=`, we could go for `domain=taskmanager.task.operator.io.numRecords` (no variable parsing) and `table={taskmanager=, hostname=, jobid=, ...}`. If I got the thoughts of some of the users right, that would be the preferred way for them. That probably contradicts the "configurable name" design we have for the other reporters, because the table and the formatted scope/name override each other (I think, not sure, not a JMX expert here). In the end, both approaches are probably valid and which one is more usable depends on the metric infrastructure of the users. So, if we cannot get both into the same reporter, should we simply have two JMX reporters? A "tag-based" (like discussed above) and a "name/scope-based" (like we have so far)?
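The "tag-based" variant discussed above maps directly onto the JDK's `javax.management.ObjectName`, which does take a domain string plus a `Hashtable` of key properties. Below is a minimal sketch; the domain and tag values (`tm-1`, `localhost`, etc.) are illustrative placeholders, not Flink's actual reporter output.

```java
import java.util.Hashtable;
import javax.management.ObjectName;

// Sketch of a tag-based JMX name: scope goes into the domain (no variable
// parsing needed), variables go into the key-property table.
public class TagBasedJmxName {

    static ObjectName metricName() {
        try {
            Hashtable<String, String> table = new Hashtable<>();
            // Illustrative tag values; a real reporter would fill these
            // from the metric group's variable map.
            table.put("taskmanager", "tm-1");
            table.put("hostname", "localhost");
            table.put("jobid", "a1b2c3");
            return new ObjectName("taskmanager.task.operator.io.numRecords", table);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName name = metricName();
        System.out.println(name.getDomain());               // the scope, unparsed
        System.out.println(name.getKeyProperty("hostname")); // an individual tag
    }
}
```

JMX consumers can then filter on individual key properties (e.g. all metrics for one `jobid`) instead of parsing a formatted scope string, which is the usability point raised in the comment.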
[jira] [Commented] (FLINK-4245) Metric naming improvements
[ https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437183#comment-15437183 ] ASF GitHub Bot commented on FLINK-4245: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2418 I think that works to expose the names. I was wondering, though, if we can have a more JMX-natural integration. The JMX `ObjectName` has the constructor `ObjectName(String domain, Hashtable table)`. Rather than having `key1=taskmanager,key2=,key3=`, we could go for `domain=taskmanager.task.operator.io.numRecords` (no variable parsing) and `table={taskmanager=, hostname=, jobid=, ...}`. If I got the thoughts of some of the users right, that would be the preferred way for them. That probably contradicts the "configurable name" design we have for the other reporters, because the table and the formatted scope/name override each other (I think, not sure, not a JMX expert here). In the end, both approaches are probably valid and which one is more usable depends on the metric infrastructure of the users. So, if we cannot get both into the same reporter, should we simply have two JMX reporters? A "tag-based" (like discussed above) and a "name/scope-based" (like we have so far)? > Metric naming improvements > -- > > Key: FLINK-4245 > URL: https://issues.apache.org/jira/browse/FLINK-4245 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Stephan Ewen > > A metric currently has two parts to it: > - The name of that particular metric > - The "scope" (or namespace), defined by the group that contains the metric. > A metric group actually always implicitly has a map of naming "tags", like: > - taskmanager_host : > - taskmanager_id : > - task_name : "map() -> filter()" > We derive the scope from that map, following the defined scope formats. > For JMX (and some users that use JMX), it would be natural to expose that map > of tags.
Some users reconstruct that map by parsing the metric scope. In JMX, we > can expose a metric like: > - domain: "taskmanager.task.operator.io" > - name: "numRecordsIn" > - tags: { "hostname" -> "localhost", "operator_name" -> "map() at > X.java:123", ... } > For many other reporters, the formatted scope makes a lot of sense, since > they think only in terms of (scope, metric-name). > We may even have the formatted scope in JMX as well (in the domain), if we > want to go that route. > [~jgrier] and [~Zentol] - what do you think about that? > [~mdaxini] Does that match your use of the metrics?
[GitHub] flink pull request #2420: [hotfix] Fix invalid elasticsearch url link in doc...
GitHub user haoch opened a pull request: https://github.com/apache/flink/pull/2420 [hotfix] Fix invalid elasticsearch url link in documentation Fix invalid elasticsearch url link from `http://elastic.com` to `http://elastic.co` in documentation You can merge this pull request into a Git repository by running: $ git pull https://github.com/haoch/flink patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2420.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2420 commit 7b4275508940bedc95a1a281530ab7b5848b477e Author: Hao Chen Date: 2016-08-25T16:39:00Z [hotfix] Fix invalid elasticsearch url link Fix invalid elasticsearch url link from `http://elastic.com` to `http://elastic.co`
[jira] [Commented] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website
[ https://issues.apache.org/jira/browse/FLINK-4479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437181#comment-15437181 ] Robert Metzger commented on FLINK-4479: --- PR open: https://github.com/apache/flink-web/pull/32 > Replace trademark (tm) with registered trademark (R) sign on Flink website > -- > > Key: FLINK-4479 > URL: https://issues.apache.org/jira/browse/FLINK-4479 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Robert Metzger >Assignee: Robert Metzger > > Flink is now a registered trademark, so we should reflect that on our website.
[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2244 Looks good to merge @mushketyk :-) The only thing that I'd change would be to rename `Kafka08JsonTableSinkTest` to `Kafka08JsonTableSinkITCase` (same for Kafka09*), since these are long running integration tests which should be executed in the appropriate Maven phase (verify). I can do that as well before merging. Thanks, Fabian
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437179#comment-15437179 ] ASF GitHub Bot commented on FLINK-3874: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2244 Looks good to merge @mushketyk :-) The only thing that I'd change would be to rename `Kafka08JsonTableSinkTest` to `Kafka08JsonTableSinkITCase` (same for Kafka09*), since these are long running integration tests which should be executed in the appropriate Maven phase (verify). I can do that as well before merging. Thanks, Fabian > Add a Kafka TableSink with JSON serialization > - > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka.
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76277062 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java --- @@ -135,45 +135,45 @@ public void testPeriodicWatermarks() throws Exception { // elements generate a watermark if the timestamp is a multiple of three // elements for partition 1 - fetcher.emitRecord(1L, part1, 1L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 1L)); - fetcher.emitRecord(2L, part1, 2L, new ConsumerRecord (testTopic, 7, new byte[]{0}, 2L)); - fetcher.emitRecord(3L, part1, 3L, new ConsumerRecord (testTopic, 7, new byte[]{0}, 3L)); + fetcher.emitRecord(1L, part1, 1L, Long.MAX_VALUE); + fetcher.emitRecord(2L, part1, 2L, Long.MAX_VALUE); + fetcher.emitRecord(3L, part1, 3L, Long.MAX_VALUE); assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); // elements for partition 2 - fetcher.emitRecord(12L, part2, 1L, new ConsumerRecord (testTopic, 13, new byte[]{0}, 1L)); + fetcher.emitRecord(12L, part2, 1L, Long.MAX_VALUE); assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); // elements for partition 3 - fetcher.emitRecord(101L, part3, 1L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 1L)); - fetcher.emitRecord(102L, part3, 2L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 2L)); + fetcher.emitRecord(101L, part3, 1L, Long.MAX_VALUE); + fetcher.emitRecord(102L, part3, 2L, Long.MAX_VALUE); assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(102L, sourceContext.getLatestElement().getTimestamp()); // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp()); // advance partition 3 - fetcher.emitRecord(1003L, part3, 3L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 3L)); - fetcher.emitRecord(1004L, part3, 4L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 4L)); - fetcher.emitRecord(1005L, part3, 5L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 5L)); + fetcher.emitRecord(1003L, part3, 3L, Long.MAX_VALUE); + fetcher.emitRecord(1004L, part3, 4L, Long.MAX_VALUE); + fetcher.emitRecord(1005L, part3, 5L, Long.MAX_VALUE); assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); // advance partition 1 beyond partition 2 - this bumps the watermark - fetcher.emitRecord(30L, part1, 4L, new ConsumerRecord (testTopic, 7, new byte[]{0}, 4L)); + fetcher.emitRecord(30L, part1, 4L, Long.MAX_VALUE); assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); // this blocks until the periodic thread emitted the watermark assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); // advance partition 2 again - this bumps the watermark - fetcher.emitRecord(13L, part2, 2L, new ConsumerRecord (testTopic, 13, new byte[]{0}, 2L)); - fetcher.emitRecord(14L, part2, 3L, new ConsumerRecord (testTopic, 13, new byte[]{0}, 3L)); - fetcher.emitRecord(15L, part2, 3L, new ConsumerRecord (testTopic, 13, new byte[]{0}, 3L)); + fetcher.emitRecord(13L, part2, 2L, Long.MAX_VALUE); + fetcher.emitRecord(14L, part2, 3L, Long.MAX_VALUE); + fetcher.emitRecord(15L, part2, 3L, Long.MAX_VALUE); --- End diff -- Same here: I think these are supposed to give a `Long.MIN_VALUE` instead of MAX? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437170#comment-15437170 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76277062 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java --- @@ -135,45 +135,45 @@ public void testPeriodicWatermarks() throws Exception { // elements generate a watermark if the timestamp is a multiple of three // elements for partition 1 - fetcher.emitRecord(1L, part1, 1L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 1L)); - fetcher.emitRecord(2L, part1, 2L, new ConsumerRecord (testTopic, 7, new byte[]{0}, 2L)); - fetcher.emitRecord(3L, part1, 3L, new ConsumerRecord (testTopic, 7, new byte[]{0}, 3L)); + fetcher.emitRecord(1L, part1, 1L, Long.MAX_VALUE); + fetcher.emitRecord(2L, part1, 2L, Long.MAX_VALUE); + fetcher.emitRecord(3L, part1, 3L, Long.MAX_VALUE); assertEquals(3L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(3L, sourceContext.getLatestElement().getTimestamp()); // elements for partition 2 - fetcher.emitRecord(12L, part2, 1L, new ConsumerRecord (testTopic, 13, new byte[]{0}, 1L)); + fetcher.emitRecord(12L, part2, 1L, Long.MAX_VALUE); assertEquals(12L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(12L, sourceContext.getLatestElement().getTimestamp()); // elements for partition 3 - fetcher.emitRecord(101L, part3, 1L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 1L)); - fetcher.emitRecord(102L, part3, 2L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 2L)); + fetcher.emitRecord(101L, part3, 1L, Long.MAX_VALUE); + fetcher.emitRecord(102L, part3, 2L, Long.MAX_VALUE); assertEquals(102L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp()); // now, we should have a watermark (this blocks until the periodic thread emitted the watermark) assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp()); // advance partition 3 - fetcher.emitRecord(1003L, part3, 3L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 3L)); - fetcher.emitRecord(1004L, part3, 4L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 4L)); - fetcher.emitRecord(1005L, part3, 5L, new ConsumerRecord (testTopic, 21, new byte[]{0}, 5L)); + fetcher.emitRecord(1003L, part3, 3L, Long.MAX_VALUE); + fetcher.emitRecord(1004L, part3, 4L, Long.MAX_VALUE); + fetcher.emitRecord(1005L, part3, 5L, Long.MAX_VALUE); assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(1005L, sourceContext.getLatestElement().getTimestamp()); // advance partition 1 beyond partition 2 - this bumps the watermark - fetcher.emitRecord(30L, part1, 4L, new ConsumerRecord (testTopic, 7, new byte[]{0}, 4L)); + fetcher.emitRecord(30L, part1, 4L, Long.MAX_VALUE); assertEquals(30L, sourceContext.getLatestElement().getValue().longValue()); assertEquals(30L, sourceContext.getLatestElement().getTimestamp()); // this blocks until the periodic thread emitted the watermark assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp()); // advance partition 2 again - this bumps the watermark - fetcher.emitRecord(13L, part2, 2L, new ConsumerRecord (testTopic, 13, new byte[]{0}, 2L)); - fetcher.emitRecord(14L, part2, 3L, new ConsumerRecord (testTopic, 13, new byte[]{0}, 3L)); - fetcher.emitRecord(15L, part2, 3L, new ConsumerRecord (testTopic, 13, new byte[]{0}, 3L)); + fetcher.emitRecord(13L, part2, 2L, Long.MAX_VALUE); + fetcher.emitRecord(14L, part2, 3L, Long.MAX_VALUE); + fetcher.emitRecord(15L, part2, 3L, Long.MAX_VALUE); --- End diff -- Same here: I think these are supposed to give a `Long.MIN_VALUE` instead of MAX? > Bump Kafka producer in
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437160#comment-15437160 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76275879 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java --- @@ -262,6 +262,10 @@ public void run() { } } + // Kafka09Fetcher ignores the timestamp. + protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception { + emitRecord(record, partition, offset, Long.MAX_VALUE); --- End diff -- Same here: I think this is supposed to give a `Long.MIN_VALUE` instead of MAX? > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher compressed > messages, assign offset to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all.
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437159#comment-15437159 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76275805 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -376,7 +376,7 @@ else if (partitionsRemoved) { continue partitionsLoop; } - owner.emitRecord(value, currentPartition, offset, msg); + owner.emitRecord(value, currentPartition, offset, Long.MAX_VALUE); --- End diff -- I think this is supposed to give a `Long.MIN_VALUE` instead of MAX? > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher compressed > messages, assign offset to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all.
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76275879 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java --- @@ -262,6 +262,10 @@ public void run() { } } + // Kafka09Fetcher ignores the timestamp. + protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception { + emitRecord(record, partition, offset, Long.MAX_VALUE); --- End diff -- Same here: I think this is supposed to give a `Long.MIN_VALUE` instead of MAX?
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76275805 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -376,7 +376,7 @@ else if (partitionsRemoved) { continue partitionsLoop; } - owner.emitRecord(value, currentPartition, offset, msg); + owner.emitRecord(value, currentPartition, offset, Long.MAX_VALUE); --- End diff -- I think this is supposed to give a `Long.MIN_VALUE` instead of MAX?
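The review comments above hinge on which sentinel means "no timestamp attached". The sketch below illustrates why `Long.MIN_VALUE` is the safer choice for event-time logic: a `Long.MAX_VALUE` sentinel looks like a real (maximal) timestamp and would wrongly advance event time. The method names here are illustrative, not Flink's actual fetcher API.

```java
// Sketch (assumed names): how a "no timestamp" sentinel interacts with
// event-time tracking when records without timestamps are emitted.
public class TimestampSentinel {

    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Advances the tracked event time, ignoring records carrying the sentinel.
    static long maxEventTime(long current, long recordTimestamp) {
        if (recordTimestamp == NO_TIMESTAMP) {
            return current; // no timestamp attached: leave event time untouched
        }
        return Math.max(current, recordTimestamp);
    }

    public static void main(String[] args) {
        long eventTime = 0L;
        eventTime = maxEventTime(eventTime, 100L);
        eventTime = maxEventTime(eventTime, NO_TIMESTAMP); // ignored
        // Had the sentinel been Long.MAX_VALUE, the max() above would have
        // jumped event time to the largest possible value instead.
        assert eventTime == 100L;
    }
}
```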
[jira] [Created] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
Elias Levy created FLINK-4497: - Summary: Add support for Scala tuples and case classes to Cassandra sink Key: FLINK-4497 URL: https://issues.apache.org/jira/browse/FLINK-4497 Project: Flink Issue Type: Improvement Components: Cassandra Connector Affects Versions: 1.1.0 Reporter: Elias Levy The new Cassandra sink only supports streams of Flink Java tuples and Java POJOs that have been annotated for use by Datastax Mapper. The sink should be extended to support Scala types and case classes.
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76274362
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-	/** The unique resource ID of this TaskExecutor */
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+	/** Return code for critical errors during the runtime */
+	private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+	/** The name of the TaskManager actor */
+	private static final String TASK_MANAGER_NAME = "taskmanager";
+
+	/** The unique resource ID of this TaskManager */
 	private final ResourceID resourceID;
 
 	/** The access to the leader election and metadata storage services */
 	private final HighAvailabilityServices haServices;
 
-	// - resource manager
+	/** The task manager configuration */
+	private final TaskManagerConfiguration taskManagerConfig;
+
+	/** The connection information of the task manager */
+	private final InstanceConnectionInfo connectionInfo;
+
+	/** The I/O manager component in the task manager */
+	private final IOManager ioManager;
+
+	/** The memory manager component in the task manager */
+	private final MemoryManager memoryManager;
+
+	/** The network component in the task manager */
+	private final NetworkEnvironment networkEnvironment;
+
+	/** The number of slots in the task manager, should be 1 for YARN */
+	private final int numberOfSlots;
+
+	// - resource manager
 	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
 	public TaskExecutor(
+			TaskManagerConfiguration taskManagerConfig,
+			ResourceID resourceID,
+			InstanceConnectionInfo connectionInfo,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			NetworkEnvironment networkEnvironment,
+			int numberOfSlots,
 			RpcService rpcService,
-			HighAvailabilityServices haServices,
-			ResourceID resourceID) {
+			HighAvailabilityServices haServices) {
 
 		super(rpcService);
 
-		this.haServices = checkNotNull(haServices);
+		this.taskManagerConfig = checkNotNull(taskManagerConfig);
 		this.resourceID = checkNotNull(resourceID);
+		this.connectionInfo = checkNotNull(connectionInfo);
+		this.memoryManager = checkNotNull(memoryManager);
+		this.ioManager = checkNotNull(ioManager);
+		this.networkEnvironment = checkNotNull(networkEnvironment);
+		this.numberOfSlots = checkNotNull(numberOfSlots);
+		this.haServices = checkNotNull(haServices);
+	}
+
+	/**
+	 * Starts and runs the TaskManager.
+	 *
+	 * This method first tries to select the network interface to use for the TaskManager
+	 * communication. The network interface is used both for the actor communication
+	 * (coordination) as well as for the data exchange between task managers. Unless
+	 * the hostname/interface is explicitly configured in the configuration, this
+	 * method will try out various interfaces and methods to connect to the JobManager
+	 * and select the one where the connection attempt is successful.
+	 *
+	 * After selecting the network interface, this method brings up an actor system
+	 * for the TaskManager and its actors, starts the TaskManager's services
+	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+	 *
+	 * @param configuration    The configuration for the TaskManager.
+	 * @param taskManagerClass The actor class to instantiate.
+	 *                         Allows to use TaskManager subclasses for example for YARN.
+	 */
+	public static void selectNetworkInterfaceAndRunTaskManager(
+			Configuration configuration,
+			ResourceID resourceID,
+			Class taskManagerClass) throws Exception {
+
+		Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
+
+		runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
+	}
+
+	private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)
+			throws Exception {
+
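The constructor in the diff above validates every injected component eagerly with `checkNotNull(...)` before assignment, so a misconfigured TaskManager fails at construction rather than at first use. A minimal self-contained sketch of that pattern (hypothetical class and field names, using the JDK's `Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`); note that applying a null check to a primitive such as `numberOfSlots` is a no-op after autoboxing, so a range check is the meaningful guard there:

```java
import java.util.Objects;

// Hypothetical stand-in for the eager-validation constructor pattern above.
public class ComponentHolder {
    private final String resourceId;   // stands in for ResourceID
    private final int numberOfSlots;

    public ComponentHolder(String resourceId, int numberOfSlots) {
        // Fail fast at construction time if a required component is missing.
        this.resourceId = Objects.requireNonNull(resourceId, "resourceId must not be null");
        // A null check on a primitive int would be meaningless; validate the range instead.
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("numberOfSlots must be positive");
        }
        this.numberOfSlots = numberOfSlots;
    }

    public String getResourceId() { return resourceId; }
    public int getNumberOfSlots() { return numberOfSlots; }
}
```

The benefit of this style is that a `NullPointerException` carries a descriptive message and points at the constructor call site, not at some later RPC or I/O path.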
[jira] [Reopened] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu reopened FLINK-4437: --- Assignee: (was: Ted Yu) > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > Fix For: 1.2.0 > > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-4437. --- Resolution: Fixed > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > Fix For: 1.2.0 > > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
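The lock-evasion race described in FLINK-4437 above is the classic check-then-act problem: the fix is to perform the comparison and the update of `lastTriggeredCheckpoint` atomically under one lock. A minimal sketch (hypothetical class name, field names modeled on the ticket; not the actual CheckpointCoordinator code):

```java
// Hypothetical sketch: comparison and update happen under the same lock,
// so two threads cannot both observe a stale lastTriggeredCheckpoint and
// both proceed (the "lock evasion" described in the ticket).
public class CheckpointRateLimiter {
    private final Object lock = new Object();
    private final long minPauseBetweenCheckpoints;
    private long lastTriggeredCheckpoint;

    public CheckpointRateLimiter(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    /** Returns true and records the trigger time only if the minimum pause has elapsed. */
    public boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            // make sure the minimum interval between checkpoints has passed
            if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false; // minimum interval has not passed yet
            }
            lastTriggeredCheckpoint = timestamp;
            return true;
        }
    }
}
```

If the field were instead read outside the lock and written later, updates to fields correlated with `lastTriggeredCheckpoint` could be lost exactly as the ticket describes.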
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437151#comment-15437151 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76274826
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---
@@ -60,12 +60,17 @@
 	protected void assignPartitionsToConsumer(KafkaConsumer consumer
 		consumer.assign(topicPartitions);
 	}
 
+	@Override
+	protected void emitRecord(T record, KafkaTopicPartitionState partition, long offset, ConsumerRecord consumerRecord) throws Exception {
+		// pass timestamp
+		super.emitRecord(record, partition, offset, consumerRecord.timestamp());
+	}
+
 	/**
 	 * Emit record Kafka-timestamp aware.
 	 */
 	@Override
-	protected void emitRecord(T record, KafkaTopicPartitionState partitionState, long offset, R kafkaRecord) throws Exception {
-		long timestamp = ((ConsumerRecord) kafkaRecord).timestamp();
+	protected void emitRecord(T record, KafkaTopicPartitionState partitionState, long offset, long timestamp) throws Exception {
 		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
--- End diff --
Is it possible to let `AbstractFetcher#emitRecord` determine whether to call `collectWithTimestamp` or `collect` depending on the provided timestamp (== `Long.MIN_VALUE`)? Then, we won't need to override the base `emitRecord` here, correct?
> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Elias Levy
> Assignee: Robert Metzger
> Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher-compressed
> messages, assign offsets to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.
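The reviewer's suggestion above — letting a single base-class `emitRecord` dispatch on a `Long.MIN_VALUE` sentinel instead of overriding it per connector — can be sketched in isolation. This is a simplified, hypothetical model (the names `RecordEmitter`, `collect`, `collectWithTimestamp` mirror the discussion but are not the actual `AbstractFetcher` API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sentinel-based dispatch: one emitRecord decides
// between collect() and collectWithTimestamp() based on whether a real
// timestamp was attached, using Long.MIN_VALUE as the "no timestamp" marker.
public class RecordEmitter {
    public static final long NO_TIMESTAMP = Long.MIN_VALUE;
    public final List<String> emitted = new ArrayList<>();

    public void emitRecord(String record, long offset, long timestamp) {
        if (timestamp == NO_TIMESTAMP) {
            collect(record);                        // connector had no timestamp
        } else {
            collectWithTimestamp(record, timestamp); // e.g. Kafka 0.10+ record timestamp
        }
    }

    private void collect(String record) { emitted.add(record + "@none"); }
    private void collectWithTimestamp(String record, long ts) { emitted.add(record + "@" + ts); }
}
```

With this shape, a timestamp-aware connector only forwards `consumerRecord.timestamp()` and never needs its own override, which is exactly the simplification the comment asks about.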
[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java
[ https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437148#comment-15437148 ] ASF GitHub Bot commented on FLINK-4363: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r76274362 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437141#comment-15437141 ] Alexander Chermenin commented on FLINK-2608: This is a bug in ArraysAsListSerializer from Twitter Chill (https://github.com/twitter/chill/issues/255). It will be fixed after building the next release and changing the dependency version for this one.
> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 0.9, 0.10.0
> Reporter: Maximilian Michels
> Priority: Minor
> Fix For: 1.0.0
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> text = env.fromElements(
>             "Who's there?",
>             "I think I hear them. Stand, ho! Who's there?");
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         //List<Integer> elements = new ArrayList<>(new int[] {0,0,0});
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>             .flatMap(new LineSplitter())
>             .withBroadcastSet(set, "set")
>             .groupBy(0)
>             .sum(1);
>         wordCounts.print();
>     }
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
>         List<Integer> integerList;
>         public TestClass(List<Integer> integerList){
>             this.integerList = integerList;
>         }
>     }
> {code}
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at >
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at >
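Until the Chill fix lands, the practical workaround for the FLINK-2608 failure above is to avoid handing a `java.util.Arrays$ArrayList` (the hidden type returned by `Arrays.asList`) to Flink at all, by copying it into a plain `ArrayList` first. A minimal sketch of that workaround (hypothetical helper name; plain JDK types only, no Flink dependency):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: Arrays.asList(..) returns java.util.Arrays$ArrayList,
// which the Chill ArraysAsListSerializer mishandles; copying into a plain
// ArrayList before passing the list to fromElements()/fromCollection()
// sidesteps the broken serializer entirely.
public class ListCopyDemo {
    public static <T> List<T> toSerializableList(List<T> in) {
        return new ArrayList<>(in); // plain ArrayList serializes without issue
    }
}
```

So in the bug's example, `new TestClass(ListCopyDemo.toSerializableList(Arrays.asList(0, 0, 0)))` would deploy where the uncopied list fails.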
[jira] [Commented] (FLINK-4480) Incorrect link to elastic.co in documentation
[ https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437136#comment-15437136 ] ASF GitHub Bot commented on FLINK-4480: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2416 Merging this... > Incorrect link to elastic.co in documentation > - > > Key: FLINK-4480 > URL: https://issues.apache.org/jira/browse/FLINK-4480 > Project: Flink > Issue Type: Bug > Components: Documentation > Affects Versions: 1.2.0, 1.1.1 > Reporter: Fabian Hueske > Assignee: Suneel Marthi > Priority: Trivial > > The link URL of the entry "Elasticsearch 2x (sink)" on the connector's > documentation page > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html > is pointing to http://elastic.com but should point to http://elastic.co
[jira] [Commented] (FLINK-4462) Add RUN_TIME retention policy for all the flink annotations
[ https://issues.apache.org/jira/browse/FLINK-4462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437134#comment-15437134 ] ASF GitHub Bot commented on FLINK-4462: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2412 The `japicmp` is a maven plugin that is added to projects like `flink-core`. It is not part of any code. > Add RUN_TIME retention policy for all the flink annotations > --- > > Key: FLINK-4462 > URL: https://issues.apache.org/jira/browse/FLINK-4462 > Project: Flink > Issue Type: Improvement > Reporter: ramkrishna.s.vasudevan > Assignee: ramkrishna.s.vasudevan > Priority: Minor > > It is better to add the RUNTIME retention policy to flink annotations, so that > utilities/tests can be added to verify that the classes/interfaces are all > tagged with proper annotations.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437131#comment-15437131 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2315 There are some compilation issues in the build logs: ``` [ERROR] COMPILATION ERROR : [INFO] - [ERROR] /home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java:[21,33] package com.google.common.collect does not exist [ERROR] /home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java:[65,24] cannot find symbol ``` Could you rebase to the latest master and also remove any merge commits? Thanks! > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4433) Refactor the StreamSource.
[ https://issues.apache.org/jira/browse/FLINK-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4433: -- Issue Type: Improvement (was: Bug) > Refactor the StreamSource. > -- > > Key: FLINK-4433 > URL: https://issues.apache.org/jira/browse/FLINK-4433 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > With the addition of continuous file monitoring, apart from the > {{StreamSource}} also the {{ContinuousFileReaderOperator}} uses a > {{SourceContext}}. Given this, all the implementations of the > {{SourceContext}} should be removed from the {{StreamSource}} and become > independent classes. > In addition, the {{AsyncExceptionChecker}} interface should be removed as its > functionality can be replaced by the {{task.failExternally()}} method. This > also implies slight changes in the source context implementations. > Also in the {{trigger()}} method of the {{WatermarkEmittingTask}}, all the > {{owner.getCurrentProcessingTime()}} could be replaced by the {{timestamp}} > argument of that method. This will remove some of the calls to the > {{getCurrentProcessingTime()}} which can be expensive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4433) Refactor the StreamSource.
[ https://issues.apache.org/jira/browse/FLINK-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4433: -- Component/s: DataStream API > Refactor the StreamSource. > -- > > Key: FLINK-4433 > URL: https://issues.apache.org/jira/browse/FLINK-4433 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > With the addition of continuous file monitoring, apart from the > {{StreamSource}} also the {{ContinuousFileReaderOperator}} uses a > {{SourceContext}}. Given this, all the implementations of the > {{SourceContext}} should be removed from the {{StreamSource}} and become > independent classes. > In addition, the {{AsyncExceptionChecker}} interface should be removed as its > functionality can be replaced by the {{task.failExternally()}} method. This > also implies slight changes in the source context implementations. > Also in the {{trigger()}} method of the {{WatermarkEmittingTask}}, all the > {{owner.getCurrentProcessingTime()}} could be replaced by the {{timestamp}} > argument of that method. This will remove some of the calls to the > {{getCurrentProcessingTime()}} which can be expensive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4433) Refactor the StreamSource.
[ https://issues.apache.org/jira/browse/FLINK-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-4433: - Assignee: Kostas Kloudas > Refactor the StreamSource. > -- > > Key: FLINK-4433 > URL: https://issues.apache.org/jira/browse/FLINK-4433 > Project: Flink > Issue Type: Bug >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > With the addition of continuous file monitoring, apart from the > {{StreamSource}} also the {{ContinuousFileReaderOperator}} uses a > {{SourceContext}}. Given this, all the implementations of the > {{SourceContext}} should be removed from the {{StreamSource}} and become > independent classes. > In addition, the {{AsyncExceptionChecker}} interface should be removed as its > functionality can be replaced by the {{task.failExternally()}} method. This > also implies slight changes in the source context implementations. > Also in the {{trigger()}} method of the {{WatermarkEmittingTask}}, all the > {{owner.getCurrentProcessingTime()}} could be replaced by the {{timestamp}} > argument of that method. This will remove some of the calls to the > {{getCurrentProcessingTime()}} which can be expensive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #:
Github user rmetzger commented on the pull request: https://github.com/apache/flink/commit/444315a12ca2b1d3de44ea50dda9b8bb5a36bb9e#commitcomment-18778263 Mh, I stumbled across the broken log upload when I tried fixing that issue ... so I thought the commit is related to my efforts fixing the issue ;) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3761) Refactor State Backends/Make Keyed State Key-Group Aware
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437116#comment-15437116 ] ASF GitHub Bot commented on FLINK-3761: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/1988 I think this is subsumed by #2376 Should we close this Pull Request? > Refactor State Backends/Make Keyed State Key-Group Aware > > > Key: FLINK-3761 > URL: https://issues.apache.org/jira/browse/FLINK-3761 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Till Rohrmann >Assignee: Aljoscha Krettek > > After an off-line discussion with [~aljoscha], we came to the conclusion that > it would be beneficial to reflect the differences between a keyed and a > non-keyed stream also in the state backends. A state backend which is used > for a keyed stream offers a value, list, folding and value state and has to > group its keys into key groups. > A state backend for non-keyed streams can only offer a union state to make it > work with dynamic scaling. A union state is a state which is broadcasted to > all tasks in case of a recovery. The state backends can then select what > information they need to recover from the whole state (formerly distributed). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4488) Prevent cluster shutdown after job execution for non-detached jobs
[ https://issues.apache.org/jira/browse/FLINK-4488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437113#comment-15437113 ] ASF GitHub Bot commented on FLINK-4488: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2419 That makes sense, +1 to merge this for 1.1.2 and 1.2 Wondering if we can guard this via a test, so that the FLIP-6 refactoring does not re-introduce the bug. > Prevent cluster shutdown after job execution for non-detached jobs > -- > > Key: FLINK-4488 > URL: https://issues.apache.org/jira/browse/FLINK-4488 > Project: Flink > Issue Type: Bug > Components: YARN Client >Affects Versions: 1.2.0, 1.1.1 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Fix For: 1.2.0, 1.1.2 > > > In per-job mode, the Yarn cluster currently shuts down after the first > interactively executed job. Users may want to execute multiple jobs in one > Jar. I would suggest to use this mechanism only for jobs which run detached. > For interactive jobs, shutdown of the cluster is additionally handled by the > CLI which should be sufficient to ensure cluster shutdown. Cluster shutdown > could only become a problem in case of a network partition to the cluster or > outage of the CLI. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
[ https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437100#comment-15437100 ] ASF GitHub Bot commented on FLINK-4418: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2383 Merging this... > ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if > InetAddress.getLocalHost throws exception > -- > > Key: FLINK-4418 > URL: https://issues.apache.org/jira/browse/FLINK-4418 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.1.0 >Reporter: Shannon Carey >Assignee: Shannon Carey > > When attempting to connect to a cluster with a ClusterClient, if the > machine's hostname is not resolvable to an IP, an exception is thrown > preventing success. > This is the case if, for example, the hostname is not present & mapped to a > local IP in /etc/hosts. > The exception is below. I suggest that findAddressUsingStrategy() should > catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and > return null, allowing alternative strategies to be attempted by > findConnectingAddress(). I will open a PR to this effect. Ideally this could > be included in both 1.2 and 1.1.2. > In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS > EC2 instance. > {code} > 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed > to retrieve the JobManager gateway. 
> 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430) > 21:11:35 at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > 21:11:35 at > org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334) > 21:11:35 at > com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81) > 21:11:35 at > com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34) > 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager > address at /10.2.89.80:43126 > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428) > 21:11:35 ... 8 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: > ip-10-2-64-47: unknown error > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1505) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123) > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187) > 21:11:35 ... 
10 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown > error > 21:11:35 at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) > 21:11:35 at > java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) > 21:11:35 at > java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1500) > 21:11:35 ... 13 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
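The fix suggested in the issue above — catch UnknownHostException in findAddressUsingStrategy() and return null so findConnectingAddress() can fall through to other strategies — can be sketched as follows. This is a simplified stand-in, not the actual ConnectionUtils code; the class and fallback below are illustrative only.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hedged sketch of the proposed fix: an unresolvable local hostname
// (e.g. a bare EC2 host name missing from /etc/hosts) should not abort
// the whole address search, it should just disqualify this one strategy.
public class LocalHostStrategy {

    /** Returns the local address, or null if the hostname cannot be resolved. */
    public static InetAddress findAddressUsingStrategy() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            // Swallow the exception and signal "strategy failed" instead of
            // propagating, so the caller can try alternative strategies.
            return null;
        }
    }

    /** Tries the local-host strategy first, then falls back to loopback. */
    public static InetAddress findConnectingAddress() {
        InetAddress addr = findAddressUsingStrategy();
        if (addr != null) {
            return addr;
        }
        // Fallback strategy: the loopback address is always available.
        return InetAddress.getLoopbackAddress();
    }

    public static void main(String[] args) {
        System.out.println(findConnectingAddress());
    }
}
```

With this shape, a host whose name is unmapped still gets an address from a later strategy instead of failing job submission outright.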
[jira] [Resolved] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-4437. --- Resolution: Fixed > Lock evasion around lastTriggeredCheckpoint may lead to lost updates to > related fields > -- > > Key: FLINK-4437 > URL: https://issues.apache.org/jira/browse/FLINK-4437 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Ted Yu > Fix For: 1.2.0 > > > In CheckpointCoordinator#triggerCheckpoint(): > {code} > // make sure the minimum interval between checkpoints has passed > if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) > { > {code} > If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > > timestamp' in close proximity before lastTriggeredCheckpoint is updated, > the two threads may have an inconsistent view of "lastTriggeredCheckpoint" > and updates to fields correlated with "lastTriggeredCheckpoint" may be lost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
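The lock evasion described above can be avoided by performing the check and the update of lastTriggeredCheckpoint atomically under one lock. The class below is a minimal stand-in for the relevant part of CheckpointCoordinator, not the actual fix that was merged; field names follow the snippet in the issue.

```java
// Hedged sketch: check-then-act on lastTriggeredCheckpoint done atomically,
// so two concurrent triggers cannot both pass the minimum-pause check.
public class CheckpointGate {
    private final Object lock = new Object();
    private final long minPauseBetweenCheckpoints;
    private long lastTriggeredCheckpoint;

    public CheckpointGate(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    /** Returns true iff a checkpoint may be triggered at {@code timestamp}. */
    public boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            // Read and write happen under the same lock: no thread can
            // evaluate the condition against a stale lastTriggeredCheckpoint.
            if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false;
            }
            lastTriggeredCheckpoint = timestamp;
            return true;
        }
    }

    public static void main(String[] args) {
        CheckpointGate gate = new CheckpointGate(100);
        System.out.println(gate.tryTrigger(1000)); // allowed
        System.out.println(gate.tryTrigger(1050)); // inside minimum pause
    }
}
```

Without the shared lock, two threads evaluating the condition back-to-back could both see the old value, trigger two checkpoints, and leave the fields correlated with lastTriggeredCheckpoint inconsistent.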
[jira] [Reopened] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields
[ https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu reopened FLINK-4437: --- Assignee: Ted Yu -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76268687 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java --- @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; + +import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList; + + +/** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x + * + * Implementation note: This producer is a hybrid between a regular sink function (a) + * and a custom operator (b). + * + * For (a), the class implements the SinkFunction and RichFunction interfaces. + * For (b), it extends the StreamSink class. + * + * Details about approach (a): + * + * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the + * DataStream.addSink() method. 
+ * Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record, + * the Kafka 0.10 producer has a second invocation option, approach (b). + * + * Details about approach (b): + * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the + * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafka() method, the Kafka producer + * can access the internal record timestamp of the record and write it to Kafka. + * + * All methods and constructors in this class are marked with the approach they are needed for. + */ +public class FlinkKafkaProducer010 extends StreamSink implements SinkFunction, RichFunction { + + /** +* Flag controlling whether we are writing the Flink record's timestamp into Kafka. +*/ + private boolean writeTimestampToKafka = false; + + // -- "Constructors" for timestamp writing -- + + /** +* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to +* the topic. +* +* This constructor allows writing timestamps to Kafka; it follows approach (b) (see above) +* +* @param inStream The stream to write to Kafka +* @param topicId ID of the Kafka topic. +* @param serializationSchema User defined serialization schema supporting key/value messages +* @param producerConfig Properties with the producer configuration. +*/ + public static FlinkKafkaProducer010Configuration writeToKafka(DataStream inStream, +
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437096#comment-15437096 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76268687
[GitHub] flink issue #2383: [FLINK-4418] [client] Improve resilience when InetAddress...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2383 Merging this...
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/444315a12ca2b1d3de44ea50dda9b8bb5a36bb9e#commitcomment-18778229 Does this have the wrong issue tag?
[jira] [Commented] (FLINK-4495) Running multiple jobs on yarn (without yarn-session)
[ https://issues.apache.org/jira/browse/FLINK-4495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437088#comment-15437088 ] Maximilian Michels commented on FLINK-4495: --- Thanks [~nielsbasjes] for opening the jira issue. I also created one (FLINK-4488) with an easy fix. > Running multiple jobs on yarn (without yarn-session) > > > Key: FLINK-4495 > URL: https://issues.apache.org/jira/browse/FLINK-4495 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Reporter: Niels Basjes > > I created a small application that needs to run multiple (batch) jobs on Yarn > and then terminate. > I essentially do right now the following: > flink run -m yarn-cluster -yn 10 bla.jar ... > And in my main I do > foreach thing I need to do { >ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); >env. ... define the batch job. >env.execute > } > In the second job I submit I get an exception: > {code} > java.lang.RuntimeException: Unable to tell application master to stop once > the specified job has been finised > at > org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184) > at > org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61) > at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220) > at com.bol.tools.hbase.export.Main.run(Main.java:81) > at com.bol.tools.hbase.export.Main.main(Main.java:42) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251) > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995) > at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992) > at > org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) > at > org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046) > Caused by: java.util.concurrent.TimeoutException: Futures timed out after > [1 milliseconds] > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) > at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:107) > at scala.concurrent.Await.result(package.scala) > at > org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182) > ... 25 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
[ https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437091#comment-15437091 ] ASF GitHub Bot commented on FLINK-4418: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2383 I would merge this fix by itself first. Then, if you have a nice way in mind to make this simpler, I'd be happy to look at another pull request ;-) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437092#comment-15437092 ] ASF GitHub Bot commented on FLINK-4035: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76268268
[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2369#discussion_r76268268 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---