[jira] [Assigned] (FLINK-4459) Introduce SlotProvider for Scheduler

2016-08-25 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-4459:
-

Assignee: Kurt Young

> Introduce SlotProvider for Scheduler
> 
>
> Key: FLINK-4459
> URL: https://issues.apache.org/jira/browse/FLINK-4459
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Reporter: Till Rohrmann
>Assignee: Kurt Young
>
> Currently the {{Scheduler}} maintains a queue of available instances which it 
> scans if it needs a new slot. If it finds a suitable instance (having free 
> slots available) it will allocate a slot from it. 
> This slot allocation logic can be factored out and be made available via a 
> {{SlotProvider}} interface. The {{SlotProvider}} has methods to allocate a 
> slot given a set of location preferences. Slots should be returned as 
> {{Futures}}, because in the future the slot allocation might happen 
> asynchronously (Flip-6). 
> In the first version, the {{SlotProvider}} implementation will simply 
> encapsulate the existing slot allocation logic extracted from the 
> {{Scheduler}}. When a slot is requested it will return a completed or failed 
> future since the allocation happens synchronously.
> The refactoring will have the advantage to simplify the {{Scheduler}} class 
> and to pave the way for upcoming refactorings (Flip-6).
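
For illustration, a minimal sketch of what such an interface could look like (the type parameters and method name below are assumptions made for this note, not the actual Flink API):

{code:java}
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a SlotProvider; names and types are illustrative only. */
public interface SlotProvider<SLOT, LOCATION> {

    /**
     * Requests a slot, preferring one of the given locations. The returned future
     * may already be completed (or failed) when allocation happens synchronously,
     * as in the first version described above.
     */
    CompletableFuture<SLOT> allocateSlot(Collection<LOCATION> locationPreferences);
}
{code}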



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4478) Implement heartbeat logic

2016-08-25 Thread zhangjing (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438437#comment-15438437
 ] 

zhangjing edited comment on FLINK-4478 at 8/26/16 4:04 AM:
---

Hi [~till.rohrmann], I saw your document and new branch. Good design. 


was (Author: jinyu.zj):
Good abstraction.

> Implement heartbeat logic
> -
>
> Key: FLINK-4478
> URL: https://issues.apache.org/jira/browse/FLINK-4478
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
> Fix For: 1.2.0
>
>
> With the Flip-6 refactoring, we'll have the need for a dedicated heartbeat 
> component. The heartbeat component is used to check the liveliness of the 
> distributed components among each other. Furthermore, heartbeats are used to 
> regularly transmit status updates to another component. For example, the 
> TaskManager informs the ResourceManager with each heartbeat about the current 
> slot allocation.
> The heartbeat is initiated from one component. This component sends a 
> heartbeat request to another component, which answers with a heartbeat 
> response. Thus, one can differentiate between a sending and a receiving side. 
> Apart from the triggering of the heartbeat request, the logic of treating 
> heartbeats, marking components dead and payload delivery are the same and 
> should be reusable by different distributed components (JM, TM, RM).
> Different models for the heartbeat reporting are conceivable. First of all, 
> the heartbeat request could be sent as an ask operation where the heartbeat 
> response is returned as a future on the sending side. Alternatively, the 
> sending side could request a heartbeat response by sending a tell message. 
> The heartbeat response is then delivered by an RPC back to the heartbeat 
> sender. The latter model has the advantage that a heartbeat response is not 
> tightly coupled to a heartbeat request. Such a tight coupling could cause 
> heartbeat responses to be ignored after the future has timed out, even 
> though they might still contain valuable information (receiver is still 
> alive).
> Furthermore, different strategies for the heartbeat triggering and marking 
> heartbeat targets as dead are conceivable. For example, we could periodically 
> (with a fixed period) trigger a heartbeat request and mark all targets as 
> dead if we didn't receive a heartbeat response in a given time period. 
> Furthermore, we could adapt the heartbeat interval and heartbeat timeouts 
> with respect to the latency of previous heartbeat responses. This would 
> reflect the current load and network conditions better.
> For the first version, I would propose to use a fixed period heartbeat with a 
> maximum heartbeat timeout before a target is marked dead. Furthermore, I 
> would propose to use tell messages (fire and forget) to request and report 
> heartbeats because they are the more flexible model imho.
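
As a rough illustration of the fixed-period / timeout model described above (a sketch with made-up names, not the actual Flink heartbeat implementation):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Illustrative sketch: fixed-period heartbeat requests with a maximum timeout. */
public class HeartbeatMonitor<ID> {

    private final Map<ID, Long> lastResponse = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public void start(Iterable<ID> targets, long periodMillis, long timeoutMillis,
                      Consumer<ID> sendRequest,   // fire-and-forget "tell" to the target
                      Consumer<ID> markDead) {
        long now = System.currentTimeMillis();
        targets.forEach(t -> lastResponse.put(t, now));

        timer.scheduleAtFixedRate(() -> {
            long deadline = System.currentTimeMillis() - timeoutMillis;
            for (Map.Entry<ID, Long> entry : lastResponse.entrySet()) {
                if (entry.getValue() < deadline) {
                    markDead.accept(entry.getKey());      // no response within the timeout
                } else {
                    sendRequest.accept(entry.getKey());   // request the next heartbeat
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Called by the RPC layer whenever a heartbeat response (possibly with payload) arrives. */
    public void reportResponse(ID target) {
        lastResponse.put(target, System.currentTimeMillis());
    }
}
{code}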



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4478) Implement heartbeat logic

2016-08-25 Thread zhangjing (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438437#comment-15438437
 ] 

zhangjing commented on FLINK-4478:
--

Good abstraction.

> Implement heartbeat logic
> -
>
> Key: FLINK-4478
> URL: https://issues.apache.org/jira/browse/FLINK-4478
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
> Fix For: 1.2.0
>
>
> With the Flip-6 refactoring, we'll have the need for a dedicated heartbeat 
> component. The heartbeat component is used to check the liveliness of the 
> distributed components among each other. Furthermore, heartbeats are used to 
> regularly transmit status updates to another component. For example, the 
> TaskManager informs the ResourceManager with each heartbeat about the current 
> slot allocation.
> The heartbeat is initiated from one component. This component sends a 
> heartbeat request to another component, which answers with a heartbeat 
> response. Thus, one can differentiate between a sending and a receiving side. 
> Apart from the triggering of the heartbeat request, the logic of treating 
> heartbeats, marking components dead and payload delivery are the same and 
> should be reusable by different distributed components (JM, TM, RM).
> Different models for the heartbeat reporting are conceivable. First of all, 
> the heartbeat request could be sent as an ask operation where the heartbeat 
> response is returned as a future on the sending side. Alternatively, the 
> sending side could request a heartbeat response by sending a tell message. 
> The heartbeat response is then delivered by an RPC back to the heartbeat 
> sender. The latter model has the advantage that a heartbeat response is not 
> tightly coupled to a heartbeat request. Such a tight coupling could cause 
> heartbeat responses to be ignored after the future has timed out, even 
> though they might still contain valuable information (receiver is still 
> alive).
> Furthermore, different strategies for the heartbeat triggering and marking 
> heartbeat targets as dead are conceivable. For example, we could periodically 
> (with a fixed period) trigger a heartbeat request and mark all targets as 
> dead if we didn't receive a heartbeat response in a given time period. 
> Furthermore, we could adapt the heartbeat interval and heartbeat timeouts 
> with respect to the latency of previous heartbeat responses. This would 
> reflect the current load and network conditions better.
> For the first version, I would propose to use a fixed period heartbeat with a 
> maximum heartbeat timeout before a target is marked dead. Furthermore, I 
> would propose to use tell messages (fire and forget) to request and report 
> heartbeats because they are the more flexible model imho.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-25 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438412#comment-15438412
 ] 

ramkrishna.s.vasudevan commented on FLINK-3322:
---

[~StephanEwen]
I was thinking of working on this. If you can walk me through the future improvements 
to memory management, I can take it up. 
I am also fine if [~ggevay] is ready to volunteer for this. Let me know what you 
think, [~StephanEwen].

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4462) Add RUN_TIME retention policy for all the flink annotations

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438402#comment-15438402
 ] 

ASF GitHub Bot commented on FLINK-4462:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2412
  
@StephanEwen 
I meant the library only; I did not mean that it is already added in the code. I am not sure 
which APIs in that library we should make use of.


> Add RUN_TIME retention policy for all the flink annotations
> ---
>
> Key: FLINK-4462
> URL: https://issues.apache.org/jira/browse/FLINK-4462
> Project: Flink
>  Issue Type: Improvement
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>
> It is better to add the RUNTIME retention policy to the Flink annotations, so that 
> utilities/tests can be added to ensure that the classes/interfaces are all 
> tagged with the proper annotations. 
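
As a quick illustration of why RUNTIME retention enables such checks (a sketch with a made-up annotation name, not the actual Flink annotation classes):

{code:java}
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical stand-in for a Flink annotation; RUNTIME retention keeps it visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@interface Public {}

class AnnotationCheck {
    /** Works only because the annotation is retained at runtime. */
    static boolean isProperlyTagged(Class<?> clazz) {
        return clazz.isAnnotationPresent(Public.class);
    }
}
{code}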



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2412: FLINK-4462 Add RUN_TIME retention policy for all the flin...

2016-08-25 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2412
  
@StephanEwen 
I meant the library only; I did not mean that it is already added in the code. I am not sure 
which APIs in that library we should make use of.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4502) Cassandra connector documentation has misleading consistency guarantees

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4502:
-

 Summary: Cassandra connector documentation has misleading 
consistency guarantees
 Key: FLINK-4502
 URL: https://issues.apache.org/jira/browse/FLINK-4502
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Elias Levy


The Cassandra connector documentation states that  "enableWriteAheadLog() is an 
optional method, that allows exactly-once processing for non-deterministic 
algorithms."  This claim appears to be false.

From what I gather, the write-ahead log feature of the connector works as 
follows:
- The sink is replaced with a stateful operator that writes incoming messages 
to the state backend based on the checkpoint they belong to.
- When the operator is notified that a Flink checkpoint has been completed, then 
for each stored checkpoint older than or equal to the committed one, it:
  * reads its messages from the state backend,
  * writes them to Cassandra,
  * records that it has committed them to Cassandra for the specific checkpoint 
and operator instance,
  * and erases them from the state backend.

This process attempts to avoid resubmitting queries to Cassandra that would 
otherwise occur when recovering a job from a checkpoint and having messages 
replayed.

Alas, this does not guarantee exactly-once semantics at the sink.  The writes 
to Cassandra that occur when the operator is notified that a checkpoint is 
completed are not atomic and they are potentially non-idempotent.  If the job 
dies while writing to Cassandra or before committing the checkpoint via the 
committer, queries will be replayed when the job recovers.  Thus the 
documentation appears to be incorrect in stating that this provides exactly-once 
semantics.

There also seems to be an issue in GenericWriteAheadSink's 
notifyOfCompletedCheckpoint which may result in incorrect output.  If 
sendValues returns false because a write failed, instead of bailing, it simply 
moves on to the next checkpoint to commit if there is one, keeping the previous 
one around to try again later.  But that can result in newer data being 
overwritten with older data when the previous checkpoint is retried.  Although 
given that CassandraCommitter implements isCheckpointCommitted as checkpointID 
<= this.lastCommittedCheckpointID, it actually means that when it goes back to 
try the uncommitted older checkpoint it will consider it committed, even though 
some of its data may not have been written out, and the data will be discarded.
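
For illustration, the committed-checkpoint check described above can be sketched roughly as follows (a simplified model of the described behaviour, not the actual connector code):

{code:java}
// Simplified model of the interaction described above.
class CommitterSketch {
    long lastCommittedCheckpointId = -1;

    boolean isCheckpointCommitted(long checkpointId) {
        // If a newer checkpoint has already been committed, an older checkpoint whose
        // writes partially failed is also reported as committed, so its data is dropped.
        return checkpointId <= lastCommittedCheckpointId;
    }

    void commit(long checkpointId) {
        lastCommittedCheckpointId = checkpointId;
    }
}
{code}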





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4501) Cassandra sink can lose messages

2016-08-25 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy closed FLINK-4501.
-
Resolution: Duplicate

HTTP connection got reset and was submitted twice.

> Cassandra sink can lose messages
> 
>
> Key: FLINK-4501
> URL: https://issues.apache.org/jira/browse/FLINK-4501
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>
> The problem is the same as I pointed out with the Kafka producer sink 
> (FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
> both send data asynchronously to Cassandra and record whether an error occurs 
> via a future callback.  But CassandraSinkBase does not implement 
> Checkpointed, so it can't stop a checkpoint from happening even though there are 
> Cassandra queries in flight from the checkpoint that may fail.  If they do 
> fail, they would subsequently not be replayed when the job recovered, and 
> would thus be lost.
> In addition, 
> CassandraSinkBase's close should check whether there is a pending exception 
> and throw it, rather than silently close.  It should also wait for any 
> pending async queries to complete and check their status before closing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4500) Cassandra sink can lose messages

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4500:
-

 Summary: Cassandra sink can lose messages
 Key: FLINK-4500
 URL: https://issues.apache.org/jira/browse/FLINK-4500
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The problem is the same as I pointed out with the Kafka producer sink 
(FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
both send data asynchronously to Cassandra and record whether an error occurs 
via a future callback.  But CassandraSinkBase does not implement Checkpointed, 
so it can't stop a checkpoint from happening even though there are Cassandra 
queries in flight from the checkpoint that may fail.  If they do fail, they 
would subsequently not be replayed when the job recovered, and would thus be 
lost.

In addition, 
CassandraSinkBase's close should check whether there is a pending exception and 
throw it, rather than silently close.  It should also wait for any pending 
async queries to complete and check their status before closing.
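
A minimal sketch of the behaviour suggested above (track asynchronous failures, wait for in-flight writes, and surface errors on close); the names are illustrative, not the actual CassandraSinkBase:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class AsyncSinkSketch {
    private final AtomicInteger pendingWrites = new AtomicInteger();
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    void onWriteStarted() {
        pendingWrites.incrementAndGet();
    }

    void onWriteFinished(Throwable failure) {
        if (failure != null) {
            asyncError.compareAndSet(null, failure);
        }
        pendingWrites.decrementAndGet();
    }

    void close() throws Exception {
        // Wait for all in-flight queries before closing.
        while (pendingWrites.get() > 0) {
            Thread.sleep(10);
        }
        // Do not close silently if an asynchronous write failed.
        Throwable failure = asyncError.get();
        if (failure != null) {
            throw new Exception("Asynchronous write failed", failure);
        }
    }
}
{code}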



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4501) Cassandra sink can lose messages

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4501:
-

 Summary: Cassandra sink can lose messages
 Key: FLINK-4501
 URL: https://issues.apache.org/jira/browse/FLINK-4501
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The problem is the same as I pointed out with the Kafka producer sink 
(FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
both send data asynchronously to Cassandra and record whether an error occurs 
via a future callback.  But CassandraSinkBase does not implement Checkpointed, 
so it can't stop a checkpoint from happening even though there are Cassandra 
queries in flight from the checkpoint that may fail.  If they do fail, they 
would subsequently not be replayed when the job recovered, and would thus be 
lost.

In addition, 
CassandraSinkBase's close should check whether there is a pending exception and 
throw it, rather than silently close.  It should also wait for any pending 
async queries to complete and check their status before closing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437818#comment-15437818
 ] 

ASF GitHub Bot commented on FLINK-4499:
---

Github user smarthi commented on the issue:

https://github.com/apache/flink/pull/2422
  
Set up a default findbugs-exclude.xml; please feel free to customize it as you 
deem fit for Flink.


> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.
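
For reference, a typical findbugs-maven-plugin configuration might look roughly like the following (the plugin version and exclude-file path are placeholders, not the exact setup proposed in the pull request):

{code:xml}
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>findbugs-maven-plugin</artifactId>
  <version>3.0.4</version>
  <configuration>
    <excludeFilterFile>tools/maven/findbugs-exclude.xml</excludeFilterFile>
    <failOnError>true</failOnError>
  </configuration>
  <executions>
    <execution>
      <phase>verify</phase>
      <goals>
        <goal>check</goal>
      </goals>
    </execution>
  </executions>
</plugin>
{code}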



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437815#comment-15437815
 ] 

ASF GitHub Bot commented on FLINK-4499:
---

GitHub user smarthi opened a pull request:

https://github.com/apache/flink/pull/2422

FLINK-4499: Introduce findbugs maven plugin

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smarthi/flink FLINK-4499

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2422






> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2422: FLINK-4499: Introduce findbugs maven plugin

2016-08-25 Thread smarthi
Github user smarthi commented on the issue:

https://github.com/apache/flink/pull/2422
  
Set up a default findbugs-exclude.xml; please feel free to customize it as you 
deem fit for Flink.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2422: FLINK-4499: Introduce findbugs maven plugin

2016-08-25 Thread smarthi
GitHub user smarthi opened a pull request:

https://github.com/apache/flink/pull/2422

FLINK-4499: Introduce findbugs maven plugin

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smarthi/flink FLINK-4499

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2422






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437693#comment-15437693
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2374
  
@zentol I've updated Meter's interface. Could you give this PR another 
review?
Sorry for the force push, I had to rebase on top of `master` to resolve 
merge conflict.


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2374: [FLINK-3950] Add Meter Metric Type

2016-08-25 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2374
  
@zentol I've updated Meter's interface. Could you give this PR another 
review?
Sorry for the force push, I had to rebase on top of `master` to resolve 
merge conflict.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4440) Make API for edge/vertex creation less verbose

2016-08-25 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437533#comment-15437533
 ] 

Greg Hogan commented on FLINK-4440:
---

I think it is good to be explicit when assigning the vertex and edge values. 
Adding a second (and third?) means of creating a vertex simply adds to a user's 
cognitive load. The cost is small, but I think the benefit is very small. More 
importantly, we should strive to solve the underlying problem of requiring one 
and only one value.

> Make API for edge/vertex creation less verbose
> --
>
> Key: FLINK-4440
> URL: https://issues.apache.org/jira/browse/FLINK-4440
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>Priority: Trivial
>
> It would be better if one could create vertex/edges like this:
> {code:java}
> Vertex<Long, NullValue> v = Vertex.create(42);
> Edge<Long, NullValue> e = Edge.create(5, 6);
> {code}
> Instead of this:
> {code:java}
> Vertex<Long, NullValue> v = new Vertex<Long, NullValue>(42, NullValue.getInstance());
> Edge<Long, NullValue> e = new Edge<Long, NullValue>(5, 6, NullValue.getInstance());
> {code}
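
For illustration, such factory methods could be sketched roughly as follows (assuming they would live on Vertex and Edge respectively; this is not the actual Gelly API):

{code:java}
// On Vertex (sketch):
public static Vertex<Long, NullValue> create(long id) {
    return new Vertex<>(id, NullValue.getInstance());
}

// On Edge (sketch):
public static Edge<Long, NullValue> create(long source, long target) {
    return new Edge<>(source, target, NullValue.getInstance());
}
{code}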



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437475#comment-15437475
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user delding commented on the issue:

https://github.com/apache/flink/pull/2332
  
Hi, I have updated the PR based on @ramkrish86 's comments.
1. rename MutationActionList to MutationActions, which has a public method 
createMutations that returns HBase Mutations
2. make MutationAction a private inner class
3. DeleteColumn action can now delete the latest version of the specified 
column
4. check if the rowkey is null when creating Mutations
5. hand over the combinations of Delete logic to the HBase server
I also checked the case where the table doesn't exist in HBase, in which case a 
table-not-found exception will be thrown.
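
For illustration, the rowkey check mentioned in point 4 could look roughly like this (a sketch against the plain HBase client API, not the PR's actual MutationActions class):

{code:java}
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

class MutationSketch {
    Mutation createPut(byte[] rowKey, byte[] family, byte[] qualifier, byte[] value) {
        if (rowKey == null) {
            throw new IllegalArgumentException("row key must not be null");
        }
        Put put = new Put(rowKey);
        put.addColumn(family, qualifier, value);
        return put;
    }
}
{code}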



> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-08-25 Thread delding
Github user delding commented on the issue:

https://github.com/apache/flink/pull/2332
  
Hi, I have updated the PR based on @ramkrish86 's comments.
1. rename MutationActionList to MutationActions, which has a public method 
createMutations that returns HBase Mutations
2. make MutationAction a private inner class
3. DeleteColumn action can now delete the latest version of the specified 
column
4. check if the rowkey is null when creating Mutations
5. hand over the combinations of Delete logic to the HBase server
I also checked the case where the table doesn't exist in HBase, in which case a 
table-not-found exception will be thrown.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4459) Introduce SlotProvider for Scheduler

2016-08-25 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437459#comment-15437459
 ] 

Stephan Ewen commented on FLINK-4459:
-

Yes, this is needed.

In the master branch, the current {{Scheduler}} should implement the 
{{SlotProvider}} interface. It would basically only hand out futures or slots, 
and then the {{ExecutionGraph}} code could check that the futures are always 
already completed.

In the FLIP-6 branch, we can adapt the scheduler to a different 
{{SlotProvider}}. I think it makes sense there as well to keep the logic about 
{{SlotSharingGroup}} and {{CoLocationConstraint}} out of the {{SlotPool}} and 
in a separate class (called {{SlotProvider}} or {{Scheduler}} or whatever).

> Introduce SlotProvider for Scheduler
> 
>
> Key: FLINK-4459
> URL: https://issues.apache.org/jira/browse/FLINK-4459
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Reporter: Till Rohrmann
>
> Currently the {{Scheduler}} maintains a queue of available instances which it 
> scans if it needs a new slot. If it finds a suitable instance (having free 
> slots available) it will allocate a slot from it. 
> This slot allocation logic can be factored out and be made available via a 
> {{SlotProvider}} interface. The {{SlotProvider}} has methods to allocate a 
> slot given a set of location preferences. Slots should be returned as 
> {{Futures}}, because in the future the slot allocation might happen 
> asynchronously (Flip-6). 
> In the first version, the {{SlotProvider}} implementation will simply 
> encapsulate the existing slot allocation logic extracted from the 
> {{Scheduler}}. When a slot is requested it will return a completed or failed 
> future since the allocation happens synchronously.
> The refactoring will have the advantage to simplify the {{Scheduler}} class 
> and to pave the way for upcoming refactorings (Flip-6).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-08-25 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-3679:
--
Component/s: Kafka Connector

> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.
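
A minimal sketch of such a "flat-map style" schema (the interface name and signature below are assumptions for illustration, not an existing Flink interface):

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical schema that may return zero or more elements per input message. */
interface CollectingDeserializationSchema<T> {
    Collection<T> deserialize(byte[] message) throws IOException;
}

/** Example: skip undeserializable records instead of failing the job. */
class SafeStringSchema implements CollectingDeserializationSchema<String> {
    @Override
    public Collection<String> deserialize(byte[] message) {
        if (message == null || message.length == 0) {
            return Collections.emptyList();   // bad data -> empty result, job keeps running
        }
        return Collections.singletonList(new String(message, StandardCharsets.UTF_8));
    }
}
{code}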



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-08-25 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437453#comment-15437453
 ] 

Stephan Ewen commented on FLINK-4490:
-

For the FLIP-6 work, we need that, agreed.
Here is an idea of how we can make this hopefully without a lot of additional 
work:

(1) We introduce an abstract {{Slot}} class that defines the methods that 
the {{ExecutionGraph}} and {{JobManager}} need, like
- fail()
- release()
- return()
- getTaskManagerGateway()

The {{SlotProvider}} will expose the new {{Slot}}.

(2) We implement a version of that slot that internally contains the current 
master's slot, to keep this compatible in the master, and we change the 
ExecutionGraph to work on the new {{Slot}}. The scheduler wraps the old slot 
into the new slot.

(3) We can then implement the FLIP-6 specific variant of the new slot.
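
A rough sketch of what such an abstract class could look like (method names approximate the list above; "return()" is written as returnSlot() because "return" is a reserved word in Java, and the concrete gateway type is left open):

{code:java}
public abstract class Slot {

    /** Fails the slot, e.g. because the owning TaskManager is lost. */
    public abstract void fail(Throwable cause);

    /** Releases the slot back to its owner. */
    public abstract void release();

    /** Returns the slot after the task has finished. */
    public abstract void returnSlot();

    /** Gateway for talking to the TaskManager that provides this slot (concrete type omitted here). */
    public abstract Object getTaskManagerGateway();
}
{code}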

> Decouple Slot and Instance
> --
>
> Key: FLINK-4490
> URL: https://issues.apache.org/jira/browse/FLINK-4490
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Reporter: Kurt Young
>
> Currently, {{Slot}} and {{Instance}} hold each other. For {{Instance}} 
> holding {{Slot}}, this makes sense because it reflects how many resources it 
> can provide and how many are in use. 
> But it's not really necessary for {{Slot}} to hold the {{Instance}} it 
> belongs to. It only needs to hold some connection information and a gateway to 
> talk to. Another downside of {{Slot}} holding {{Instance}} is that 
> {{Instance}} actually contains some allocation/de-allocation logic; it will 
> be difficult to do some allocation refactoring without {{Slot}} 
> noticing. 
> We should abstract out the connection information of {{Instance}} and let {{Slot}} 
> hold that. (We actually have {{InstanceConnectionInfo}} now, but it lacks the 
> instance's Akka gateway; maybe we can just add the Akka gateway to the 
> {{InstanceConnectionInfo}}.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-25 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437410#comment-15437410
 ] 

Gabor Gevay commented on FLINK-3322:


Yes, I volunteer to help :)

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4278) Unclosed FSDataOutputStream in multiple files in the project

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437396#comment-15437396
 ] 

ASF GitHub Bot commented on FLINK-4278:
---

Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2403
  
@StephanEwen, let me close this and open a new one after some testing. Will 
post a new PR this week.


> Unclosed FSDataOutputStream in multiple files in the project
> 
>
> Key: FLINK-4278
> URL: https://issues.apache.org/jira/browse/FLINK-4278
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>
> After FLINK-4259, I did a check and found that the following files don't 
> close the FSDataOutputStream.
> The following are the files and the corresponding methods missing the close():
> 1) FSDataOutputStream.java - adding a close method in the abstract class
> 2) FSStateBackend flush() and write() - closing the FSDataOutputStream
> 3) StringWriter.java write()
> 4) FileSystemStateStore putState() - closing the FSDataOutputStream
> 5) HadoopDataOutputStream.java - not too sure if this needs closing
> 6) FileSystemStateStorageHelper.java store() - needs closing for both outStream 
> and the ObjectOutputStream
> The options to consider would be to either close explicitly or use IOUtils.closeQuietly(). 
> Any thoughts?
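
For illustration, the two options could look roughly like this (types simplified to java.io; not the actual Flink code):

{code:java}
import java.io.IOException;
import java.io.OutputStream;

class StreamClosingSketch {

    // Option 1: try-with-resources closes the stream even if the write fails.
    void writeWithClose(OutputStream out, byte[] data) throws IOException {
        try (OutputStream stream = out) {
            stream.write(data);
            stream.flush();
        }
    }

    // Option 2: close quietly in a finally block (mirrors the intent of IOUtils.closeQuietly()).
    void writeCloseQuietly(OutputStream out, byte[] data) throws IOException {
        try {
            out.write(data);
            out.flush();
        } finally {
            try {
                out.close();
            } catch (IOException ignored) {
                // swallow the close failure
            }
        }
    }
}
{code}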



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2403: [FLINK-4278]: Unclosed FSDataOutputStream in multi...

2016-08-25 Thread nssalian
Github user nssalian closed the pull request at:

https://github.com/apache/flink/pull/2403


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4278) Unclosed FSDataOutputStream in multiple files in the project

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437395#comment-15437395
 ] 

ASF GitHub Bot commented on FLINK-4278:
---

Github user nssalian closed the pull request at:

https://github.com/apache/flink/pull/2403


> Unclosed FSDataOutputStream in multiple files in the project
> 
>
> Key: FLINK-4278
> URL: https://issues.apache.org/jira/browse/FLINK-4278
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>
> After FLINK-4259, I did a check and found that the following files don't 
> close the FSDataOutputStream.
> The following are the files and the corresponding methods missing the close():
> 1) FSDataOutputStream.java - adding a close method in the abstract class
> 2) FSStateBackend flush() and write() - closing the FSDataOutputStream
> 3) StringWriter.java write()
> 4) FileSystemStateStore putState() - closing the FSDataOutputStream
> 5) HadoopDataOutputStream.java - not too sure if this needs closing
> 6) FileSystemStateStorageHelper.java store() - needs closing for both outStream 
> and the ObjectOutputStream
> The options to consider would be to either close explicitly or use IOUtils.closeQuietly(). 
> Any thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2403: [FLINK-4278]: Unclosed FSDataOutputStream in multiple fil...

2016-08-25 Thread nssalian
Github user nssalian commented on the issue:

https://github.com/apache/flink/pull/2403
  
@StephanEwen, let me close this and open a new one after some testing. Will 
post a new PR this week.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4100) RocksDBStateBackend#close() can throw NPE

2016-08-25 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437392#comment-15437392
 ] 

Neelesh Srinivas Salian commented on FLINK-4100:


Sounds good. Thanks [~aljoscha]

> RocksDBStateBackend#close() can throw NPE
> -
>
> Key: FLINK-4100
> URL: https://issues.apache.org/jira/browse/FLINK-4100
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Priority: Trivial
>
> When running the RocksDBStateBackendTest on Windows I ran into an NPE. The 
> tests are aborted in the @Before checkOperatingSystem method (which is 
> correct behaviour), but the test still calls dispose() in @After teardown().
> This leads to an NPE since the lock object used is null; it was not 
> initialized since initializeForJob() was never called and there is no null 
> check.
> {code}
> testCopyDefaultValue(org.apache.flink.contrib.streaming.state.RocksDBStateBackendTest)
>   Time elapsed: 0 sec  <<< ERROR!
> java.lang.NullPointerException: null
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.dispose(RocksDBStateBackend.java:318)
> at 
> org.apache.flink.runtime.state.StateBackendTestBase.teardown(StateBackendTestBase.java:71)
> at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}
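
For illustration, the missing guard could be sketched roughly like this (simplified, not the actual RocksDBStateBackend code):

{code:java}
class DisposeSketch {
    private Object cleanupLock;   // may still be null if initializeForJob() never ran

    void dispose() {
        if (cleanupLock == null) {
            return;               // nothing was initialized, nothing to clean up
        }
        synchronized (cleanupLock) {
            // ... release resources ...
        }
    }
}
{code}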



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4245) Metric naming improvements

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437391#comment-15437391
 ] 

ASF GitHub Bot commented on FLINK-4245:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2418
  
Looks like we could just pop the variables map into the constructor and get the 
key names for free, that's neat.

I wouldn't worry too much about disregarding the naming scheme; in contrast 
to other reporters, additional keys (as I understand it) don't make it more 
difficult to work with a given metric.

What we could do is add a configuration option `exposeAllKeys` or something 
that, if set, pops the map into the constructor; otherwise it uses the configured 
naming scheme.


> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we 
> can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?
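
For illustration, exposing the tag map through JMX key properties could look roughly like this (the values are made up; this is a sketch, not the JMXReporter implementation):

{code:java}
import java.util.Hashtable;
import javax.management.ObjectName;

class JmxNamingSketch {
    static ObjectName nameFor() throws Exception {
        Hashtable<String, String> tags = new Hashtable<>();
        tags.put("hostname", "localhost");
        tags.put("metric", "numRecordsIn");
        // Values containing special characters must be quoted for ObjectName.
        tags.put("operator_name", ObjectName.quote("map() at X.java:123"));
        // Domain plus key properties, instead of encoding everything into one scope string.
        return new ObjectName("taskmanager.task.operator.io", tags);
    }
}
{code}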



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2418: [FLINK-4245] JMXReporter exposes all defined variables

2016-08-25 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2418
  
Looks like we could just pop the variables map into the constructor and get the 
key names for free, that's neat.

I wouldn't worry too much about disregarding the naming scheme; in contrast 
to other reporters, additional keys (as I understand it) don't make it more 
difficult to work with a given metric.

What we could do is add a configuration option `exposeAllKeys` or something 
that, if set, pops the map into the constructor; otherwise it uses the configured 
naming scheme.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4482) numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437383#comment-15437383
 ] 

ASF GitHub Bot commented on FLINK-4482:
---

GitHub user tedyu opened a pull request:

https://github.com/apache/flink/pull/2421

FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding 
triggerLock

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2421


commit 2021f4790715ecb762dbb23438bf0b2b2755845e
Author: tedyu 
Date:   2016-08-25T18:11:44Z

FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding 
triggerLock




> numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock
> --
>
> Key: FLINK-4482
> URL: https://issues.apache.org/jira/browse/FLINK-4482
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In CheckpointCoordinator#stopCheckpointScheduler() :
> {code}
> synchronized (lock) {
> ...
>   numUnsuccessfulCheckpointsTriggers = 0;
> {code}
> triggerLock is not held in the above operation.
> See comment for triggerLock earlier in triggerCheckpoint():
> {code}
> // we lock with a special lock to make sure that trigger requests do not 
> overtake each other.
> // this is not done with the coordinator-wide lock, because the 
> 'checkpointIdCounter'
> // may issue blocking operations. Using a different lock than teh 
> coordinator-wide lock,
> // we avoid blocking the processing of 'acknowledge/decline' messages 
> during that time.
> synchronized (triggerLock) {
> {code}
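
One possible shape of the fix (illustrative only; the field names are taken from the snippets above, and this is not necessarily the change that was merged):

{code:java}
public void stopCheckpointScheduler() {
    synchronized (lock) {
        // ... existing shutdown logic ...

        // Reset the counter under the same triggerLock that triggerCheckpoint() uses.
        synchronized (triggerLock) {
            numUnsuccessfulCheckpointsTriggers = 0;
        }
    }
}
{code}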



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2421: FLINK-4482 numUnsuccessfulCheckpointsTriggers is a...

2016-08-25 Thread tedyu
GitHub user tedyu opened a pull request:

https://github.com/apache/flink/pull/2421

FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding 
triggerLock

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2421


commit 2021f4790715ecb762dbb23438bf0b2b2755845e
Author: tedyu 
Date:   2016-08-25T18:11:44Z

FLINK-4482 numUnsuccessfulCheckpointsTriggers is accessed without holding 
triggerLock




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437322#comment-15437322
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
Hey @fhueske 
I've renamed tests as you suggested.


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

2016-08-25 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
Hey @fhueske 
I've renamed tests as you suggested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-25 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437314#comment-15437314
 ] 

Stephan Ewen commented on FLINK-3322:
-

Would you volunteer to help with this?

If yes, I'd be happy to walk you through some approach that would be well 
aligned with thoughts for future improvements to memory management.

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookup tables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts

2016-08-25 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437310#comment-15437310
 ] 

Stephan Ewen commented on FLINK-3030:
-

Thanks, that is a great issue to solve!

> Enhance Dashboard to show Execution Attempts
> 
>
> Key: FLINK-3030
> URL: https://issues.apache.org/jira/browse/FLINK-3030
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
> Fix For: 1.0.0
>
>
> Currently, the web dashboard shows only the latest execution attempt. We 
> should make all execution attempts and their accumulators available for 
> inspection.
> The REST monitoring API supports this, so it should be a change only to the 
> frontend part.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4440) Make API for edge/vertex creation less verbose

2016-08-25 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437307#comment-15437307
 ] 

Vasia Kalavri commented on FLINK-4440:
--

I don't have a strong opinion against adding the {{create}} methods. Why do you 
think there is a cost to adding them [~greghogan]?

> Make API for edge/vertex creation less verbose
> --
>
> Key: FLINK-4440
> URL: https://issues.apache.org/jira/browse/FLINK-4440
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Reporter: Ivan Mushketyk
>Assignee: Ivan Mushketyk
>Priority: Trivial
>
> It would be better if one could create vertex/edges like this:
> {code:java}
> Vertex<Long, NullValue> v = Vertex.create(42);
> Edge<Long, NullValue> e = Edge.create(5, 6);
> {code}
> Instead of this:
> {code:java}
> Vertex<Long, NullValue> v = new Vertex<Long, NullValue>(42, NullValue.getInstance());
> Edge<Long, NullValue> e = new Edge<Long, NullValue>(5, 6, NullValue.getInstance());
> {code}
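
A rough sketch of what such factory methods could look like; the method names and the Long/NullValue generics are assumptions, not the committed Gelly API:

{code:java}
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.NullValue;

public final class GraphFactories {

    // Hypothetical convenience factory for value-less vertices.
    public static Vertex<Long, NullValue> vertex(long id) {
        return new Vertex<>(id, NullValue.getInstance());
    }

    // Hypothetical convenience factory for value-less edges.
    public static Edge<Long, NullValue> edge(long source, long target) {
        return new Edge<>(source, target, NullValue.getInstance());
    }
}
{code}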



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4499) Introduce findbugs maven plugin

2016-08-25 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4499:
-

 Summary: Introduce findbugs maven plugin
 Key: FLINK-4499
 URL: https://issues.apache.org/jira/browse/FLINK-4499
 Project: Flink
  Issue Type: Improvement
Reporter: Ted Yu


As suggested by Stephan in FLINK-4482, this issue is to add 
findbugs-maven-plugin into the build process so that we can detect lack of 
proper locking and other defects automatically.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2002: Support for bz2 compression in flink-core

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2002#discussion_r76288290
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java
 ---
@@ -23,13 +23,12 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
-import java.util.zip.InflaterInputStream;
 
 /**
  * Creates a new instance of a certain subclass of {@link 
java.util.zip.InflaterInputStream}.
  */
 @Internal
-public interface InflaterInputStreamFactory<T extends InflaterInputStream> {
+public interface InflaterInputStreamFactory<T extends InputStream> {
--- End diff --

I think Greg raised a good point: the class name and comments should be 
adjusted as well. How about `DecompressingStreamFactory`, or 
`DecoratorStreamFactory` (if you want to keep it generic)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2294: [FLINK-4265] [dataset api] Add a NoOpOperator

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2294
  
This is okay to merge from my side. Definitely seems easier than the 
`Delegate` trick.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4265) Add a NoOpOperator

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437279#comment-15437279
 ] 

ASF GitHub Bot commented on FLINK-4265:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2294
  
This is okay to merge from my side. Definitely seems easier than the 
`Delegate` trick.


> Add a NoOpOperator
> --
>
> Key: FLINK-4265
> URL: https://issues.apache.org/jira/browse/FLINK-4265
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>
> One recent feature of Gelly is algorithms which detect duplicated or similar 
> computation which can be shared. My initial implementation could only reuse a 
> {{DataSet}} result. Before committing to Flink this was updated to use a 
> javassist {{ProxyFactory}} allowing configuration to be merged and results to 
> be replaced. There were some issues, as identified in FLINK-4257. With a 
> {{NoOpOperator}} we can remove the use of {{ProxyFactory}} and resolve the 
> identified issues.
> This ticket adds a {{NoOpOperator}} which is unwound in 
> {{OperatorTranslation.translate}}. The {{NoOpOperator}} contains a 
> {{DataSet}} which is accessed by a getter and setter.
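
A stripped-down sketch of the idea (a plain holder, not the actual operator subclass that OperatorTranslation.translate unwinds); the class name here is illustrative only:

{code:java}
import org.apache.flink.api.java.DataSet;

public class DataSetHolder<T> {

    private DataSet<T> input;

    public DataSetHolder(DataSet<T> input) {
        this.input = input;
    }

    public DataSet<T> getInput() {
        return input;
    }

    // Lets shared/duplicated computation be swapped in after the fact,
    // replacing the javassist ProxyFactory trick described above.
    public void setInput(DataSet<T> input) {
        this.input = input;
    }
}
{code}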



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4482) numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock

2016-08-25 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437274#comment-15437274
 ] 

Stephan Ewen commented on FLINK-4482:
-

I think the statements can be adjusted to be properly locked.

How about adding some {{GuardedBy}} annotations and adding the maven "findbugs" 
plugin to check such situations automatically?

> numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock
> --
>
> Key: FLINK-4482
> URL: https://issues.apache.org/jira/browse/FLINK-4482
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> In CheckpointCoordinator#stopCheckpointScheduler() :
> {code}
> synchronized (lock) {
> ...
>   numUnsuccessfulCheckpointsTriggers = 0;
> {code}
> triggerLock is not held in the above operation.
> See comment for triggerLock earlier in triggerCheckpoint():
> {code}
> // we lock with a special lock to make sure that trigger requests do not 
> overtake each other.
> // this is not done with the coordinator-wide lock, because the 
> 'checkpointIdCounter'
> // may issue blocking operations. Using a different lock than the 
> coordinator-wide lock,
> // we avoid blocking the processing of 'acknowledge/decline' messages 
> during that time.
> synchronized (triggerLock) {
> {code}
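
A hedged sketch of the suggested annotation; the class and method names are assumptions for illustration (the real field lives in CheckpointCoordinator):

{code:java}
import javax.annotation.concurrent.GuardedBy;

public class TriggerState {

    private final Object triggerLock = new Object();

    // findbugs/SpotBugs can flag accesses that happen without holding triggerLock.
    @GuardedBy("triggerLock")
    private int numUnsuccessfulCheckpointsTriggers;

    public void resetUnsuccessfulTriggers() {
        synchronized (triggerLock) {
            numUnsuccessfulCheckpointsTriggers = 0;
        }
    }
}
{code}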



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437243#comment-15437243
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
Thank you for helping with this PR @fhueske 
It's ok, I'll update the PR in an hour or so.


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

2016-08-25 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
Thank you for helping with this PR @fhueske 
It's ok, I'll update the PR in an hour or so.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437238#comment-15437238
 ] 

ASF GitHub Bot commented on FLINK-4437:
---

Github user tedyu closed the pull request at:

https://github.com/apache/flink/pull/2409


> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
> Fix For: 1.2.0
>
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) 
> {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints 
> > timestamp' in close proximity before lastTriggeredCheckpoint is updated, 
> the two threads may have an inconsistent view of "lastTriggeredCheckpoint" 
> and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.
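
A minimal sketch of checking and updating the field under a single lock, which removes the race described above; the field names mirror the description but the class itself is illustrative:

{code:java}
public class CheckpointInterval {

    private final Object triggerLock = new Object();
    private long lastTriggeredCheckpoint;
    private final long minPauseBetweenCheckpoints;

    public CheckpointInterval(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    // Both the read and the subsequent write happen under triggerLock, so two
    // threads cannot pass the interval check with the same stale value.
    public boolean tryTrigger(long timestamp) {
        synchronized (triggerLock) {
            if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false;
            }
            lastTriggeredCheckpoint = timestamp;
            return true;
        }
    }
}
{code}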



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2409: FLINK-4437 Lock evasion around lastTriggeredCheckp...

2016-08-25 Thread tedyu
Github user tedyu closed the pull request at:

https://github.com/apache/flink/pull/2409


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2420: [hotfix] Fix invalid url link in documentation

2016-08-25 Thread haoch
Github user haoch commented on the issue:

https://github.com/apache/flink/pull/2420
  
@fhueske sure and thanks for the comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2420: [hotfix] Fix invalid url link in documentation

2016-08-25 Thread haoch
Github user haoch closed the pull request at:

https://github.com/apache/flink/pull/2420


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...

2016-08-25 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2369
  
Thanks Robert for addressing my comments :)

Overall, I like the new hybrid producer approach. However, I'm still 
curious whether or not it is possible / reasonable to drop the 
`FlinkKafkaProducer010Configuration` return type of invocation (b), and let 
both invocation methods return `FlinkKafkaProducer010` instead. So,

```
FlinkKafkaProducer010 kafka = new FlinkKafkaProducer010(...)
// or FlinkKafkaProducer010 kafka = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(...) for timestamp support

// setter config methods directly done on the FlinkKafkaProducer010 
instance regardless of (a) or (b)
kafka.setLogFailuresOnly(true)
kafka.setFlushOnCheckpoint(true)
kafka.setWriteTimestampToKafka(true) // would not have effect if original 
invocation method (a) was used
```

But we'll need to be a bit hacky in `invokeInternal(element, 
elementTimestamp)`, something like only letting the `timestamp` given to 
`ProducerRecord` be non-null if `writeTimestampToKafka && elementTimestamp != 
Long.MIN_VALUE`.

What do you think?
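
A rough sketch of that guard; the field and method names are assumptions, not the actual `FlinkKafkaProducer010` code:

```
import org.apache.kafka.clients.producer.ProducerRecord;

public class TimestampGuardSketch {

    private boolean writeTimestampToKafka;

    public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
        this.writeTimestampToKafka = writeTimestampToKafka;
    }

    ProducerRecord<byte[], byte[]> toRecord(String topic, byte[] key, byte[] value, long elementTimestamp) {
        // Only attach a timestamp if the user asked for it and the element carries one;
        // a null timestamp lets the Kafka producer/broker assign it instead.
        Long timestamp = (writeTimestampToKafka && elementTimestamp != Long.MIN_VALUE)
                ? elementTimestamp
                : null;
        return new ProducerRecord<>(topic, null, timestamp, key, value);
    }
}
```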


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437224#comment-15437224
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2369
  
Thanks Robert for addressing my comments :)

Overall, I like the new hybrid producer approach. However, I'm still 
curious whether or not it is possible / reasonable to drop the 
`FlinkKafkaProducer010Configuration` return type of invocation (b), and let 
both invocation methods return `FlinkKafkaProducer010` instead. So,

```
FlinkKafkaProducer010 kafka = new FlinkKafkaProducer010(...)
// or FlinkKafkaProducer010 kafka = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(...) for timestamp support

// setter config methods directly done on the FlinkKafkaProducer010 
instance regardless of (a) or (b)
kafka.setLogFailuresOnly(true)
kafka.setFlushOnCheckpoint(true)
kafka.setWriteTimestampToKafka(true) // would not have effect if original 
invocation method (a) was used
```

But we'll need to be a bit hacky in `invokeInternal(element, 
elementTimestamp)`, something like only letting the `timestamp` given to 
`ProducerRecord` be non-null if `writeTimestampToKafka && elementTimestamp != 
Long.MIN_VALUE`.

What do you think?


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher-compressed 
> messages, assign offsets to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437223#comment-15437223
 ] 

ASF GitHub Bot commented on FLINK-4437:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2409
  
@tedyu Should we close this pull request?


> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
> Fix For: 1.2.0
>
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) 
> {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints 
> > timestamp' in close proximity before lastTriggeredCheckpoint is updated, 
> the two threads may have an inconsistent view of "lastTriggeredCheckpoint" 
> and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2409
  
@tedyu Should we close this pull request?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2420: [hotfix] Fix invalid url link in documentation

2016-08-25 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2420
  
Thanks for the fix @haoch! However, @smarthi opened #2416 to fix the issue 
as well.
Would you mind closing your PR? 

There are certainly more things in the documentation that can be improved. 
Please have a look into JIRA or proof-read some pages if you would like to 
contribute. 

Thanks, Fabian



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4498) Better Cassandra sink documentation

2016-08-25 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-4498:
--
Description: 
The Cassandra sink documentation is somewhat muddled and could be improved.  
For instance, the fact that it only supports tuples and POJOs that use 
DataStax Mapper annotations is only mentioned in passing, and it is not clear 
that the reference to tuples only applies to Flink Java tuples and not Scala 
tuples.  

The documentation also does not mention that setQuery() is only necessary for 
tuple streams. 

The explanation of the write ahead log could use some cleaning up to clarify 
when it is appropriate to use, ideally with an example.  Maybe this would be 
best as a blog post to expand on the type of non-deterministic streams this 
applies to.

It would also be useful to mention that tuple elements will be mapped to 
Cassandra columns using the Datastax Java driver's default encoders, which are 
somewhat limited (e.g. to write to a blob column the type in the tuple must be 
a java.nio.ByteBuffer and not just a byte[]).

  was:
The Cassandra sink documentation is somewhat muddled and could be improved.  
For instance, the fact that it only supports tuples and POJOs that use 
DataStax Mapper annotations is only mentioned in passing, and it is not clear 
that the reference to tuples only applies to Flink Java tuples and not Scala 
tuples.  

The documentation also does not mention that setQuery() is only necessary for 
tuple streams.  It would be good to have an example of a POJO stream with the 
DataStax annotations.

The explanation of the write ahead log could use some cleaning up to clarify 
when it is appropriate to use, ideally with an example.  Maybe this would be 
best as a blog post to expand on the type of non-deterministic streams this 
applies to.

It would also be useful to mention that tuple elements will be mapped to 
Cassandra columns using the Datastax Java driver's default encoders, which are 
somewhat limited (e.g. to write to a blob column the type in the tuple must be 
a java.nio.ByteBuffer and not just a byte[]).


> Better Cassandra sink documentation
> ---
>
> Key: FLINK-4498
> URL: https://issues.apache.org/jira/browse/FLINK-4498
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>
> The Cassandra sink documentation is somewhat muddled and could be improved.  
> For instance, the fact that it only supports tuples and POJOs that use 
> DataStax Mapper annotations is only mentioned in passing, and it is not clear 
> that the reference to tuples only applies to Flink Java tuples and not Scala 
> tuples.  
> The documentation also does not mention that setQuery() is only necessary for 
> tuple streams. 
> The explanation of the write ahead log could use some cleaning up to clarify 
> when it is appropriate to use, ideally with an example.  Maybe this would be 
> best as a blog post to expand on the type of non-deterministic streams this 
> applies to.
> It would also be useful to mention that tuple elements will be mapped to 
> Cassandra columns using the Datastax Java driver's default encoders, which 
> are somewhat limited (e.g. to write to a blob column the type in the tuple 
> must be a java.nio.ByteBuffer and not just a byte[]).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4498) Better Cassandra sink documentation

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4498:
-

 Summary: Better Cassandra sink documentation
 Key: FLINK-4498
 URL: https://issues.apache.org/jira/browse/FLINK-4498
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Elias Levy


The Cassandra sink documentation is somewhat muddled and could be improved.  
For instance, the fact that it only supports tuples and POJOs that use 
DataStax Mapper annotations is only mentioned in passing, and it is not clear 
that the reference to tuples only applies to Flink Java tuples and not Scala 
tuples.  

The documentation also does not mention that setQuery() is only necessary for 
tuple streams.  It would be good to have an example of a POJO stream with the 
DataStax annotations.

The explanation of the write ahead log could use some cleaning up to clarify 
when it is appropriate to use, ideally with an example.  Maybe this would be 
best as a blog post to expand on the type of non-deterministic streams this 
applies to.

It would also be useful to mention that tuple elements will be mapped to 
Cassandra columns using the Datastax Java driver's default encoders, which are 
somewhat limited (e.g. to write to a blob column the type in the tuple must be 
a java.nio.ByteBuffer and not just a byte[]).
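
A hedged example of the kind of annotated POJO the sink accepts; the keyspace, table, and column names here are assumptions for illustration:

{code:java}
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "example", name = "events")
public class Event {

    @Column(name = "id")
    private long id;

    @Column(name = "payload")
    private String payload;

    // The DataStax mapper needs a no-arg constructor plus getters/setters.
    public Event() {}

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }

    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
}
{code}

A stream of such POJOs would then go to the Cassandra sink without a setQuery() call, which, per the description above, is only needed for tuple streams.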



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2418: [FLINK-4245] JMXReporter exposes all defined variables

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2418
  
I think that works to expose the names. I was wondering, though, if we can 
have a more JMX-natural integration.

The JMX `ObjectName` has the constructor `ObjectName(String domain, 
Hashtable<String, String> table)`. Rather than having 
`key1=taskmanager,key2=,key3=`, we could go for 
`domain=taskmanager.task.operator.io.numRecords` (no variable parsing) and 
`table={taskmanager=, hostname=, jobid=, ...`. If I got the 
thoughts of some of the users right, that would be the preferred way for them.

That probably contradicts the "configurable name" design we have for the 
other reporters, because the table and the formatted scope/name override each 
other (I think, not sure, not a JMX expert here).

In the end, both approaches are probably valid and which one is more usable 
depends on the metric infrastructure of the users. So, if we cannot get both 
into the same reporter, should we simply have two JMX reporters? A "tag-based" 
(like discussed above) and a "name/scope-based" (like we have so far)?
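
A small sketch of the tag-based naming, just to make the shape concrete; the domain string and tag keys are assumptions:

```
import java.util.Hashtable;
import javax.management.ObjectName;

public class TagBasedName {
    public static void main(String[] args) throws Exception {
        Hashtable<String, String> tags = new Hashtable<>();
        tags.put("hostname", "localhost");
        tags.put("tm_id", "tm-1");
        tags.put("job_id", "1234");

        // Domain carries the logical metric path, tags carry the variables.
        ObjectName name = new ObjectName("taskmanager.task.operator.io.numRecordsIn", tags);
        System.out.println(name.getCanonicalName());
    }
}
```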


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4245) Metric naming improvements

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437183#comment-15437183
 ] 

ASF GitHub Bot commented on FLINK-4245:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2418
  
I think that works to expose the names. I was wondering, though, if we can 
have a more JMX-natural integration.

The JMX `ObjectName` has the constructor `ObjectName(String domain, 
Hashtable<String, String> table)`. Rather than having 
`key1=taskmanager,key2=,key3=`, we could go for 
`domain=taskmanager.task.operator.io.numRecords` (no variable parsing) and 
`table={taskmanager=, hostname=, jobid=, ...`. If I got the 
thoughts of some of the users right, that would be the preferred way for them.

That probably contradicts the "configurable name" design we have for the 
other reporters, because the table and the formatted scope/name override each 
other (I think, not sure, not a JMX expert here).

In the end, both approaches are probably valid and which one is more usable 
depends on the metric infrastructure of the users. So, if we cannot get both 
into the same reporter, should we simply have two JMX reporters? A "tag-based" 
(like discussed above) and a "name/scope-based" (like we have so far)?


> Metric naming improvements
> --
>
> Key: FLINK-4245
> URL: https://issues.apache.org/jira/browse/FLINK-4245
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Stephan Ewen
>
> A metric currently has two parts to it:
>   - The name of that particular metric
>   - The "scope" (or namespace), defined by the group that contains the metric.
> A metric group actually always implicitly has a map of naming "tags", like:
>   - taskmanager_host : 
>   - taskmanager_id : 
>   - task_name : "map() -> filter()"
> We derive the scope from that map, following the defined scope formats.
> For JMX (and some users that use JMX), it would be natural to expose that map 
> of tags. Some users reconstruct that map by parsing the metric scope. In JMX, we 
> can expose a metric like:
>   - domain: "taskmanager.task.operator.io"
>   - name: "numRecordsIn"
>   - tags: { "hostname" -> "localhost", "operator_name" -> "map() at 
> X.java:123", ... }
> For many other reporters, the formatted scope makes a lot of sense, since 
> they think only in terms of (scope, metric-name).
> We may even have the formatted scope in JMX as well (in the domain), if we 
> want to go that route. 
> [~jgrier] and [~Zentol] - what do you think about that?
> [~mdaxini] Does that match your use of the metrics?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2420: [hotfix] Fix invalid elasticsearch url link in doc...

2016-08-25 Thread haoch
GitHub user haoch opened a pull request:

https://github.com/apache/flink/pull/2420

[hotfix] Fix invalid elasticsearch url link in documentation

Fix invalid elasticsearch url link from `http://elastic.com` to 
`http://elastic.co` in documentation

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haoch/flink patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2420.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2420


commit 7b4275508940bedc95a1a281530ab7b5848b477e
Author: Hao Chen 
Date:   2016-08-25T16:39:00Z

[hotfix] Fix invalid elasticsearch url link 

Fix invalid elasticsearch url link from `http://elastic.com` to 
`http://elastic.co`




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website

2016-08-25 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437181#comment-15437181
 ] 

Robert Metzger commented on FLINK-4479:
---

PR open: https://github.com/apache/flink-web/pull/32 

> Replace trademark (tm) with registered trademark (R) sign on Flink website
> --
>
> Key: FLINK-4479
> URL: https://issues.apache.org/jira/browse/FLINK-4479
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> Flink is now a registered trademark, so we should reflect that on our website.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

2016-08-25 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2244
  
Looks good to merge @mushketyk :-)
The only thing that I'd change would be to rename 
`Kafka08JsonTableSinkTest` to `Kafka08JsonTableSinkITCase` (same for Kafka09*), 
since these are long running integration tests which should be executed in the 
appropriate Maven phase (verify). 
I can do that as well before merging. 

Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437179#comment-15437179
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2244
  
Looks good to merge @mushketyk :-)
The only thing that I'd change would be to rename 
`Kafka08JsonTableSinkTest` to `Kafka08JsonTableSinkITCase` (same for Kafka09*), 
since these are long running integration tests which should be executed in the 
appropriate Maven phase (verify). 
I can do that as well before merging. 

Thanks, Fabian


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...

2016-08-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76277062
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 ---
@@ -135,45 +135,45 @@ public void testPeriodicWatermarks() throws Exception 
{
// elements generate a watermark if the timestamp is a multiple 
of three
 
// elements for partition 1
-   fetcher.emitRecord(1L, part1, 1L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 1L));
-   fetcher.emitRecord(2L, part1, 2L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 2L));
-   fetcher.emitRecord(3L, part1, 3L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 3L));
+   fetcher.emitRecord(1L, part1, 1L, Long.MAX_VALUE);
+   fetcher.emitRecord(2L, part1, 2L, Long.MAX_VALUE);
+   fetcher.emitRecord(3L, part1, 3L, Long.MAX_VALUE);
assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
 
// elements for partition 2
-   fetcher.emitRecord(12L, part2, 1L, new ConsumerRecord(testTopic, 13, new byte[]{0}, 1L));
+   fetcher.emitRecord(12L, part2, 1L, Long.MAX_VALUE);
assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
 
// elements for partition 3
-   fetcher.emitRecord(101L, part3, 1L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 1L));
-   fetcher.emitRecord(102L, part3, 2L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 2L));
+   fetcher.emitRecord(101L, part3, 1L, Long.MAX_VALUE);
+   fetcher.emitRecord(102L, part3, 2L, Long.MAX_VALUE);
assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
 
// now, we should have a watermark (this blocks until the 
periodic thread emitted the watermark)
assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
 
// advance partition 3
-   fetcher.emitRecord(1003L, part3, 3L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 3L));
-   fetcher.emitRecord(1004L, part3, 4L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 4L));
-   fetcher.emitRecord(1005L, part3, 5L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 5L));
+   fetcher.emitRecord(1003L, part3, 3L, Long.MAX_VALUE);
+   fetcher.emitRecord(1004L, part3, 4L, Long.MAX_VALUE);
+   fetcher.emitRecord(1005L, part3, 5L, Long.MAX_VALUE);
assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
 
// advance partition 1 beyond partition 2 - this bumps the 
watermark
-   fetcher.emitRecord(30L, part1, 4L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 4L));
+   fetcher.emitRecord(30L, part1, 4L, Long.MAX_VALUE);
assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());

// this blocks until the periodic thread emitted the watermark
assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
 
// advance partition 2 again - this bumps the watermark
-   fetcher.emitRecord(13L, part2, 2L, new ConsumerRecord(testTopic, 13, new byte[]{0}, 2L));
-   fetcher.emitRecord(14L, part2, 3L, new ConsumerRecord(testTopic, 13, new byte[]{0}, 3L));
-   fetcher.emitRecord(15L, part2, 3L, new ConsumerRecord(testTopic, 13, new byte[]{0}, 3L));
+   fetcher.emitRecord(13L, part2, 2L, Long.MAX_VALUE);
+   fetcher.emitRecord(14L, part2, 3L, Long.MAX_VALUE);
+   fetcher.emitRecord(15L, part2, 3L, Long.MAX_VALUE);
--- End diff --

Same here: I think these are supposed to give a `Long.MIN_VALUE` instead of 
MAX?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at 

[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437170#comment-15437170
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76277062
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 ---
@@ -135,45 +135,45 @@ public void testPeriodicWatermarks() throws Exception 
{
// elements generate a watermark if the timestamp is a multiple 
of three
 
// elements for partition 1
-   fetcher.emitRecord(1L, part1, 1L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 1L));
-   fetcher.emitRecord(2L, part1, 2L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 2L));
-   fetcher.emitRecord(3L, part1, 3L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 3L));
+   fetcher.emitRecord(1L, part1, 1L, Long.MAX_VALUE);
+   fetcher.emitRecord(2L, part1, 2L, Long.MAX_VALUE);
+   fetcher.emitRecord(3L, part1, 3L, Long.MAX_VALUE);
assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
 
// elements for partition 2
-   fetcher.emitRecord(12L, part2, 1L, new ConsumerRecord(testTopic, 13, new byte[]{0}, 1L));
+   fetcher.emitRecord(12L, part2, 1L, Long.MAX_VALUE);
assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
 
// elements for partition 3
-   fetcher.emitRecord(101L, part3, 1L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 1L));
-   fetcher.emitRecord(102L, part3, 2L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 2L));
+   fetcher.emitRecord(101L, part3, 1L, Long.MAX_VALUE);
+   fetcher.emitRecord(102L, part3, 2L, Long.MAX_VALUE);
assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
 
// now, we should have a watermark (this blocks until the 
periodic thread emitted the watermark)
assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
 
// advance partition 3
-   fetcher.emitRecord(1003L, part3, 3L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 3L));
-   fetcher.emitRecord(1004L, part3, 4L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 4L));
-   fetcher.emitRecord(1005L, part3, 5L, new ConsumerRecord(testTopic, 21, new byte[]{0}, 5L));
+   fetcher.emitRecord(1003L, part3, 3L, Long.MAX_VALUE);
+   fetcher.emitRecord(1004L, part3, 4L, Long.MAX_VALUE);
+   fetcher.emitRecord(1005L, part3, 5L, Long.MAX_VALUE);
assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
 
// advance partition 1 beyond partition 2 - this bumps the 
watermark
-   fetcher.emitRecord(30L, part1, 4L, new ConsumerRecord(testTopic, 7, new byte[]{0}, 4L));
+   fetcher.emitRecord(30L, part1, 4L, Long.MAX_VALUE);
assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());

// this blocks until the periodic thread emitted the watermark
assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
 
// advance partition 2 again - this bumps the watermark
-   fetcher.emitRecord(13L, part2, 2L, new ConsumerRecord(testTopic, 13, new byte[]{0}, 2L));
-   fetcher.emitRecord(14L, part2, 3L, new ConsumerRecord(testTopic, 13, new byte[]{0}, 3L));
-   fetcher.emitRecord(15L, part2, 3L, new ConsumerRecord(testTopic, 13, new byte[]{0}, 3L));
+   fetcher.emitRecord(13L, part2, 2L, Long.MAX_VALUE);
+   fetcher.emitRecord(14L, part2, 3L, Long.MAX_VALUE);
+   fetcher.emitRecord(15L, part2, 3L, Long.MAX_VALUE);
--- End diff --

Same here: I think these are supposed to give a `Long.MIN_VALUE` instead of 
MAX?


> Bump Kafka producer in 

[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437160#comment-15437160
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76275879
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -262,6 +262,10 @@ public void run() {
}
}
 
+   // Kafka09Fetcher ignores the timestamp.
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
+   emitRecord(record, partition, offset, Long.MAX_VALUE);
--- End diff --

Same here: I think this is supposed to give a `Long.MIN_VALUE` instead of 
MAX?


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher-compressed 
> messages, assign offsets to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437159#comment-15437159
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76275805
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -376,7 +376,7 @@ else if (partitionsRemoved) {
continue 
partitionsLoop;
}

-   owner.emitRecord(value, 
currentPartition, offset, msg);
+   owner.emitRecord(value, 
currentPartition, offset, Long.MAX_VALUE);
--- End diff --

I think this is supposed to give a `Long.MIN_VALUE` instead of MAX? 


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher-compressed 
> messages, assign offsets to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...

2016-08-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76275879
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -262,6 +262,10 @@ public void run() {
}
}
 
+   // Kafka09Fetcher ignores the timestamp.
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
+   emitRecord(record, partition, offset, Long.MAX_VALUE);
--- End diff --

Same here: I think this is supposed to give a `Long.MIN_VALUE` instead of 
MAX?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...

2016-08-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76275805
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -376,7 +376,7 @@ else if (partitionsRemoved) {
continue 
partitionsLoop;
}

-   owner.emitRecord(value, 
currentPartition, offset, msg);
+   owner.emitRecord(value, 
currentPartition, offset, Long.MAX_VALUE);
--- End diff --

I think this is supposed to give a `Long.MIN_VALUE` instead of MAX? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4497:
-

 Summary: Add support for Scala tuples and case classes to 
Cassandra sink
 Key: FLINK-4497
 URL: https://issues.apache.org/jira/browse/FLINK-4497
 Project: Flink
  Issue Type: Improvement
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The new Cassandra sink only supports streams of Flink Java tuples and Java 
POJOs that have been annotated for use by Datastax Mapper.  The sink should be 
extended to support Scala types and case classes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76274362
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configurationThe configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+ 

[jira] [Reopened] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu reopened FLINK-4437:
---
  Assignee: (was: Ted Yu)

> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
> Fix For: 1.2.0
>
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) 
> {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints 
> > timestamp' in close proximity before lastTriggeredCheckpoint is updated, 
> the two threads may have an inconsistent view of "lastTriggeredCheckpoint" 
> and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-4437.
---
Resolution: Fixed

> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
> Fix For: 1.2.0
>
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) 
> {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints 
> > timestamp' in close proximity before lastTriggeredCheckpoint is updated, 
> the two threads may have an inconsistent view of "lastTriggeredCheckpoint" 
> and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437151#comment-15437151
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76274826
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -60,12 +60,17 @@ protected void 
assignPartitionsToConsumer(KafkaConsumer consumer
consumer.assign(topicPartitions);
}
 
+   @Override
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
+   // pass timestamp
+   super.emitRecord(record, partition, offset, 
consumerRecord.timestamp());
+   }
+
/**
 * Emit record Kafka-timestamp aware.
 */
@Override
-   protected  void emitRecord(T record, 
KafkaTopicPartitionState partitionState, long offset, R 
kafkaRecord) throws Exception {
-   long timestamp = ((ConsumerRecord) kafkaRecord).timestamp();
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partitionState, long offset, long 
timestamp) throws Exception {
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
--- End diff --

Is it possible to let `AbstractFetcher#emitRecord` determine whether to 
call `collectWithTimestamp` or `collect` depending on the provided timestamp 
(== `Long.MIN_VALUE`)? Then, we won't need to override the base `emitRecord` 
here, correct?


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher-compressed 
> messages, assign offsets to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...

2016-08-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76274826
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -60,12 +60,17 @@ protected void 
assignPartitionsToConsumer(KafkaConsumer consumer
consumer.assign(topicPartitions);
}
 
+   @Override
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
+   // pass timestamp
+   super.emitRecord(record, partition, offset, 
consumerRecord.timestamp());
+   }
+
/**
 * Emit record Kafka-timestamp aware.
 */
@Override
-   protected  void emitRecord(T record, 
KafkaTopicPartitionState partitionState, long offset, R 
kafkaRecord) throws Exception {
-   long timestamp = ((ConsumerRecord) kafkaRecord).timestamp();
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partitionState, long offset, long 
timestamp) throws Exception {
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
--- End diff --

Is it possible to let `AbstractFetcher#emitRecord` determine whether to 
call `collectWithTimestamp` or `collect` depending on the provided timestamp 
(== `Long.MIN_VALUE`)? Then, we won't need to override the base `emitRecord` 
here, correct?
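
A quick sketch of the dispatch suggested here, with `Long.MIN_VALUE` as the "no timestamp" marker; the class and method names are assumptions, not the actual `AbstractFetcher` code:

```
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class TimestampDispatch<T> {

    void emit(T record, long timestamp, SourceContext<T> ctx) {
        if (timestamp == Long.MIN_VALUE) {
            // 0.8/0.9 fetchers carry no Kafka timestamp, so emit without one.
            ctx.collect(record);
        } else {
            // 0.10+ fetchers forward the record's timestamp.
            ctx.collectWithTimestamp(record, timestamp);
        }
    }
}
```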


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437148#comment-15437148
 ] 

ASF GitHub Bot commented on FLINK-4363:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76274362
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   

[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat

2016-08-25 Thread Alexander Chermenin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437141#comment-15437141
 ] 

Alexander Chermenin commented on FLINK-2608:


This is a bug in ArraysAsListSerializer from Twitter Chill 
(https://github.com/twitter/chill/issues/255).
It will be fixed once the next Chill release is published and Flink's dependency 
version is bumped to it.
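
Until that bump happens, a user-side workaround is to copy the Arrays.asList(..) view into a plain ArrayList before handing it to Flink, so the problematic Chill serializer for java.util.Arrays$ArrayList is never used. A minimal, self-contained sketch (class name is illustrative):

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class AsListWorkaround {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Arrays.asList(..) returns java.util.Arrays$ArrayList, which currently
		// breaks CollectionInputFormat; copying into a real ArrayList avoids it.
		List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));

		DataSet<Integer> data = env.fromCollection(elements);
		data.print();
	}
}
{code}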

> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet text = env.fromElements(
> "Who's there?",
> "I think I hear them. Stand, ho! Who's there?");
> // DOES NOT WORK
> List elements = Arrays.asList(0, 0, 0);
> // The following works:
> //List elements = new ArrayList<>(new int[] {0,0,0});
> DataSet set = env.fromElements(new TestClass(elements));
> DataSet> wordCounts = text
> .flatMap(new LineSplitter())
> .withBroadcastSet(set, "set")
> .groupBy(0)
> .sum(1);
> wordCounts.print();
> }
> public static class LineSplitter implements FlatMapFunction Tuple2> {
> @Override
> public void flatMap(String line, Collector Integer>> out) {
> for (String word : line.split(" ")) {
> out.collect(new Tuple2(word, 1));
> }
> }
> }
> public static class TestClass implements Serializable {
> private static final long serialVersionUID = -2932037991574118651L;
> List integerList;
> public TestClass(List integerList){
> this.integerList=integerList;
> }
> }
> {code}
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at main(Test.java:32) 
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> 

[jira] [Commented] (FLINK-4480) Incorrect link to elastic.co in documentation

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437136#comment-15437136
 ] 

ASF GitHub Bot commented on FLINK-4480:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2416
  
Merging this...


> Incorrect link to elastic.co in documentation
> -
>
> Key: FLINK-4480
> URL: https://issues.apache.org/jira/browse/FLINK-4480
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0, 1.1.1
>Reporter: Fabian Hueske
>Assignee: Suneel Marthi
>Priority: Trivial
>
> The link URL of the entry "Elasticsearch 2x (sink)" on the connector's 
> documentation page 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html
>  is pointing to http://elastic.com but should point to http://elastic.co



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2416: FLINK-4480: Incorrect link to elastic.co in documentation

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2416
  
Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4462) Add RUN_TIME retention policy for all the flink annotations

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437134#comment-15437134
 ] 

ASF GitHub Bot commented on FLINK-4462:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2412
  
The `japicmp` is a maven plugin that is added to projects like 
`flink-core`. It is not part of any code.


> Add RUN_TIME retention policy for all the flink annotations
> ---
>
> Key: FLINK-4462
> URL: https://issues.apache.org/jira/browse/FLINK-4462
> Project: Flink
>  Issue Type: Improvement
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>
> It is better to add RUNTIME retention policy to flink annotations. So that 
> utilites/tests can be added to ensure if the classes/interfaces are all 
> tagged with proper annotations. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2412: FLINK-4462 Add RUN_TIME retention policy for all the flin...

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2412
  
The `japicmp` is a maven plugin that is added to projects like 
`flink-core`. It is not part of any code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437131#comment-15437131
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2315
  
There are some compilation issues in the build logs:

```
[ERROR] COMPILATION ERROR : 
[INFO] -
[ERROR] 
/home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java:[21,33]
 package com.google.common.collect does not exist
[ERROR] 
/home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java:[65,24]
 cannot find symbol
```

Could you rebase to the latest master and also remove any merge commits? 
Thanks!




> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1)

2016-08-25 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2315
  
There are some compilation issues in the build logs:

```
[ERROR] COMPILATION ERROR : 
[INFO] -
[ERROR] 
/home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java:[21,33]
 package com.google.common.collect does not exist
[ERROR] 
/home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java:[65,24]
 cannot find symbol
```

Could you rebase to the latest master and also remove any merge commits? 
Thanks!




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4433) Refactor the StreamSource.

2016-08-25 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4433:
--
Issue Type: Improvement  (was: Bug)

> Refactor the StreamSource.
> --
>
> Key: FLINK-4433
> URL: https://issues.apache.org/jira/browse/FLINK-4433
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> With the addition of continuous file monitoring, the 
> {{ContinuousFileReaderOperator}} now uses a {{SourceContext}} in addition to 
> the {{StreamSource}}. Given this, all the implementations of the 
> {{SourceContext}} should be removed from the {{StreamSource}} and become 
> independent classes.
> In addition, the {{AsyncExceptionChecker}} interface should be removed as its 
> functionality can be replaced by the {{task.failExternally()}} method. This 
> also implies slight changes in the source context implementations. 
> Also in the {{trigger()}} method of the {{WatermarkEmittingTask}}, all the 
> {{owner.getCurrentProcessingTime()}} could be replaced by the {{timestamp}} 
> argument of that method. This will remove some of the calls to the 
> {{getCurrentProcessingTime()}} which can be expensive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4433) Refactor the StreamSource.

2016-08-25 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4433:
--
Component/s: DataStream API

> Refactor the StreamSource.
> --
>
> Key: FLINK-4433
> URL: https://issues.apache.org/jira/browse/FLINK-4433
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> With the addition of continuous file monitoring, the 
> {{ContinuousFileReaderOperator}} now uses a {{SourceContext}} in addition to 
> the {{StreamSource}}. Given this, all the implementations of the 
> {{SourceContext}} should be removed from the {{StreamSource}} and become 
> independent classes.
> In addition, the {{AsyncExceptionChecker}} interface should be removed as its 
> functionality can be replaced by the {{task.failExternally()}} method. This 
> also implies slight changes in the source context implementations. 
> Also in the {{trigger()}} method of the {{WatermarkEmittingTask}}, all the 
> {{owner.getCurrentProcessingTime()}} could be replaced by the {{timestamp}} 
> argument of that method. This will remove some of the calls to the 
> {{getCurrentProcessingTime()}} which can be expensive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4433) Refactor the StreamSource.

2016-08-25 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-4433:
-

Assignee: Kostas Kloudas

> Refactor the StreamSource.
> --
>
> Key: FLINK-4433
> URL: https://issues.apache.org/jira/browse/FLINK-4433
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> With the addition of continuous file monitoring, the 
> {{ContinuousFileReaderOperator}} now uses a {{SourceContext}} in addition to 
> the {{StreamSource}}. Given this, all the implementations of the 
> {{SourceContext}} should be removed from the {{StreamSource}} and become 
> independent classes.
> In addition, the {{AsyncExceptionChecker}} interface should be removed as its 
> functionality can be replaced by the {{task.failExternally()}} method. This 
> also implies slight changes in the source context implementations. 
> Also in the {{trigger()}} method of the {{WatermarkEmittingTask}}, all the 
> {{owner.getCurrentProcessingTime()}} could be replaced by the {{timestamp}} 
> argument of that method. This will remove some of the calls to the 
> {{getCurrentProcessingTime()}} which can be expensive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #:

2016-08-25 Thread rmetzger
Github user rmetzger commented on the pull request:


https://github.com/apache/flink/commit/444315a12ca2b1d3de44ea50dda9b8bb5a36bb9e#commitcomment-18778263
  
Mh, I stumbled across the broken log upload when I tried fixing that issue 
... so I thought the commit was related to my efforts to fix that issue ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3761) Refactor State Backends/Make Keyed State Key-Group Aware

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437116#comment-15437116
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1988
  
I think this is subsumed by #2376 

Should we close this Pull Request?


> Refactor State Backends/Make Keyed State Key-Group Aware
> 
>
> Key: FLINK-3761
> URL: https://issues.apache.org/jira/browse/FLINK-3761
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>
> After an off-line discussion with [~aljoscha], we came to the conclusion that 
> it would be beneficial to reflect the differences between a keyed and a 
> non-keyed stream also in the state backends. A state backend which is used 
> for a keyed stream offers value, list, folding, and reducing state and has to 
> group its keys into key groups. 
> A state backend for non-keyed streams can only offer a union state to make it 
> work with dynamic scaling. A union state is a state which is broadcasted to 
> all tasks in case of a recovery. The state backends can then select what 
> information they need to recover from the whole state (formerly distributed).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1988: [FLINK-3761] Introduction of key groups

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/1988
  
I think this is subsumed by #2376 

Should we close this Pull Request?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4488) Prevent cluster shutdown after job execution for non-detached jobs

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437113#comment-15437113
 ] 

ASF GitHub Bot commented on FLINK-4488:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2419
  
That makes sense, +1 to merge this for 1.1.2 and 1.2

Wondering if we can guard this via a test, so that the FLIP-6 refactoring 
does not re-introduce the bug.


> Prevent cluster shutdown after job execution for non-detached jobs
> --
>
> Key: FLINK-4488
> URL: https://issues.apache.org/jira/browse/FLINK-4488
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Affects Versions: 1.2.0, 1.1.1
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0, 1.1.2
>
>
> In per-job mode, the Yarn cluster currently shuts down after the first 
> interactively executed job. Users may want to execute multiple jobs in one 
> Jar. I would suggest using this mechanism only for jobs which run detached. 
> For interactive jobs, shutdown of the cluster is additionally handled by the 
> CLI which should be sufficient to ensure cluster shutdown. Cluster shutdown 
> could only become a problem in case of a network partition to the cluster or 
> outage of the CLI.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2419: [FLINK-4488] only automatically shutdown clusters for det...

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2419
  
That makes sense, +1 to merge this for 1.1.2 and 1.2

Wondering if we can guard this via a test, so that the FLIP-6 refactoring 
does not re-introduce the bug.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437100#comment-15437100
 ] 

ASF GitHub Bot commented on FLINK-4418:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2383
  
Merging this...


> ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if 
> InetAddress.getLocalHost throws exception
> --
>
> Key: FLINK-4418
> URL: https://issues.apache.org/jira/browse/FLINK-4418
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Shannon Carey
>Assignee: Shannon Carey
>
> When attempting to connect to a cluster with a ClusterClient, if the 
> machine's hostname is not resolvable to an IP, an exception is thrown 
> preventing success.
> This is the case if, for example, the hostname is not present & mapped to a 
> local IP in /etc/hosts.
> The exception is below. I suggest that findAddressUsingStrategy() should 
> catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and 
> return null, allowing alternative strategies to be attempted by 
> findConnectingAddress(). I will open a PR to this effect. Ideally this could 
> be included in both 1.2 and 1.1.2.
> In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS 
> EC2 instance.
> {code}
> 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed 
> to retrieve the JobManager gateway.
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
> 21:11:35  at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
> 21:11:35  at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
> 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager 
> address at /10.2.89.80:43126
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
> 21:11:35  ... 8 more
> 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: 
> ip-10-2-64-47: unknown error
> 21:11:35  at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
> 21:11:35  at 
> org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
> 21:11:35  at 
> org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
> 21:11:35  ... 10 more
> 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown 
> error
> 21:11:35  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
> 21:11:35  at 
> java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
> 21:11:35  at 
> java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
> 21:11:35  at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
> 21:11:35  ... 13 more
> {code}
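
The essence of the proposed fix is small: resolving the local hostname is only one of several address-detection strategies, and a failure there should not abort the whole search. A self-contained sketch of the pattern (names are illustrative, not the actual ConnectionUtils code):

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

public final class LocalHostStrategy {

	/**
	 * Tries the "local host" strategy. Returns null instead of throwing when the
	 * hostname cannot be resolved, so the caller can fall through to the
	 * remaining strategies (e.g. probing the JobManager address directly).
	 */
	public static InetAddress tryLocalHost() {
		try {
			return InetAddress.getLocalHost();
		} catch (UnknownHostException e) {
			// e.g. hostname missing from /etc/hosts; not fatal for address detection
			return null;
		}
	}
}
{code}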



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-4437.
---
Resolution: Fixed

> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Ted Yu
> Fix For: 1.2.0
>
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) 
> {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints 
> > timestamp' in close proximity before lastTriggeredCheckpoint is updated, 
> the two threads may have an inconsistent view of "lastTriggeredCheckpoint" 
> and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.
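
In other words, the check against the minimum pause and the update of lastTriggeredCheckpoint need to form one atomic step, either under the coordinator's lock or via a compare-and-set. A standalone sketch of the compare-and-set variant (illustrative only, not the CheckpointCoordinator code):

{code:java}
import java.util.concurrent.atomic.AtomicLong;

public final class MinPauseGate {

	private final long minPauseBetweenCheckpoints;
	private final AtomicLong lastTriggered = new AtomicLong(Long.MIN_VALUE);

	public MinPauseGate(long minPauseBetweenCheckpoints) {
		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
	}

	/** Returns true if a checkpoint may be triggered at the given timestamp. */
	public boolean tryTrigger(long timestamp) {
		long last = lastTriggered.get();
		if (last + minPauseBetweenCheckpoints > timestamp) {
			return false; // minimum pause not yet over
		}
		// Check and update are one atomic step: only one thread can win the CAS,
		// so two concurrent triggers cannot both pass the pause check.
		return lastTriggered.compareAndSet(last, timestamp);
	}
}
{code}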



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu reopened FLINK-4437:
---
  Assignee: Ted Yu

> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Ted Yu
> Fix For: 1.2.0
>
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) 
> {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints 
> > timestamp' in close proximity before lastTriggeredCheckpoint is updated, 
> the two threads may have an inconsistent view of "lastTriggeredCheckpoint" 
> and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...

2016-08-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76268687
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 ---
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular 
sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction 
interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *
+ *  Pre Kafka 0.10 producers only follow approach (a), allowing users to 
add the producer via the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the 
timestamp attached to the record,
+ *  the Kafka 0.10 producer has a second invocation option, approach (b).
+ *
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to 
Kafka. When adding the
+ *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafka() 
method, the Kafka producer
+ *  can access the internal record timestamp of the record and write it to 
Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach 
they are needed for.
+ */
+public class FlinkKafkaProducer010 extends StreamSink implements 
SinkFunction, RichFunction {
+
+   /**
+* Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
+*/
+   private boolean writeTimestampToKafka = false;
+
+   // -- "Constructors" for timestamp writing 
--
+
+   /**
+* Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+* the topic.
+*
+* This constructor allows writing timestamps to Kafka; it follows 
approach (b) (see above).
+*
+* @param inStream The stream to write to Kafka
+* @param topicId ID of the Kafka topic.
+* @param serializationSchema User defined serialization schema 
supporting key/value messages
+* @param producerConfig Properties with the producer configuration.
+*/
+   public static  FlinkKafkaProducer010Configuration 
writeToKafka(DataStream inStream,
+  
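
For readers following along, the two invocation styles described in the Javadoc above boil down to roughly the following user code. This is a usage sketch based on the names visible in this diff (writeToKafka and its parameters); the approach (a) constructor, SimpleStringSchema, and the exact shape of the returned configuration object are assumptions, since they are cut off here:

```java
// Usage sketch only; names not visible in this diff are assumptions.
public static void writeBothWays(DataStream<String> stream, Properties producerConfig) {
	// (a) plain sink function, attached via addSink(); no access to the record's timestamp
	stream.addSink(new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), producerConfig));

	// (b) custom-operator variant via writeToKafka(); can forward the record's timestamp to Kafka
	FlinkKafkaProducer010.writeToKafka(stream, "my-topic",
			new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), producerConfig);
}
```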

[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437096#comment-15437096
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76268687
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 ---
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular 
sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction 
interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *
+ *  Pre Kafka 0.10 producers only follow approach (a), allowing users to 
add the producer via the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the 
timestamp attached to the record,
+ *  the Kafka 0.10 producer has a second invocation option, approach (b).
+ *
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to 
Kafka. When adding the
+ *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafka() 
method, the Kafka producer
+ *  can access the internal record timestamp of the record and write it to 
Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach 
they are needed for.
+ */
+public class FlinkKafkaProducer010 extends StreamSink implements 
SinkFunction, RichFunction {
+
+   /**
+* Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
+*/
+   private boolean writeTimestampToKafka = false;
+
+   // -- "Constructors" for timestamp writing 
--
+
+   /**
+* Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+* the topic.
+*
+* This constructor allows writing timestamps to Kafka; it follows 
approach (b) (see above).
+*
+* @param inStream The stream to write to Kafka
+* @param topicId ID of the Kafka topic.
+* @param serializationSchema User defined serialization schema 
supporting 

[GitHub] flink issue #2383: [FLINK-4418] [client] Improve resilience when InetAddress...

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2383
  
Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #:

2016-08-25 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/444315a12ca2b1d3de44ea50dda9b8bb5a36bb9e#commitcomment-18778229
  
Does this have the wrong issue tag?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4495) Running multiple jobs on yarn (without yarn-session)

2016-08-25 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437088#comment-15437088
 ] 

Maximilian Michels commented on FLINK-4495:
---

Thanks [~nielsbasjes] for opening the jira issue. I also created one 
(FLINK-4488) with an easy fix.

> Running multiple jobs on yarn (without yarn-session)
> 
>
> Key: FLINK-4495
> URL: https://issues.apache.org/jira/browse/FLINK-4495
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN Client
>Reporter: Niels Basjes
>
> I created a small application that needs to run multiple (batch) jobs on Yarn 
> and then terminate.
> I essentially do right now the following:
> flink run -m yarn-cluster -yn 10  bla.jar ...
> And in my main I do
> foreach thing I need to do {
>ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>env. ... define the batch job.
>env.execute
> }
> In the second job I submit I get an exception:
> {code}
> java.lang.RuntimeException: Unable to tell application master to stop once 
> the specified job has been finised
>   at 
> org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
>   at 
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
>   at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
>   at com.bol.tools.hbase.export.Main.run(Main.java:81)
>   at com.bol.tools.hbase.export.Main.main(Main.java:42)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>   at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
>   at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
>   at 
> org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>   at 
> org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
> [1 milliseconds]
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>   at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>   at scala.concurrent.Await$.result(package.scala:107)
>   at scala.concurrent.Await.result(package.scala)
>   at 
> org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
>   ... 25 more
> {code}
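
For context, the submission pattern described above amounts to something like the following (a minimal sketch of the reporter's pseudo-code with a stand-in job body; the second env.execute() is where the exception above is thrown):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MultiJobMain {

	public static void main(String[] args) throws Exception {
		// Packaged into a jar and submitted once via: flink run -m yarn-cluster -yn 10 bla.jar
		for (String table : new String[] {"table_a", "table_b"}) {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

			env.fromElements(table)
				.map(new MapFunction<String, String>() {
					@Override
					public String map(String value) {
						return "exporting " + value; // stand-in for the real per-table batch job
					}
				})
				.writeAsText("/tmp/export-" + table);

			env.execute("export " + table);
		}
	}
}
{code}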



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437091#comment-15437091
 ] 

ASF GitHub Bot commented on FLINK-4418:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2383
  
I would merge this fix by itself first.
Then, if you have a nice way in mind to make this simpler, I'd be happy to 
look at another pull request ;-)


> ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if 
> InetAddress.getLocalHost throws exception
> --
>
> Key: FLINK-4418
> URL: https://issues.apache.org/jira/browse/FLINK-4418
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Shannon Carey
>Assignee: Shannon Carey
>
> When attempting to connect to a cluster with a ClusterClient, if the 
> machine's hostname is not resolvable to an IP, an exception is thrown 
> preventing success.
> This is the case if, for example, the hostname is not present & mapped to a 
> local IP in /etc/hosts.
> The exception is below. I suggest that findAddressUsingStrategy() should 
> catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and 
> return null, allowing alternative strategies to be attempted by 
> findConnectingAddress(). I will open a PR to this effect. Ideally this could 
> be included in both 1.2 and 1.1.2.
> In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS 
> EC2 instance.
> {code}
> 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed 
> to retrieve the JobManager gateway.
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
> 21:11:35  at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
> 21:11:35  at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
> 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager 
> address at /10.2.89.80:43126
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
> 21:11:35  ... 8 more
> 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: 
> ip-10-2-64-47: unknown error
> 21:11:35  at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
> 21:11:35  at 
> org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
> 21:11:35  at 
> org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
> 21:11:35  ... 10 more
> 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown 
> error
> 21:11:35  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
> 21:11:35  at 
> java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
> 21:11:35  at 
> java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
> 21:11:35  at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
> 21:11:35  ... 13 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437092#comment-15437092
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76268268
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 ---
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction 
interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *
+ *  Pre Kafka 0.10 producers only follow approach (a), allowing users to 
use the producer using the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the the 
timestamp attached to the record
+ *  the Kafka 0.10 producer has a section invocation option, approach (b).
+ *
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to 
Kafka. When adding the
+ *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafka() 
method, the Kafka producer
+ *  can access the internal record timestamp of the record and write it to 
Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach 
they are needed for.
+ */
+public class FlinkKafkaProducer010 extends StreamSink implements 
SinkFunction, RichFunction {
+
+   /**
+* Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
+*/
+   private boolean writeTimestampToKafka = false;
+
+   // -- "Constructors" for timestamp writing 
--
+
+   /**
+* Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+* the topic.
+*
+* This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
+*
+* @param inStream The stream to write to Kafka
+* @param topicId ID of the Kafka topic.
+* @param serializationSchema User defined serialization schema 
supporting 

[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...

2016-08-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r76268268
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 ---
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular 
sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction 
interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *
+ *  Pre Kafka 0.10 producers only follow approach (a), allowing users to 
add the producer via the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the 
timestamp attached to the record,
+ *  the Kafka 0.10 producer has a second invocation option, approach (b).
+ *
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to 
Kafka. When adding the
+ *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafka() 
method, the Kafka producer
+ *  can access the internal record timestamp of the record and write it to 
Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach 
they are needed for.
+ */
+public class FlinkKafkaProducer010 extends StreamSink implements 
SinkFunction, RichFunction {
+
+   /**
+* Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
+*/
+   private boolean writeTimestampToKafka = false;
+
+   // -- "Constructors" for timestamp writing 
--
+
+   /**
+* Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+* the topic.
+*
+* This constructor allows writing timestamps to Kafka; it follows 
approach (b) (see above).
+*
+* @param inStream The stream to write to Kafka
+* @param topicId ID of the Kafka topic.
+* @param serializationSchema User defined serialization schema 
supporting key/value messages
+* @param producerConfig Properties with the producer configuration.
+*/
+   public static  FlinkKafkaProducer010Configuration 
writeToKafka(DataStream inStream,
+  
