[jira] [Commented] (FLINK-1725) New Partitioner for better load balancing for skewed data

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14724917#comment-14724917
 ] 

ASF GitHub Bot commented on FLINK-1725:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1069#issuecomment-136617574
  
@anisnasir, actually I didn't think about automatic adaptation. At first I 
thought about making it configurable, but if it could also be done 
automatically, that would be even better. I guess it's fine to start with 
the configurable solution.


> New Partitioner for better load balancing for skewed data
> -
>
> Key: FLINK-1725
> URL: https://issues.apache.org/jira/browse/FLINK-1725
> Project: Flink
>  Issue Type: Improvement
>  Components: New Components
>Affects Versions: 0.8.1
>Reporter: Anis Nasir
>Assignee: Anis Nasir
>  Labels: LoadBalancing, Partitioner
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Hi,
> We have recently studied the problem of load balancing in Storm [1].
> In particular, we focused on key distribution of the stream for skewed data.
> We developed a new stream partitioning scheme (which we call Partial Key 
> Grouping). It achieves better load balancing than key grouping while being 
> more scalable than shuffle grouping in terms of memory.
> In the paper we show a number of mining algorithms that are easy to implement 
> with partial key grouping, and whose performance can benefit from it. We 
> think that it might also be useful for a larger class of algorithms.
> Partial key grouping is very easy to implement: it requires just a few lines 
> of code in Java when implemented as a custom grouping in Storm [2].
> For all these reasons, we believe it will be a nice addition to the standard 
> Partitioners available in Flink. If the community thinks it's a good idea, we 
> will be happy to offer support in the porting.
> References:
> [1]. 
> https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
> [2]. https://github.com/gdfm/partial-key-grouping
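
As a rough illustration of the idea, here is a minimal, self-contained Java sketch of the "two choices" routing behind partial key grouping; all names are illustrative, and this is not Flink's partitioner API:

{code:java}
import java.util.function.ToIntFunction;

/**
 * Illustrative sketch of partial key grouping ("the power of both choices"):
 * each key hashes to two candidate channels, and every record is routed to
 * whichever candidate has received fewer records so far. Names are
 * hypothetical; this is not Flink's ChannelSelector/Partitioner API.
 */
public class PartialKeyGroupingSketch {

    private final long[] load; // records sent to each channel so far
    private final ToIntFunction<Object> h1;
    private final ToIntFunction<Object> h2;

    public PartialKeyGroupingSketch(int numChannels) {
        this.load = new long[numChannels];
        // Two (roughly) independent hash functions over the key space.
        this.h1 = k -> Math.floorMod(k.hashCode(), numChannels);
        this.h2 = k -> Math.floorMod(k.hashCode() * 0x9E3779B9, numChannels);
    }

    /** Routes to the less loaded of the key's two candidate channels. */
    public int selectChannel(Object key) {
        int c1 = h1.applyAsInt(key);
        int c2 = h2.applyAsInt(key);
        int choice = load[c1] <= load[c2] ? c1 : c2;
        load[choice]++;
        return choice;
    }
}
{code}

Because every key has only two possible targets, downstream per-key state stays bounded, which is what makes the scheme more memory-friendly than shuffle grouping.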




[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14724924#comment-14724924
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136619441
  
@s1ck, it's important to note that `1` is subtracted from 
`getRuntimeContext().getNumberOfParallelSubtasks()` and not from 
`getBitSize()`. The reason is that subtask indices are `0`-based, so we only 
need enough bits for the highest index we can encounter, which is 
`getRuntimeContext().getNumberOfParallelSubtasks() - 1`. For example, if 
`getNumberOfParallelSubtasks() == 7`, we calculate `getBitSize(6) == 3`.
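
A small illustration of the arithmetic above, assuming `getBitSize(x)` returns the number of bits needed to represent `x` (the actual Flink implementation may differ):

{code:java}
public class BitSizeIllustration {
    // Assumed semantics (illustration only): the number of bits needed to
    // represent value, for value > 0.
    static int getBitSize(int value) {
        return 32 - Integer.numberOfLeadingZeros(value);
    }

    public static void main(String[] args) {
        // With 7 parallel subtasks the highest 0-based index is 6 = 0b110:
        System.out.println(getBitSize(7 - 1)); // 3, as in the comment above
        // Subtracting 1 matters: with 8 subtasks the highest index is 7,
        // needing 3 bits, while getBitSize(8) would claim 4.
        System.out.println(getBitSize(8 - 1)); // 3
    }
}
{code}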


> DataSetUtils.zipWithUniqueID creates duplicate IDs
> --
>
> Key: FLINK-2590
> URL: https://issues.apache.org/jira/browse/FLINK-2590
> Project: Flink
>  Issue Type: Bug
>  Components: Java API, Scala API
>Affects Versions: 0.10, master
>Reporter: Martin Junghanns
>Assignee: Martin Junghanns
>Priority: Minor
>
> The function creates IDs using the following code:
> {code:java}
> shifter = log2(numberOfParallelSubtasks)
> id = counter << shifter + taskId;
> {code}
> As the binary operator + is evaluated before the bit shift <<, different 
> tasks can create the same ID. The expression essentially calculates
> {code}
> counter*2^(shifter+taskId)
> {code}
> which is 0 for counter = 0 and all values of shifter and taskId.
> Consider the following example:
> numberOfParallelSubtasks = 8
> shifter = log2(8) = 4 (maybe rename the function?)
> produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 256
> start: 2, shifter: 4 taskId: 3 label: 256
> start: 4, shifter: 4 taskId: 2 label: 256
> {code}
> I would suggest the following:
> {code}
> counter*2^(shifter)+taskId
> {code}
> which in code is equivalent to
> {code}
> shifter = log2(numberOfParallelSubtasks);
> id = (counter << shifter) + taskId;
> {code}
> and for our example produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 20
> start: 2, shifter: 4 taskId: 3 label: 35
> start: 4, shifter: 4 taskId: 2 label: 66
> {code}
> So we move the counter to the left and add the task id. As there is space for 
> 2^shifter numbers, this prevents collisions.
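
The precedence pitfall is easy to reproduce outside Flink; this standalone snippet (an illustration, not the patched Flink code) prints the colliding and the corrected labels from the example above:

{code:java}
public class ShiftPrecedenceDemo {
    public static void main(String[] args) {
        int shifter = 4;
        int[][] inputs = {{1, 4}, {2, 3}, {4, 2}}; // {counter, taskId} pairs
        for (int[] in : inputs) {
            int counter = in[0];
            int taskId = in[1];
            // + binds tighter than <<, so this is counter << (shifter + taskId)
            int broken = counter << shifter + taskId;
            // parenthesized: counter * 2^shifter + taskId
            int fixed = (counter << shifter) + taskId;
            System.out.printf("counter=%d taskId=%d broken=%d fixed=%d%n",
                    counter, taskId, broken, fixed);
        }
    }
}
{code}

Running it shows broken=256 for all three inputs, while fixed yields the distinct labels 20, 35, and 66.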




[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14724961#comment-14724961
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136633442
  
@tillrohrmann of course you are right, my reasoning was wrong. It's 
committed.




[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14724980#comment-14724980
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-136636200
  
@mjsax @StephanEwen I have finished the code change. Could you give me some 
comments? Thank you very much!


> Add configuration support in Storm-compatibility
> 
>
> Key: FLINK-2525
> URL: https://issues.apache.org/jira/browse/FLINK-2525
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: fangfengbin
>Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
> `Bolt.prepare()`, respectively. Both methods take a config `Map` as their 
> first parameter. This map is currently not populated, so Spouts and Bolts 
> cannot be configured with user-defined parameters. To support this feature, 
> the spout and bolt wrapper classes need to be extended to create a proper 
> `Map` object. Furthermore, the clients need to be extended to take a `Map` 
> and translate it into a Flink `Configuration` that is forwarded to the 
> wrappers for proper initialization of the map.
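
A rough sketch of the translation step described above, assuming the wrappers receive a Flink `Configuration`; the helper class is hypothetical and value types are simplified to strings:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;

public class StormConfigTranslator {
    // Hypothetical helper: builds the config Map that Spout.open(...) and
    // Bolt.prepare(...) expect from a Flink Configuration.
    public static Map<String, Object> toStormConfig(Configuration flinkConfig) {
        Map<String, Object> stormConfig = new HashMap<>();
        for (String key : flinkConfig.keySet()) {
            // Simplification: values are copied as strings; a complete
            // translation would preserve the original value types.
            stormConfig.put(key, flinkConfig.getString(key, null));
        }
        return stormConfig;
    }
}
{code}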




[jira] [Created] (FLINK-2600) Failing ElasticsearchSinkITCase.testNodeClient test case

2015-09-01 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-2600:


 Summary: Failing ElasticsearchSinkITCase.testNodeClient test case
 Key: FLINK-2600
 URL: https://issues.apache.org/jira/browse/FLINK-2600
 Project: Flink
  Issue Type: Bug
Reporter: Till Rohrmann


I observed that the {{ElasticsearchSinkITCase.testNodeClient}} test case fails 
on Travis. The stack trace is

{code}
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:414)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.testingUtils.TestingJobManager$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManager.scala:285)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: An error occured in ElasticsearchSink.
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink.close(ElasticsearchSink.java:307)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:185)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: IndexMissingException[[my-index] missing]
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink$1.afterBulk(ElasticsearchSink.java:240)
at 
org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:316)
at 
org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:299)
at 
org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:281)
at 
org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:264)
at 
org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:260)
at 
org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:246)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink.invoke(ElasticsearchSink.java:286)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:37)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:163)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
{code}

Resources:

[1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/78055773/log.txt




[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725007#comment-14725007
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136640020
  
@s1ck, looks really good. Thanks for your contribution. Will merge it now.




[jira] [Commented] (FLINK-2600) Failing ElasticsearchSinkITCase.testNodeClient test case

2015-09-01 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725018#comment-14725018
 ] 

Aljoscha Krettek commented on FLINK-2600:
-

This might not be reproducible; the Elasticsearch sink has been in for a few 
weeks now and we have never seen this. Nevertheless, I will look into how we 
can harden it.
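
One conceivable hardening, sketched under the assumption of the Elasticsearch 1.x Java client API in use at the time (the calls shown should be verified against that API; {{my-index}} comes from the failing test): make sure the target index exists before the sink writes, so that a bulk flush during {{close()}} cannot hit {{IndexMissingException}}:

{code:java}
import org.elasticsearch.client.Client;

public class EnsureIndex {
    // Create the index up front if it does not exist yet.
    public static void ensureIndex(Client client, String index) {
        boolean exists = client.admin().indices()
                .prepareExists(index).get().isExists();
        if (!exists) {
            client.admin().indices().prepareCreate(index).get();
        }
    }
}
{code}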


[jira] [Assigned] (FLINK-2600) Failing ElasticsearchSinkITCase.testNodeClient test case

2015-09-01 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-2600:
---

Assignee: Aljoscha Krettek


[jira] [Created] (FLINK-2601) IOManagerAsync may produce NPE during shutdown

2015-09-01 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-2601:
-

 Summary: IOManagerAsync may produce NPE during shutdown
 Key: FLINK-2601
 URL: https://issues.apache.org/jira/browse/FLINK-2601
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 0.10
Reporter: Robert Metzger
Priority: Minor


While analyzing a failed YARN test, I found that it failed because the 
following exception appeared in the logs:

taskmanager-stderr:
{code}
Exception in thread "I/O manager shutdown hook" java.lang.NullPointerException
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:144)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
{code}

taskmanager.log
{code}
18:45:00,812 INFO  org.apache.flink.runtime.taskmanager.TaskManager 
 - Starting TaskManager actor
18:45:00,819 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig
 - NettyConfig [server address: 
testing-worker-linux-docker-56ee9bbf-3203-linux-2.prod.travis-ci.org/172.17.9.129,
 server port: 38689, memory segment size (bytes): 32768, transport type: NIO, 
number of server threads: 0 (use Netty's default), number of client threads: 0 
(use Netty's default), server connect backlog: 0 (use Netty's default), client 
connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's 
default)]
18:45:00,822 INFO  org.apache.flink.runtime.taskmanager.TaskManager 
 - Messages between TaskManager and JobManager have a max timeout of 10 
milliseconds
18:45:00,825 INFO  org.apache.flink.runtime.taskmanager.TaskManager 
 - Temporary file directory 
'/home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007':
 total 15 GB, usable 7 GB (46.67% usable)
18:45:00,929 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool 
 - Allocated 64 MB for network buffer pool (number of memory segments: 2048, 
bytes per segment: 32768).
18:45:01,186 INFO  org.apache.flink.runtime.taskmanager.TaskManager 
 - Using 0.7 of the currently free heap space for Flink managed memory (236 MB).
18:45:01,755 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager 
 - I/O manager uses directory 
/home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007/flink-io-1befed3c-89c5-4b5e-9043-1b92c4c047d4
 for spill files.
18:45:01,831 ERROR org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
 - RECEIVED SIGNAL 15: SIGTERM
18:45:01,833 ERROR org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
 - Error while shutting down IO Manager reader thread.
java.lang.NullPointerException
at 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:133)
at 
org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
18:45:01,841 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager 
 - I/O manager removed spill file directory 
/home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007/flink-io-1befed3c-89c5-4b5e-9043-1b92c4c047d4
{code}

Looks like the TM is shutting down while still starting up. Hardening this 
should be easy.
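
A minimal sketch of the kind of guard that could harden {{shutdown()}} against running before startup has finished; class and field names are hypothetical, not the actual {{IOManagerAsync}} code:

{code:java}
public class ShutdownGuardSketch {
    // May still be null if SIGTERM arrives while the manager is starting up.
    private volatile Thread[] readers;

    public void shutdown() {
        Thread[] localReaders = readers; // read the field once to avoid races
        if (localReaders == null) {
            return; // startup never created the threads; nothing to stop
        }
        for (Thread reader : localReaders) {
            if (reader != null) {
                reader.interrupt();
            }
        }
    }
}
{code}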




[jira] [Commented] (FLINK-2601) IOManagerAsync may produce NPE during shutdown

2015-09-01 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725087#comment-14725087
 ] 

Ufuk Celebi commented on FLINK-2601:


Nice that you've spotted this. :-) Should be easy to fix. Let's make sure to 
check other components as well (like the blob manager, network environment, 
etc.).



[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725086#comment-14725086
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136654131
  
Sorry, I did not see that there are also identical test cases in Scala 
which now fail due to the `-1` change. As those Scala methods wrap the Java 
methods, is it necessary to run the same tests on them again?




[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725097#comment-14725097
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136655355
  
No problem @s1ck. It might be a bit redundant, but it tests that the 
forwarding is done correctly, so I fixed the test case.



[jira] [Updated] (FLINK-2601) IOManagerAsync may produce NPE during shutdown

2015-09-01 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-2601:
--
Description: 
(The description is unchanged except for the added Travis log link:)

https://s3.amazonaws.com/archive.travis-ci.org/jobs/78052378/log.txt

[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725109#comment-14725109
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user s1ck commented on the pull request:

https://github.com/apache/flink/pull/1075#issuecomment-136658385
  
Ok, thank you.



[jira] [Resolved] (FLINK-2596) Failing Test: RandomSamplerTest

2015-09-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2596.
-
   Resolution: Fixed
Fix Version/s: 0.10

Fixed via f4a48c23a30c170a5a2c08c27e1f01f7827eefd2

Thank you for the contribution!

> Failing Test: RandomSamplerTest
> ---
>
> Key: FLINK-2596
> URL: https://issues.apache.org/jira/browse/FLINK-2596
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Assignee: Chengxiang Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 0.10
>
>
> {noformat}
> Tests run: 17, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 14.925 sec 
> <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest
> testReservoirSamplerWithMultiSourcePartitions2(org.apache.flink.api.java.sampling.RandomSamplerTest)
>  Time elapsed: 0.444 sec <<< ERROR!
> java.lang.IllegalArgumentException: Comparison method violates its general 
> contract!
> at java.util.TimSort.mergeLo(TimSort.java:747)
> at java.util.TimSort.mergeAt(TimSort.java:483)
> at java.util.TimSort.mergeCollapse(TimSort.java:410)
> at java.util.TimSort.sort(TimSort.java:214)
> at java.util.TimSort.sort(TimSort.java:173)
> at java.util.Arrays.sort(Arrays.java:659)
> at java.util.Collections.sort(Collections.java:217)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.transferFromListToArrayWithOrder(RandomSamplerTest.java:375)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.getSampledOutput(RandomSamplerTest.java:367)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyKSTest(RandomSamplerTest.java:338)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyRandomSamplerWithSampleSize(RandomSamplerTest.java:330)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyReservoirSamplerWithReplacement(RandomSamplerTest.java:290)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2(RandomSamplerTest.java:212)
> Results :
> Tests in error:
> RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2:212->verifyReservoirSamplerWithReplacement:290->verifyRandomSamplerWithSampleSize:330->verifyKSTest:338->getSampledOutput:367->transferFromListToArrayWithOrder:375
>  » IllegalArgument
> {noformat}
> https://travis-ci.org/apache/flink/jobs/77750329
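
The usual cause of this TimSort error is a comparator that does not define a consistent total order, for example one that casts a floating-point difference to {{int}}. The snippet below illustrates the failure mode under that assumption; it is not the actual test code:

{code:java}
import java.util.Arrays;
import java.util.Comparator;

public class ComparatorContractDemo {
    public static void main(String[] args) {
        // Broken: compare(0.4, 0.5) == 0 and compare(0.5, 1.4) == 0,
        // but compare(0.4, 1.4) == -1, violating the comparator contract
        // and potentially triggering "Comparison method violates its
        // general contract!" inside TimSort.
        Comparator<Double> broken = (a, b) -> (int) (a - b);

        // Correct: Double.compare defines a consistent total order.
        Comparator<Double> correct = Double::compare;

        Double[] values = {0.5, 0.4, 1.5, 0.4, 2.5};
        Arrays.sort(values, correct); // sorting with `broken` may throw
        System.out.println(Arrays.toString(values));
    }
}
{code}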





[jira] [Closed] (FLINK-2596) Failing Test: RandomSamplerTest

2015-09-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2596.
---



[jira] [Assigned] (FLINK-2601) IOManagerAsync may produce NPE during shutdown

2015-09-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reassigned FLINK-2601:
---

Assignee: Stephan Ewen

> IOManagerAsync may produce NPE during shutdown
> --
>
> Key: FLINK-2601
> URL: https://issues.apache.org/jira/browse/FLINK-2601
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Minor
>  Labels: test-stability
>
> While analyzing a failed YARN test, I detected that it failed because it 
> found the following exception in the logs:
> taskmanager-stderr:
> {code}
> Exception in thread "I/O manager shutdown hook" java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:144)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
> {code}
> taskmanager.log
> {code}
> 18:45:00,812 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Starting TaskManager actor
> 18:45:00,819 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig  
>- NettyConfig [server address: 
> testing-worker-linux-docker-56ee9bbf-3203-linux-2.prod.travis-ci.org/172.17.9.129,
>  server port: 38689, memory segment size (bytes): 32768, transport type: NIO, 
> number of server threads: 0 (use Netty's default), number of client threads: 
> 0 (use Netty's default), server connect backlog: 0 (use Netty's default), 
> client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use 
> Netty's default)]
> 18:45:00,822 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Messages between TaskManager and JobManager have a max timeout of 10 
> milliseconds
> 18:45:00,825 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Temporary file directory 
> '/home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007':
>  total 15 GB, usable 7 GB (46.67% usable)
> 18:45:00,929 INFO  
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 
> MB for network buffer pool (number of memory segments: 2048, bytes per 
> segment: 32768).
> 18:45:01,186 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Using 0.7 of the currently free heap space for Flink managed memory (236 
> MB).
> 18:45:01,755 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager   
>- I/O manager uses directory 
> /home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007/flink-io-1befed3c-89c5-4b5e-9043-1b92c4c047d4
>  for spill files.
> 18:45:01,831 ERROR org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  
>- RECEIVED SIGNAL 15: SIGTERM
> 18:45:01,833 ERROR org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync  
>- Error while shutting down IO Manager reader thread.
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:133)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
> 18:45:01,841 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager   
>- I/O manager removed spill file directory 
> /home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007/flink-io-1befed3c-89c5-4b5e-9043-1b92c4c047d4
> {code}
> Looks like the TM is shutting down while still starting up. Hardening this 
> should be easy.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/78052378/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
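
For illustration, a minimal sketch of the kind of hardening the report asks for: a shutdown hook that tolerates running before initialization has finished. The field name {{readerThreads}} is an assumption, not the actual IOManagerAsync member.

{code}
// Sketch only, not the real IOManagerAsync: every field the shutdown
// hook touches may still be null if SIGTERM arrives during startup.
public class AsyncIoManagerSketch {

    private volatile Thread[] readerThreads; // assumed field; may be null at shutdown

    public void shutdown() {
        Thread[] readers = this.readerThreads; // read the field once
        if (readers == null) {
            return; // shutdown raced ahead of the constructor
        }
        for (Thread t : readers) {
            if (t != null) {
                t.interrupt();
            }
        }
    }
}
{code}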


[jira] [Commented] (FLINK-2596) Failing Test: RandomSamplerTest

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725127#comment-14725127
 ] 

ASF GitHub Bot commented on FLINK-2596:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1080#issuecomment-136663351
  
This has been merged, but I think Apache and GitHub are out of sync again. 
Feel free to manually close this.

Thank you for the contribution!


https://git-wip-us.apache.org/repos/asf/flink/repo?p=flink.git;a=commit;h=f4a48c23a30c170a5a2c08c27e1f01f7827eefd2


> Failing Test: RandomSamplerTest
> ---
>
> Key: FLINK-2596
> URL: https://issues.apache.org/jira/browse/FLINK-2596
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Assignee: Chengxiang Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 0.10
>
>
> {noformat}
> Tests run: 17, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 14.925 sec 
> <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest
> testReservoirSamplerWithMultiSourcePartitions2(org.apache.flink.api.java.sampling.RandomSamplerTest)
>  Time elapsed: 0.444 sec <<< ERROR!
> java.lang.IllegalArgumentException: Comparison method violates its general 
> contract!
> at java.util.TimSort.mergeLo(TimSort.java:747)
> at java.util.TimSort.mergeAt(TimSort.java:483)
> at java.util.TimSort.mergeCollapse(TimSort.java:410)
> at java.util.TimSort.sort(TimSort.java:214)
> at java.util.TimSort.sort(TimSort.java:173)
> at java.util.Arrays.sort(Arrays.java:659)
> at java.util.Collections.sort(Collections.java:217)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.transferFromListToArrayWithOrder(RandomSamplerTest.java:375)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.getSampledOutput(RandomSamplerTest.java:367)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyKSTest(RandomSamplerTest.java:338)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyRandomSamplerWithSampleSize(RandomSamplerTest.java:330)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyReservoirSamplerWithReplacement(RandomSamplerTest.java:290)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2(RandomSamplerTest.java:212)
> Results :
> Tests in error:
> RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2:212->verifyReservoirSamplerWithReplacement:290->verifyRandomSamplerWithSampleSize:330->verifyKSTest:338->getSampledOutput:367->transferFromListToArrayWithOrder:375
>  » IllegalArgument
> {noformat}
> https://travis-ci.org/apache/flink/jobs/77750329



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
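
As background on this failure mode (a generic illustration, not the actual comparator used by RandomSamplerTest): TimSort reports "Comparison method violates its general contract!" when a Comparator is not transitive or not sign-symmetric, which subtraction-based comparisons can easily be.

{code}
import java.util.Arrays;
import java.util.Comparator;

public class BrokenComparatorDemo {
    public static void main(String[] args) {
        // BROKEN: the int cast truncates fractional differences to 0,
        // so the ordering is intransitive; sorting a large enough array
        // with it can throw the exception seen above.
        Comparator<Double> broken = (a, b) -> (int) (a - b);

        // CORRECT: a total, consistent ordering.
        Comparator<Double> correct = Comparator.comparingDouble(Double::doubleValue);

        Double[] data = {0.3, 0.1, 0.7, 0.2};
        Arrays.sort(data, correct);
        System.out.println(Arrays.toString(data)); // [0.1, 0.2, 0.3, 0.7]
    }
}
{code}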


[GitHub] flink pull request: [FLINK-2596] Failing Test: RandomSamplerTest

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1080#issuecomment-136663351
  
This has been merged, but I think Apache and GitHub are out of sync again. 
Feel free to manually close this.

Thank you for the contribution!


https://git-wip-us.apache.org/repos/asf/flink/repo?p=flink.git;a=commit;h=f4a48c23a30c170a5a2c08c27e1f01f7827eefd2


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2372) Update KafkaSink to use new Producer API

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725129#comment-14725129
 ] 

ASF GitHub Bot commented on FLINK-2372:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1082#discussion_r38404379
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in 
one Kafka partition.
+ *
+ * Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * # More Flink partitions than kafka partitions
--- End diff --

These comments should be preformatted ``


> Update KafkaSink to use new Producer API
> 
>
> Key: FLINK-2372
> URL: https://issues.apache.org/jira/browse/FLINK-2372
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.10
>
>
> Flink's Kafka Sink is using Kafka's old Producer API, which has very poor 
> performance and a limited API.
> I'll implement a new KafkaSink that uses the new API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2372] Add new FlinkKafkaProducer

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1082#discussion_r38404379
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in 
one Kafka partition.
+ *
+ * Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * # More Flink partitions than kafka partitions
--- End diff --

These comments should be preformatted ``


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
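
To illustrate the mapping described in the javadoc above, here is a sketch of such a partitioner, using the `open()` naming suggested elsewhere in this review; the body is an assumed implementation, not the actual FixedPartitioner code.

```
// Sketch: each parallel sink instance picks one Kafka partition once and
// writes all its records there. With more Flink partitions than Kafka
// partitions, several instances share the same Kafka partition.
public class FixedPartitionerSketch {

    private int targetPartition = -1;

    /** Called once per parallel instance with the available Kafka partitions. */
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        // wrap around when there are more Flink instances than Kafka partitions
        this.targetPartition = partitions[parallelInstanceId % partitions.length];
    }

    public int partition(Object element, int numPartitions) {
        return targetPartition; // fixed for the lifetime of this instance
    }
}
```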


[GitHub] flink pull request: [FLINK-2372] Add new FlinkKafkaProducer

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1082#discussion_r38404460
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Assert;
+import org.junit.Test;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
+public class TestFixedPartitioner {
+
+
+   /**
+*  Flink Sinks:Kafka Partitions
--- End diff --

Preformatting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2372) Update KafkaSink to use new Producer API

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725130#comment-14725130
 ] 

ASF GitHub Bot commented on FLINK-2372:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1082#discussion_r38404434
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/RichKafkaPartitioner.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+/**
+ * Extended Kafka Partitioner.
+ * It contains a prepare() method which is called on each parallel 
instance.
+ */
+public abstract class RichKafkaPartitioner implements KafkaPartitioner {
+   private static final long serialVersionUID = -4590784174150709918L;
+
+   public abstract void prepare(int parallelInstanceId, int 
parallelInstances, int[] partitions);
--- End diff --

In all other cases, the method is called `open()`. I think we have 
established a bit of terminology inside Flink with that, which would be good 
to follow here.


> Update KafkaSink to use new Producer API
> 
>
> Key: FLINK-2372
> URL: https://issues.apache.org/jira/browse/FLINK-2372
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.10
>
>
> Flink's Kafka Sink is using Kafka's old Producer API, which has very poor 
> performance and a limited API.
> I'll implement a new KafkaSink that uses the new API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2372] Add new FlinkKafkaProducer

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1082#discussion_r38404434
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/RichKafkaPartitioner.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+/**
+ * Extended Kafka Partitioner.
+ * It contains a prepare() method which is called on each parallel 
instance.
+ */
+public abstract class RichKafkaPartitioner implements KafkaPartitioner {
+   private static final long serialVersionUID = -4590784174150709918L;
+
+   public abstract void prepare(int parallelInstanceId, int 
parallelInstances, int[] partitions);
--- End diff --

In all other cases, the method is called `open()`. I think we have 
established a bit of terminology inside Flink with that, which would be good 
to follow here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2372) Update KafkaSink to use new Producer API

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725131#comment-14725131
 ] 

ASF GitHub Bot commented on FLINK-2372:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1082#discussion_r38404460
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Assert;
+import org.junit.Test;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
+public class TestFixedPartitioner {
+
+
+   /**
+*  Flink Sinks:Kafka Partitions
--- End diff --

Preformatting


> Update KafkaSink to use new Producer API
> 
>
> Key: FLINK-2372
> URL: https://issues.apache.org/jira/browse/FLINK-2372
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.10
>
>
> Flink's Kafka Sink is using Kafka's old Producer API, which has very poor 
> performance and a limited API.
> I'll implement a new KafkaSink that uses the new API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2372] Add new FlinkKafkaProducer

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1082#issuecomment-136665284
  
Looks pretty good in general. Minor comments inline.

We now have the `Partitioner`, `KafkaPartitioner`, and `RichKafkaPartitioner` 
classes. Can we somehow collapse the latter two into one?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2372) Update KafkaSink to use new Producer API

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725134#comment-14725134
 ] 

ASF GitHub Bot commented on FLINK-2372:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1082#issuecomment-136665284
  
Looks pretty good in general. Minor comments inline.

We now have the `Partitioner`, `KafkaPartitioner`, and `RichKafkaPartitioner` 
classes. Can we somehow collapse the latter two into one?


> Update KafkaSink to use new Producer API
> 
>
> Key: FLINK-2372
> URL: https://issues.apache.org/jira/browse/FLINK-2372
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.10
>
>
> Flink's Kafka Sink is using Kafka's old Producer API, which has very poor 
> performance and a limited API.
> I'll implement a new KafkaSink that uses the new API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
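
One possible collapsed form (a sketch of the idea, not necessarily what was merged): keep a single abstract partitioner whose lifecycle hook defaults to a no-op, so a separate "rich" variant becomes unnecessary.

{code}
// Sketch: one class instead of KafkaPartitioner + RichKafkaPartitioner.
public abstract class KafkaPartitionerSketch<T> implements java.io.Serializable {

    /** Optional lifecycle hook; simple partitioners just ignore it. */
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        // no-op by default
    }

    /** Mandatory: choose the Kafka partition for the given record. */
    public abstract int partition(T element, int numPartitions);
}
{code}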


[GitHub] flink pull request: [FLINK-2372] Add new FlinkKafkaProducer

2015-09-01 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1082#issuecomment-13154
  
Thank you for the feedback. I will address the concerns.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2372) Update KafkaSink to use new Producer API

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725139#comment-14725139
 ] 

ASF GitHub Bot commented on FLINK-2372:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1082#issuecomment-13154
  
Thank you for the feedback. I will address the concerns.


> Update KafkaSink to use new Producer API
> 
>
> Key: FLINK-2372
> URL: https://issues.apache.org/jira/browse/FLINK-2372
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.10
>
>
> Flink's Kafka Sink is using Kafka's old Producer API, which has very poor 
> performance and a limited API.
> I'll implement a new KafkaSink that uses the new API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-09-01 Thread shghatge
Github user shghatge closed the pull request at:

https://github.com/apache/flink/pull/847


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725140#comment-14725140
 ] 

ASF GitHub Bot commented on FLINK-1520:
---

Github user shghatge closed the pull request at:

https://github.com/apache/flink/pull/847


> Read edges and vertices from CSV files
> --
>
> Key: FLINK-1520
> URL: https://issues.apache.org/jira/browse/FLINK-1520
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Shivani Ghatge
>Priority: Minor
>  Labels: easyfix, newbie
>
> Add methods to create Vertex and Edge Datasets directly from CSV file inputs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
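
For a sense of the requested API, a hypothetical usage sketch; the method names (`fromCsvReader`, `types`) and their signatures are assumptions for illustration, not the interface that was merged.

{code}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;

public class GraphFromCsvSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical builder-style call reading vertices and edges from CSV:
        Graph<Long, Double, Double> graph = Graph
                .fromCsvReader("hdfs:///vertices.csv", "hdfs:///edges.csv", env)
                .types(Long.class, Double.class, Double.class);
        System.out.println(graph.numberOfVertices());
    }
}
{code}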


[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...

2015-09-01 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-13651
  
@vasia  It is fine with me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1520) Read edges and vertices from CSV files

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725142#comment-14725142
 ] 

ASF GitHub Bot commented on FLINK-1520:
---

Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/847#issuecomment-13651
  
@vasia  It is fine with me.


> Read edges and vertices from CSV files
> --
>
> Key: FLINK-1520
> URL: https://issues.apache.org/jira/browse/FLINK-1520
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Shivani Ghatge
>Priority: Minor
>  Labels: easyfix, newbie
>
> Add methods to create Vertex and Edge Datasets directly from CSV file inputs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...

2015-09-01 Thread shghatge
Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-136667142
  
@vasia This is giving merge conflicts when rebasing. Since the conflicts 
are in commits other than mine, I am thinking of creating a new PR. I have made 
similar changes in #923. If the last commit here is okay, shall I update that 
PR as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2310) Add an Adamic-Adar Similarity example

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725147#comment-14725147
 ] 

ASF GitHub Bot commented on FLINK-2310:
---

Github user shghatge commented on the pull request:

https://github.com/apache/flink/pull/892#issuecomment-136667142
  
@vasia This is giving merge conflicts when rebasing. Since the conflicts 
are in commits other than mine, I am thinking of creating a new PR. I have made 
similar changes in #923. If the last commit here is okay, shall I update that 
PR as well?


> Add an Adamic-Adar Similarity example
> -
>
> Key: FLINK-2310
> URL: https://issues.apache.org/jira/browse/FLINK-2310
> Project: Flink
>  Issue Type: Task
>  Components: Gelly
>Reporter: Andra Lungu
>Assignee: Shivani Ghatge
>Priority: Minor
>
> Just as Jaccard, the Adamic-Adar algorithm measures the similarity between a 
> set of nodes. However, instead of counting the common neighbors and dividing 
> them by the total number of neighbors, the similarity is weighted according 
> to the vertex degrees: each common neighbor contributes 1/log(degree).
> The Adamic-Adar algorithm can be broken into three steps: 
> 1). For each vertex, compute the inverse of the log of its degree (with the 
> formula above) and set it as the vertex value. 
> 2). Each vertex will then send this newly computed value along with a list of 
> neighbors to the targets of its out-edges.
> 3). Weigh the edges with the Adamic-Adar index: the sum over n in CN of 
> 1/log(k_n) (CN is the set of all common neighbors of two vertices x, y; k_n is 
> the degree of node n). See [2]
> Prerequisites: 
> - Full understanding of the Jaccard Similarity Measure algorithm
> - Reading the associated literature: 
> [1] http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
> [2] 
> http://stackoverflow.com/questions/22565620/fast-algorithm-to-compute-adamic-adar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
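
A minimal, Gelly-independent sketch of the index from step 3, assuming the neighbor sets and a degree map are already at hand:

{code}
import java.util.Map;
import java.util.Set;

public class AdamicAdarSketch {
    /** Sum of 1/log(k_n) over the common neighbors n of x and y. */
    static double adamicAdar(Set<Long> neighborsOfX, Set<Long> neighborsOfY,
                             Map<Long, Integer> degree) {
        double sum = 0.0;
        for (long n : neighborsOfX) {
            if (neighborsOfY.contains(n)) {
                // a common neighbor has degree >= 2, so log(k_n) > 0
                sum += 1.0 / Math.log(degree.get(n));
            }
        }
        return sum;
    }
}
{code}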


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136667496
  
@HuangWHWHW 

I think this test and also the class `PrintSinkFunctionTest` need some 
more improvement. Here are a few comments:

  - Exceptions during `open()` should not be caught but should fail the 
test. This happens at multiple points in the code.
  - The test contains a class that starts with a lower-case letter, which 
is against the Java style rules.
  - The test as a whole extends `RichSinkFunction`, which is not necessary 
and seems strange.
  - The mock print stream overrides only one `println()` method, which 
makes it susceptible to changes in the sink. It is better to use a regular 
`PrintStream` that prints to a `StringWriter` to collect all contents.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725151#comment-14725151
 ] 

ASF GitHub Bot commented on FLINK-2480:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136667496
  
@HuangWHWHW 

I think this test and also the class `PrintSinkFunctionTest` need some 
more improvement. Here are a few comments:

  - Exceptions during `open()` should not be caught but should fail the 
test. This happens at multiple points in the code.
  - The test contains a class that starts with a lower-case letter, which 
is against the Java style rules.
  - The test as a whole extends `RichSinkFunction`, which is not necessary 
and seems strange.
  - The mock print stream overrides only one `println()` method, which 
makes it susceptible to changes in the sink. It is better to use a regular 
`PrintStream` that prints to a `StringWriter` to collect all contents.


> Improving tests coverage for org.apache.flink.streaming.api
> ---
>
> Key: FLINK-2480
> URL: https://issues.apache.org/jira/browse/FLINK-2480
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The streaming API is quite a bit newer than the other code, so it is not that 
> well covered with tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
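
A sketch of the suggested capture approach, using a ByteArrayOutputStream-backed PrintStream (the byte-stream analogue of the StringWriter idea); the record strings are placeholders.

{code}
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class CapturedPrintStreamSketch {
    public static void main(String[] args) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream capturing = new PrintStream(buffer, true); // auto-flush

        // Hand 'capturing' to the sink under test instead of mocking a single
        // println() overload; every print variant ends up in the buffer.
        capturing.println("record-1");
        capturing.print("record-2");
        capturing.flush();

        String contents = buffer.toString();
        System.out.println(contents.contains("record-1") && contents.contains("record-2"));
    }
}
{code}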


[GitHub] flink pull request: [FLINK-2545] add bucket member count verificat...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1067#issuecomment-136667661
  
Looks good, I'll merge this!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725153#comment-14725153
 ] 

ASF GitHub Bot commented on FLINK-2545:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1067#issuecomment-136667661
  
Looks good, I'll merge this!


> NegativeArraySizeException while creating hash table bloom filters
> --
>
> Key: FLINK-2545
> URL: https://issues.apache.org/jira/browse/FLINK-2545
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master
>Reporter: Greg Hogan
>Assignee: Chengxiang Li
>
> The following exception occurred a second time when I immediately re-ran my 
> application, though after recompiling and restarting Flink the subsequent 
> execution ran without error.
> java.lang.Exception: The data preparation for task '...' , caused an error: 
> null
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NegativeArraySizeException
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455)
>   at 
> org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93)
>   at 
> org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459)
>   ... 3 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2575) Don't activate hash table bloom filter optimisation by default

2015-09-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2575.
---

> Don't activate hash table bloom filter optimisation by default
> --
>
> Key: FLINK-2575
> URL: https://issues.apache.org/jira/browse/FLINK-2575
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: master
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> I would like to set the default value of 
> {{taskmanager.runtime.hashjoin-bloom-filters}} to {{false}} until FLINK-2545 
> is fixed. This issue is to keep track of this for the upcoming release. After 
> FLINK-2545 is fixed, this issue becomes obsolete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2575) Don't activate hash table bloom filter optimisation by default

2015-09-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2575.
-
Resolution: Done

Done in ced6a1993815fc61fc0c4b68370d9af76de68c71

> Don't activate hash table bloom filter optimisation by default
> --
>
> Key: FLINK-2575
> URL: https://issues.apache.org/jira/browse/FLINK-2575
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: master
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> I would like to set the default value of 
> {{taskmanager.runtime.hashjoin-bloom-filters}} to {{false}} until FLINK-2545 
> is fixed. This issue is to keep track of this for the upcoming release. After 
> FLINK-2545 is fixed, this issue becomes obsolete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
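
For reference, a sketch of setting the flag programmatically via Flink's generic Configuration key/value API; the same key can equally go into flink-conf.yaml.

{code}
import org.apache.flink.configuration.Configuration;

public class BloomFilterFlagSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Equivalent to the flink-conf.yaml entry:
        //   taskmanager.runtime.hashjoin-bloom-filters: false
        config.setBoolean("taskmanager.runtime.hashjoin-bloom-filters", false);
        System.out.println(config.getBoolean("taskmanager.runtime.hashjoin-bloom-filters", true));
    }
}
{code}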


[jira] [Created] (FLINK-2602) Gelly algorithms obtain new execution environments.

2015-09-01 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2602:
---

 Summary: Gelly algorithms obtain new execution environments.
 Key: FLINK-2602
 URL: https://issues.apache.org/jira/browse/FLINK-2602
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Affects Versions: 0.10
Reporter: Stephan Ewen


I have seen that Gelly occasionally uses 
{{ExecutionEnvironment.getExecutionEnvironment()}} to obtain an execution 
environment.

That easily leads to problems, as it creates new execution environments in many 
cases. For example, if the original execution environment was created via 
{{ExecutionEnvironment.createRemoteEnvironment(...)}}, it will inevitably give 
you a wrong execution environment.

If new sources need to be created, they should be created with the execution 
environment of the other data sets in the computation, for example via 
{{vertexDataSet.getExecutionEnvironment().fromElements(a, b, c)}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
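
A sketch contrasting the two patterns, built only from the calls named in the issue:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class EnvReuseSketch {
    static DataSet<Long> newSource(DataSet<Long> vertexDataSet) {
        // Problematic: may create a brand-new (e.g. local) environment even
        // when vertexDataSet lives in a remote one:
        //   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Preferred: reuse the environment the existing data set belongs to.
        ExecutionEnvironment env = vertexDataSet.getExecutionEnvironment();
        return env.fromElements(1L, 2L, 3L);
    }
}
{code}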


[GitHub] flink pull request: [FLINK-2448]Create new Test environments on ge...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-136669559
  
I think this looks good. Will merge this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2372] Add new FlinkKafkaProducer

2015-09-01 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1082#issuecomment-136669637
  
I updated the PR and rebased onto master (which adds some commits from 
the future, at least from GitHub's perspective).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725162#comment-14725162
 ] 

ASF GitHub Bot commented on FLINK-2448:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-136669559
  
I think this looks good. Will merge this...


> registerCacheFile fails with MultipleProgramsTestbase
> -
>
> Key: FLINK-2448
> URL: https://issues.apache.org/jira/browse/FLINK-2448
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Chesnay Schepler
>Assignee: Sachin Goel
>Priority: Minor
>
> When trying to register a file using a constant name, an exception is thrown 
> saying the file was already cached.
> This is probably because the same environment is reused, and the cacheFile 
> entries are not cleared between runs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
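
A sketch of the failure pattern described, using the standard registerCachedFile API; the path and name are placeholders, and the second call stands in for a second test run on the reused environment.

{code}
import org.apache.flink.api.java.ExecutionEnvironment;

public class CachedFileReuseSketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.registerCachedFile("file:///tmp/lookup.txt", "lookup");
        // With MultipleProgramsTestBase the same environment is reused across
        // runs, so registering the same constant name again fails because the
        // first entry was never cleared:
        env.registerCachedFile("file:///tmp/lookup.txt", "lookup"); // throws
    }
}
{code}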


[jira] [Commented] (FLINK-2372) Update KafkaSink to use new Producer API

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725164#comment-14725164
 ] 

ASF GitHub Bot commented on FLINK-2372:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1082#issuecomment-136669637
  
I updated the PR and rebased onto master (which adds some commits from 
the future, at least from GitHub's perspective).


> Update KafkaSink to use new Producer API
> 
>
> Key: FLINK-2372
> URL: https://issues.apache.org/jira/browse/FLINK-2372
> Project: Flink
>  Issue Type: Task
>  Components: Kafka Connector
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.10
>
>
> Flink's Kafka Sink is using Kafka's old Producer API, which has very poor 
> performance and a limited API.
> I'll implement a new KafkaSink that uses the new API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable

2015-09-01 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725169#comment-14725169
 ] 

Stephan Ewen commented on FLINK-2066:
-

Is anything happening here?

Someone else is interested in taking this over...

> Make delay between execution retries configurable
> -
>
> Key: FLINK-2066
> URL: https://issues.apache.org/jira/browse/FLINK-2066
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Nuno Miguel Marques dos Santos
>  Labels: starter
>
> Flink allows specifying a delay between execution retries. This helps to let 
> some external failure causes fully manifest themselves before the restart is 
> attempted.
> The delay is currently defined only system-wide.
> We should add it to the {{ExecutionConfig}} of a job to allow per-job 
> specification.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
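
A hypothetical sketch of what the per-job setting could look like; `setExecutionRetryDelay` is an assumed method name illustrating the proposal, not an existing API at the time of this thread.

{code}
import org.apache.flink.api.java.ExecutionEnvironment;

public class RetryDelaySketch {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setNumberOfExecutionRetries(3);
        // Hypothetical per-job API proposed by the issue (assumed name):
        // env.getConfig().setExecutionRetryDelay(10000); // 10s between retries
    }
}
{code}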


[jira] [Commented] (FLINK-2580) HadoopDataOutputStream does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream

2015-09-01 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725176#comment-14725176
 ] 

Stephan Ewen commented on FLINK-2580:
-

I'll add a temporary solution that allows you to get the original FileSystem 
and the original FileStreams.

> HadoopDataOutputStream does not expose enough methods of 
> org.apache.hadoop.fs.FSDataOutputStream
> 
>
> Key: FLINK-2580
> URL: https://issues.apache.org/jira/browse/FLINK-2580
> Project: Flink
>  Issue Type: Improvement
>  Components: Hadoop Compatibility
>Reporter: Arnaud Linz
>Priority: Minor
>
> I’ve noticed that when you use org.apache.flink.core.fs.FileSystem to write 
> into an HDFS file, calling 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(), it returns a 
> HadoopDataOutputStream that wraps an org.apache.hadoop.fs.FSDataOutputStream 
> (under its org.apache.hadoop.hdfs.client.HdfsDataOutputStream wrapper).
> 
> However, FSDataOutputStream exposes many methods like flush(), getPos(), etc., 
> but HadoopDataOutputStream only wraps write() and close().
> 
> For instance, flush() calls the default, empty implementation of OutputStream 
> instead of the Hadoop one, and that’s confusing. Moreover, because of the 
> restrictive OutputStream interface, hsync() and hflush() are not exposed to 
> Flink.
> I see two options:
> - complete the class to wrap all methods of OutputStream and add a 
> getWrappedStream() to access other stuff like hsync().
> - get rid of the Hadoop wrapping and directly use the Hadoop file system objects.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
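
A simplified sketch of the first option (delegate the missing methods and expose the wrapped stream); not the actual class.

{code}
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FSDataOutputStream;

public class HadoopOutputStreamSketch extends OutputStream {

    private final FSDataOutputStream wrapped;

    public HadoopOutputStreamSketch(FSDataOutputStream wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void write(int b) throws IOException {
        wrapped.write(b);
    }

    @Override
    public void flush() throws IOException {
        wrapped.flush(); // forward, instead of inheriting OutputStream's no-op
    }

    @Override
    public void close() throws IOException {
        wrapped.close();
    }

    /** Escape hatch for Hadoop-specific calls such as hsync()/hflush(). */
    public FSDataOutputStream getWrappedStream() {
        return wrapped;
    }
}
{code}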


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136677416
  
@StephanEwen 
Thank you very much for the comments.
I'll make the fix.
BTW: there are two PRs of mine that still need review comments:
https://github.com/apache/flink/pull/1030
https://github.com/apache/flink/pull/992
Could you take a look?
Sorry to take up your time.
Thanks a lot!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2580) HadoopDataOutputStream does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream

2015-09-01 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725207#comment-14725207
 ] 

Stephan Ewen commented on FLINK-2580:
-

I am testing a patch that adds more of the {{hsync}} and {{hflush}} methods, 
and exposes the wrapped classes.

The sensible long-term solution would be to get rid of the wrapper classes 
altogether.
Anyone interested in picking that up?

> HadoopDataOutputStream does not expose enough methods of 
> org.apache.hadoop.fs.FSDataOutputStream
> 
>
> Key: FLINK-2580
> URL: https://issues.apache.org/jira/browse/FLINK-2580
> Project: Flink
>  Issue Type: Improvement
>  Components: Hadoop Compatibility
>Reporter: Arnaud Linz
>Priority: Minor
>
> I’ve noticed that when you use org.apache.flink.core.fs.FileSystem to write 
> into an HDFS file, calling 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(), it returns a 
> HadoopDataOutputStream that wraps an org.apache.hadoop.fs.FSDataOutputStream 
> (under its org.apache.hadoop.hdfs.client.HdfsDataOutputStream wrapper).
> 
> However, FSDataOutputStream exposes many methods like flush(), getPos(), etc., 
> but HadoopDataOutputStream only wraps write() and close().
> 
> For instance, flush() calls the default, empty implementation of OutputStream 
> instead of the Hadoop one, and that’s confusing. Moreover, because of the 
> restrictive OutputStream interface, hsync() and hflush() are not exposed to 
> Flink.
> I see two options:
> - complete the class to wrap all methods of OutputStream and add a 
> getWrappedStream() to access other stuff like hsync().
> - get rid of the Hadoop wrapping and directly use the Hadoop file system objects.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725208#comment-14725208
 ] 

ASF GitHub Bot commented on FLINK-2480:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136677416
  
@StephanEwen 
Thank you very much for the comments.
I'll make the fix.
BTW: there are two PRs of mine that still need review comments:
https://github.com/apache/flink/pull/1030
https://github.com/apache/flink/pull/992
Could you take a look?
Sorry to take up your time.
Thanks a lot!


> Improving tests coverage for org.apache.flink.streaming.api
> ---
>
> Key: FLINK-2480
> URL: https://issues.apache.org/jira/browse/FLINK-2480
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The streaming API is quite a bit newer than the other code, so it is not that 
> well covered with tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408542
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
--- End diff --

You can log the exception more simply like this:
```
LOG.error("Cannot send message " + value + " to socket server at " + 
hostName + ":" + port + ". Trying to reconnect.", e);
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408571
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

The `lock` part can probably be replaced by a simple `Thread.sleep(...)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725221#comment-14725221
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408571
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

The `lock` part can probably be replaced by a simple `Thread.sleep(...)`.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't reconnect when it is disconnected 
> from the socket server or gets an exception.
> I'd like to add a reconnect for the socket sink, like the one in the socket source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
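
For illustration, the reconnect loop with the lock/wait replaced by Thread.sleep(...), as suggested above; a simplified sketch with placeholder fields, not the code that was merged.

{code}
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class ReconnectingSinkSketch {
    private static final long CONNECTION_RETRY_SLEEP = 500;

    // Placeholders for the sink's configuration and state:
    private final String hostName = "localhost";
    private final int port = 9000;
    private final int maxRetry = 10;
    private final boolean retryForever = false;
    private volatile boolean isRunning = true;

    private Socket client;
    private DataOutputStream dataOutputStream;

    void reconnectAndResend(byte[] msg) throws IOException, InterruptedException {
        int retries = 0;
        while (isRunning && (retries < maxRetry || retryForever)) {
            try {
                if (dataOutputStream != null) {
                    dataOutputStream.close();
                }
                if (client != null && !client.isClosed()) {
                    client.close();
                }
                retries++;
                client = new Socket(hostName, port);
                dataOutputStream = new DataOutputStream(client.getOutputStream());
                dataOutputStream.write(msg);
                return; // success
            } catch (IOException e) {
                Thread.sleep(CONNECTION_RETRY_SLEEP); // replaces lock.wait(...)
            }
        }
        throw new IOException("Could not reconnect to " + hostName + ":" + port);
    }
}
{code}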


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725220#comment-14725220
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408542
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
--- End diff --

You can log the exception more simply like this:
```
LOG.error("Cannot send message " + value + " to socket server at " + 
hostName + ":" + port + ". Trying to reconnect.", e);
```


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't reconnect when it is disconnected 
> from the socket server or gets an exception.
> I'd like to add a reconnect for the socket sink, like the one in the socket source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408757
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
+   lock = new Object();
+   }
+
+   try {
+   synchronized (lock) {
+   
lock.wait(CONNECTION_RETRY_SLEEP);
+   }
+   } catch(InterruptedException eee) {
+   LOG.error(eee.toString());
--- End diff --

I think there is no need to log this. Interrupting the thread usually means 
that it has been shut down.

If you want to log exceptions, log them via ("message", exception), here 
`LOG.error("Reconnect delay interrupted", e);`
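
A minimal sketch of that suggestion, reusing the `lock`, `CONNECTION_RETRY_SLEEP`, and `isRunning` names from the diff above:

```java
try {
    synchronized (lock) {
        lock.wait(CONNECTION_RETRY_SLEEP);
    }
} catch (InterruptedException e) {
    // Interruption signals shutdown: restore the flag and let the retry loop end.
    Thread.currentThread().interrupt();
    isRunning = false;
}
```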


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725223#comment-14725223
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38408757
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
+   lock = new Object();
+   }
+
+   try {
+   synchronized (lock) {
+   
lock.wait(CONNECTION_RETRY_SLEEP);
+   }
+   } catch(InterruptedException eee) {
+   LOG.error(eee.toString());
--- End diff --

I think there is no need to log this. Interrupting the thread usually means 
that it has been shut down.

If you want to log exceptions, log them via ("message", exception), here 
`LOG.error("Reconnect delay interrupted", e);`


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't reconnect when it is disconnected 
> from the socket server or gets an exception.
> I'd like to add a reconnect, like the one in the socket source, for the 
> socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38409485
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   static int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retries;
 
private volatile boolean isRunning;
 
public SocketTextStreamFunction(String hostname, int port, char 
delimiter, long maxRetry) {
+   this.retries = 0;
--- End diff --

Initialization to `0` is not needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725242#comment-14725242
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38409485
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   static int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retries;
 
private volatile boolean isRunning;
 
public SocketTextStreamFunction(String hostname, int port, char 
delimiter, long maxRetry) {
+   this.retries = 0;
--- End diff --

Initialization to `0` is not needed.


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-01 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-2590.
--
Resolution: Fixed

Fixed via ab14f90142fd69426bb695cbdb641f0a5a0c46f7

> DataSetUtils.zipWithUniqueID creates duplicate IDs
> --
>
> Key: FLINK-2590
> URL: https://issues.apache.org/jira/browse/FLINK-2590
> Project: Flink
>  Issue Type: Bug
>  Components: Java API, Scala API
>Affects Versions: 0.10, master
>Reporter: Martin Junghanns
>Assignee: Martin Junghanns
>Priority: Minor
>
> The function creates IDs using the following code:
> {code:java}
> shifter = log2(numberOfParallelSubtasks)
> id = counter << shifter + taskId;
> {code}
> As the binary function + is executed before the bitshift <<, this results in 
> cases where different tasks create the same ID. It essentially calculates
> {code}
> counter*2^(shifter+taskId)
> {code}
> which is 0 for counter = 0 and all values of shifter and taskID.
> Consider the following example.
> numberOfParallelSubtaks = 8 
> shifter = log2(8) = 4 (maybe rename the function?)
> produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 256
> start: 2, shifter: 4 taskId: 3 label: 256
> start: 4, shifter: 4 taskId: 2 label: 256
> {code}
> I would suggest the following:
> {code}
> counter*2^(shifter)+taskId
> {code}
> which in code is equivalent to
> {code}
> shifter = log2(numberOfParallelSubtasks);
> id = (counter << shifter) + taskId;
> {code}
> and for our example produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 20
> start: 2, shifter: 4 taskId: 3 label: 35
> start: 4, shifter: 4 taskId: 2 label: 66
> {code}
> So we move the counter to the left and add the task id. As there is space for 
> 2^shifter numbers, this prevents collisions.
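
For reference, a minimal Java demonstration of the precedence pitfall, using the first example values from the description (counter = 1, shifter = 4, taskId = 4):

```java
public class ShiftPrecedenceDemo {
    public static void main(String[] args) {
        long counter = 1;
        int shifter = 4;
        int taskId = 4;

        // '+' binds tighter than '<<': this is counter << (shifter + taskId)
        long broken = counter << shifter + taskId;
        // intended: counter * 2^shifter + taskId
        long fixed = (counter << shifter) + taskId;

        System.out.println(broken + " vs " + fixed); // prints "256 vs 20"
    }
}
```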



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136686857
  
"The mock print stream overwrites only one println() function, which makes 
it susceptible to changes in the sink. It is better to use a regular 
PrintStream that prints to a StringWriter to collect all contents."

Hi @StephanEwen 
If I change this, how can I get the printed message?
So far I have used System.setOut(stream); to solve this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725273#comment-14725273
 ] 

ASF GitHub Bot commented on FLINK-2480:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136686857
  
"The mock print stream overwrites only one println() function, which makes 
it susceptible to changes in the sink. It is better to use a regular 
PrintStream that prints to a StringWriter to collect all contents."

Hi @StephanEwen 
If I change this, how can I get the printed message?
So far I have used System.setOut(stream); to solve this.


> Improving tests coverage for org.apache.flink.streaming.api
> ---
>
> Key: FLINK-2480
> URL: https://issues.apache.org/jira/browse/FLINK-2480
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The streaming API is quite a bit newer than the other code so it is not that 
> well covered with tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

2015-09-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410693
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

The idea was to speed up the proper termination of the `SocketClientSink` 
upon calling `closeConnection`. Otherwise one would have to wait a complete 
`CONNECTION_RETRY_SLEEP` cycle.
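
A minimal sketch of that design, with assumed field names, showing how `closeConnection` can cut the sleep short:

```java
public class RetrySleep {
    private final Object lock = new Object();
    private volatile boolean isRunning = true;

    void waitBeforeRetry(long sleepMillis) throws InterruptedException {
        synchronized (lock) {
            if (isRunning) {
                lock.wait(sleepMillis); // returns early when notified
            }
        }
    }

    void closeConnection() {
        isRunning = false;
        synchronized (lock) {
            lock.notifyAll(); // wake a sleeping retry loop immediately
        }
    }
}
```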


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725275#comment-14725275
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410693
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

The idea was to speed up the proper termination of the `SocketClientSink` 
upon calling `closeConnection`. Otherwise one would have to wait a complete 
`CONNECTION_RETRY_SLEEP` cycle.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't reconnect when it is disconnected 
> from the socket server or gets an exception.
> I'd like to add a reconnect, like the one in the socket source, for the 
> socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410780
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

Ah, this comment is from @tillrohrmann.
See this:

![image](https://cloud.githubusercontent.com/assets/13193847/9603561/04dad2b4-50e4-11e5-96e3-143d4d5084bf.png)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725277#comment-14725277
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410780
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
--- End diff --

Ah, this comment is from @tillrohrmann.
See this:

![image](https://cloud.githubusercontent.com/assets/13193847/9603561/04dad2b4-50e4-11e5-96e3-143d4d5084bf.png)



> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't reconnect when it is disconnected 
> from the socket server or gets an exception.
> I'd like to add a reconnect, like the one in the socket source, for the 
> socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410820
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
+   lock = new Object();
+   }
+
+   try {
+   synchronized (lock) {
+   
lock.wait(CONNECTION_RETRY_SLEEP);
+   }
+   } catch(InterruptedException eee) {
+   LOG.error(eee.toString());
--- End diff --

Ok, I'll remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725280#comment-14725280
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/1030#discussion_r38410820
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
 ---
@@ -73,13 +90,56 @@ public void intializeConnection() {
 *  The incoming data
 */
@Override
-   public void invoke(IN value) {
+   public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
-   throw new RuntimeException("Cannot send message " + 
value.toString() +
-   " to socket server at " + hostName + 
":" + port, e);
+   LOG.error("Cannot send message " + value.toString() +
+   " to socket server at " + hostName + 
":" + port + ". Caused by " + e.toString() +
+   ". Trying to reconnect.");
+   retries = 0;
+   boolean success = false;
+   while ((retries < maxRetry || retryForever) && !success 
&& isRunning){
+   try {
+
+   if (dataOutputStream != null) {
+   dataOutputStream.close();
+   }
+
+   if (client != null && 
!client.isClosed()) {
+   client.close();
+   }
+
+   retries++;
+
+   client = new Socket(hostName, port);
+   dataOutputStream = new 
DataOutputStream(client.getOutputStream());
+   dataOutputStream.write(msg);
+   success = true;
+
+   } catch(IOException ee) {
+   LOG.error("Reconnect to socket server 
and send message failed. Caused by " +
+   ee.toString() + ". 
Retry time(s):" + retries);
+
+   if (lock == null) {
+   lock = new Object();
+   }
+
+   try {
+   synchronized (lock) {
+   
lock.wait(CONNECTION_RETRY_SLEEP);
+   }
+   } catch(InterruptedException eee) {
+   LOG.error(eee.toString());
--- End diff --

Ok, I'll remove it.


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't reconnect when it is disconnected 
> from the socket server or gets an exception.
> I'd like to add a reconnect, like the one in the socket source, for the 
> socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2161] modified Scala shell start script...

2015-09-01 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/805#issuecomment-136689479
  
Could someone please check this? It has been here for a while, and I've got 
some other work building on top of it!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2161) Flink Scala Shell does not support external jars (e.g. Gelly, FlinkML)

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725291#comment-14725291
 ] 

ASF GitHub Bot commented on FLINK-2161:
---

Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/805#issuecomment-136689479
  
Could someone please check this? It has been here for a while, and I've got 
some other work building on top of it!


> Flink Scala Shell does not support external jars (e.g. Gelly, FlinkML)
> --
>
> Key: FLINK-2161
> URL: https://issues.apache.org/jira/browse/FLINK-2161
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Nikolaas Steenbergen
>
> Currently, there is no easy way to load and ship external libraries/jars with 
> Flink's Scala shell. Assume that you want to run some Gelly graph algorithms 
> from within the Scala shell, then you have to put the Gelly jar manually in 
> the lib directory and make sure that this jar is also available on your 
> cluster, because it is not shipped with the user code. 
> It would be good to have a simple mechanism how to specify additional jars 
> upon startup of the Scala shell. These jars should then also be shipped to 
> the cluster.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136694516
  
@StephanEwen 
Hi, I have addressed all of your comments except the following:

"The mock print stream overwrites only one println() function, which makes 
it susceptible to changes in the sink. It is better to use a regular 
PrintStream that prints to a StringWriter to collect all contents."

I have no idea how to assert that the printed output is correct.
Could you explain it in more detail?
Thank you very much!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725308#comment-14725308
 ] 

ASF GitHub Bot commented on FLINK-2480:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136694516
  
@StephanEwen 
Hi, I have addressed all of your comments except the following:

"The mock print stream overwrites only one println() function, which makes 
it susceptible to changes in the sink. It is better to use a regular 
PrintStream that prints to a StringWriter to collect all contents."

I have no idea how to assert that the printed output is correct.
Could you explain it in more detail?
Thank you very much!


> Improving tests coverage for org.apache.flink.streaming.api
> ---
>
> Key: FLINK-2480
> URL: https://issues.apache.org/jira/browse/FLINK-2480
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The streaming API is quite a bit newer than the other code so it is not that 
> well covered with tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-136702507
  
@StephanEwen 
Hi, I have addressed your comments except for the synchronized (lock) in 
SocketClientSink.java.
Should I change it to Thread.sleep()?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2536) Add a retry for SocketClientSink

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725331#comment-14725331
 ] 

ASF GitHub Bot commented on FLINK-2536:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/1030#issuecomment-136702507
  
@StephanEwen 
Hi, I have addressed your comments except for the synchronized (lock) in 
SocketClientSink.java.
Should I change it to Thread.sleep()?


> Add a retry for SocketClientSink
> 
>
> Key: FLINK-2536
> URL: https://issues.apache.org/jira/browse/FLINK-2536
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I found that the SocketClientSink doesn't reconnect when it is disconnected 
> from the socket server or gets an exception.
> I'd like to add a reconnect, like the one in the socket source, for the 
> socket sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread HuangWHWHW
Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38414236
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   static int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retries;
 
private volatile boolean isRunning;
 
public SocketTextStreamFunction(String hostname, int port, char 
delimiter, long maxRetry) {
+   this.retries = 0;
--- End diff --

Hi,
I removed this in my new commit.
By the way, why doesn't the CI run?
This happens for all of my new commits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725336#comment-14725336
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user HuangWHWHW commented on a diff in the pull request:

https://github.com/apache/flink/pull/992#discussion_r38414236
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
 ---
@@ -42,11 +42,13 @@
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
-   private static final int CONNECTION_RETRY_SLEEP = 1000;
+   static int CONNECTION_RETRY_SLEEP = 1000;
+   protected long retries;
 
private volatile boolean isRunning;
 
public SocketTextStreamFunction(String hostname, int port, char 
delimiter, long maxRetry) {
+   this.retries = 0;
--- End diff --

Hi,
I removed this in my new commit.
By the way, why doesn't the CI run?
This happens for all of my new commits.


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1730) Add a FlinkTools.persist style method to the Data Set.

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725427#comment-14725427
 ] 

ASF GitHub Bot commented on FLINK-1730:
---

GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/1083

[FLINK-1730]Persist operator on Data Sets

This PR introduces a `persist` operation on `DataSet` which caches the data 
set in memory, allowing direct access whenever the data set is operated on 
again and again.
The idea behind the implementation is this:
1. A `PersistOperator` extending a `SingleInputUdfOperator`, for the common 
API and the Java API.
2. A `Persist` driver strategy which allows the Job Graph to generate a 
`PersistNode`, which just uses a `NoOpDriver` to forward results from input to 
output.
3. `RegularPactTask` determines whether it is a Persist task and 
accordingly uses a `SpillingResettableMutableObjectIterator` to read the input 
records and persist them.
4. To make the results truly persistent, the `MemorySegment`s must not be 
freed when the `Task` ends. To this end, I have created a 
`DummyPersistInvokable` which does nothing. It just prevents freeing of memory.
5. All persisted memory segments are cleared out when the `MemoryManager` 
shuts down. There is room for some kind of cache-eviction strategy here.

For testing the functionality, I have written a test `PersistITCase` which 
generates 100 random Long values inside a Map function and persists the 
output. Triggering the execution twice must then produce the same results.
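
A hypothetical usage sketch (only `persist()` itself is taken from this PR; the map function and output paths are made up):

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Long> expensive = env.generateSequence(1, 100)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value * value; // stand-in for costly work
            }
        })
        .persist(); // proposed operator: cache the results in managed memory

expensive.writeAsText("/tmp/run-1"); // hypothetical output path
env.execute(); // first execution materializes the cache

expensive.writeAsText("/tmp/run-2"); // hypothetical output path
env.execute(); // second execution reuses the persisted segments
```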

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink flink-1730

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1083


commit a22cc670697cc601facb164f6fd84ef6438c2499
Author: Sachin Goel 
Date:   2015-08-24T16:07:04Z

Implemented a persist operator which caches elements into a Spilling
buffer.




> Add a FlinkTools.persist style method to the Data Set.
> --
>
> Key: FLINK-1730
> URL: https://issues.apache.org/jira/browse/FLINK-1730
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stephan Ewen
>Priority: Minor
>
> I think this is an operation that will be needed more prominently. Defining a 
> point where one long logical program is broken into different executions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1730]Persist operator on Data Sets

2015-09-01 Thread sachingoel0101
GitHub user sachingoel0101 opened a pull request:

https://github.com/apache/flink/pull/1083

[FLINK-1730]Persist operator on Data Sets

This PR introduces a `persist` operation on `DataSet` which caches the data 
set in memory, allowing direct access whenever the data set is operated on 
again and again.
The idea behind the implementation is this:
1. A `PersistOperator` extending a `SingleInputUdfOperator`, for the common 
API and the Java API.
2. A `Persist` driver strategy which allows the Job Graph to generate a 
`PersistNode`, which just uses a `NoOpDriver` to forward results from input to 
output.
3. `RegularPactTask` determines whether it is a Persist task and 
accordingly uses a `SpillingResettableMutableObjectIterator` to read the input 
records and persist them.
4. To make the results truly persistent, the `MemorySegment`s must not be 
freed when the `Task` ends. To this end, I have created a 
`DummyPersistInvokable` which does nothing. It just prevents freeing of memory.
5. All persisted memory segments are cleared out when the `MemoryManager` 
shuts down. There is room for some kind of cache-eviction strategy here.

For testing the functionality, I have written a test `PersistITCase` which 
generates 100 random Long values inside a Map function and persists the 
output. Triggering the execution twice must then produce the same results.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sachingoel0101/flink flink-1730

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1083


commit a22cc670697cc601facb164f6fd84ef6438c2499
Author: Sachin Goel 
Date:   2015-08-24T16:07:04Z

Implemented a persist operator which caches elements into a Spilling
buffer.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2602) Gelly algorithms obtain new execution environments.

2015-09-01 Thread Martin Junghanns (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725457#comment-14725457
 ] 

Martin Junghanns commented on FLINK-2602:
-

I could work on that; it would be a good way to scan through the Gelly code.

> Gelly algorithms obtain new execution environments.
> ---
>
> Key: FLINK-2602
> URL: https://issues.apache.org/jira/browse/FLINK-2602
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>
> I have seen that Gelly occasionally uses 
> {{ExecutionEnvironment.getExecutionEnvironment()}} to obtain an execution 
> environment.
> That easily leads to problems, as it creates new execution environments in 
> many cases. For example, if the original execution environment was created via 
> {{ExecutionEnvironment.createRemoteEnvironment(...)}}, it will inevitably 
> give you the wrong execution environment.
> If new sources need to be created, they should be created with the execution 
> environment of the other data sets in the computation, for example via 
> {{vertexDataSet.getExecutionEnvironment().fromElements(a, b, c)}}.
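
A minimal sketch of the recommended pattern (hypothetical helper method):

```java
// Derive new sources from the environment of an existing data set instead of
// calling ExecutionEnvironment.getExecutionEnvironment() a second time.
static DataSet<Long> newSourceFor(DataSet<?> existing) {
    return existing.getExecutionEnvironment().fromElements(1L, 2L, 3L);
}
```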



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136739055
  
I am not sure why the CI is not retesting this.

Can you try to squash your commits into one commit and force-push this 
branch? This always triggers CI for me...
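
For reference, the usual sequence (assuming the PR branch is checked out and `origin` points at the contributor's fork) is:

$ git rebase -i origin/master
$ git push --force origin your-branch

In the rebase editor, mark every commit after the first as `squash` or `fixup`.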


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725469#comment-14725469
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136739055
  
I am not sure why the CI is not retesting this.

Can you try to squash your commits into one commit and force-push this 
branch? This always triggers CI for me...


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2490) Remove unwanted boolean check in function SocketTextStreamFunction.streamFromSocket

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725520#comment-14725520
 ] 

ASF GitHub Bot commented on FLINK-2490:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136754340
  
@HuangWHWHW Thank you for addressing my comments. Could you please squash 
your commits and force push to this branch again?


> Remove unwanted boolean check in function 
> SocketTextStreamFunction.streamFromSocket
> ---
>
> Key: FLINK-2490
> URL: https://issues.apache.org/jira/browse/FLINK-2490
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
>Priority: Minor
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-136754340
  
@HuangWHWHW Thank you for addressing my comments. Could you please squash 
your commits and force push to this branch again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2480) Improving tests coverage for org.apache.flink.streaming.api

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725551#comment-14725551
 ] 

ASF GitHub Bot commented on FLINK-2480:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136758616
  
@HuangWHWHW Stephan is talking about something like this instead of the 
PrintStreamMock:

```java
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream captureStream = new PrintStream(baos);
PrintStream original = System.out;
System.setOut(captureStream);

System.out.println("Printing one line");
System.out.println("Another line");

System.setOut(original);
captureStream.close();

Assert.assertEquals("Printing one line\nAnother line\n", baos.toString());
```

You can see that we're using a `PrintStream` with a `ByteArrayOutputStream` 
here to capture the contents that are being printed to standard out.


> Improving tests coverage for org.apache.flink.streaming.api
> ---
>
> Key: FLINK-2480
> URL: https://issues.apache.org/jira/browse/FLINK-2480
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Huang Wei
> Fix For: 0.10
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The streaming API is quite a bit newer than the other code so it is not that 
> well covered with tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2480][test]add a test for Print Sink wi...

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1073#issuecomment-136758616
  
@HuangWHWHW Stephan is talking about something like this instead of the 
PrintStreamMock:

```java
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream captureStream = new PrintStream(baos);
PrintStream original = System.out;
System.setOut(captureStream);

System.out.println("Printing one line");
System.out.println("Another line");

System.setOut(original);
captureStream.close();

Assert.assertEquals("Printing one line\nAnother line\n", baos.toString());
```

You can see that we're using a `PrintStream` with a `ByteArrayOutputStream` 
here to capture the contents that are being printed to standard out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-2186) Rework CSV import to support very wide files

2015-09-01 Thread Theodore Vasiloudis (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Theodore Vasiloudis updated FLINK-2186:
---
Fix Version/s: (was: 0.10)

> Rework CSV import to support very wide files
> 
>
> Key: FLINK-2186
> URL: https://issues.apache.org/jira/browse/FLINK-2186
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library, Scala API
>Reporter: Theodore Vasiloudis
>
> In the current readCsvFile implementation, importing CSV files with many 
> columns ranges from cumbersome to impossible.
> For example to import an 11 column file we need to write:
> {code}
> val cancer = env.readCsvFile[(String, String, String, String, String, String, 
> String, String, String, String, 
> String)]("/path/to/breast-cancer-wisconsin.data")
> {code}
> For many use cases in Machine Learning we might have CSV files with thousands 
> or millions of columns that we want to import as vectors.
> In that case using the current readCsvFile method becomes impossible.
> We therefore need to rework the current function, or create a new one that 
> will allow us to import CSV files with an arbitrary number of columns.
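
Until such a method exists, one workaround is to bypass `readCsvFile` entirely and parse each row manually; a sketch, assuming a comma delimiter and a hypothetical file path:

```java
DataSet<double[]> vectors = env.readTextFile("/path/to/wide.csv")
        .map(new MapFunction<String, double[]>() {
            @Override
            public double[] map(String line) {
                String[] fields = line.split(",");
                double[] v = new double[fields.length];
                for (int i = 0; i < fields.length; i++) {
                    v[i] = Double.parseDouble(fields[i]);
                }
                return v;
            }
        });
```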



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2260) Have a complete model evaluation and selection framework for FlinkML

2015-09-01 Thread Theodore Vasiloudis (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Theodore Vasiloudis updated FLINK-2260:
---
Fix Version/s: 0.10

> Have a complete model evaluation and selection framework for FlinkML
> 
>
> Key: FLINK-2260
> URL: https://issues.apache.org/jira/browse/FLINK-2260
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>  Labels: ML
>
> This is an umbrella ticket to keep track on the work on the model evaluation 
> and selection for FlinkML. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2260) Have a complete model evaluation and selection framework for FlinkML

2015-09-01 Thread Theodore Vasiloudis (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Theodore Vasiloudis updated FLINK-2260:
---
Fix Version/s: (was: 0.10)

> Have a complete model evaluation and selection framework for FlinkML
> 
>
> Key: FLINK-2260
> URL: https://issues.apache.org/jira/browse/FLINK-2260
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>  Labels: ML
>
> This is an umbrella ticket to keep track on the work on the model evaluation 
> and selection for FlinkML. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1723) Add cross validation for model evaluation

2015-09-01 Thread Theodore Vasiloudis (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Theodore Vasiloudis updated FLINK-1723:
---
Fix Version/s: 0.10

> Add cross validation for model evaluation
> -
>
> Key: FLINK-1723
> URL: https://issues.apache.org/jira/browse/FLINK-1723
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Theodore Vasiloudis
>  Labels: ML
> Fix For: 0.10
>
>
> Cross validation [1] is a standard tool to estimate the test error for a 
> model. As such it is a crucial tool for every machine learning library.
> The cross validation should work with arbitrary Estimators and error metrics. 
> A first cross validation strategy it should support is the k-fold cross 
> validation.
> Resources:
> [1] [http://en.wikipedia.org/wiki/Cross-validation]
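
To illustrate the k-fold strategy, a toy split in plain Java (deliberately not the FlinkML API, which this ticket is about designing):

```java
import java.util.ArrayList;
import java.util.List;

public class KFoldDemo {
    public static void main(String[] args) {
        List<Integer> samples = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            samples.add(i);
        }

        int k = 5;
        for (int fold = 0; fold < k; fold++) {
            List<Integer> train = new ArrayList<>();
            List<Integer> test = new ArrayList<>();
            for (int i = 0; i < samples.size(); i++) {
                // each sample is held out in exactly one fold
                if (i % k == fold) {
                    test.add(samples.get(i));
                } else {
                    train.add(samples.get(i));
                }
            }
            System.out.println("fold " + fold + ": test=" + test);
        }
    }
}
```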



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2583] Add Stream Sink For Rolling HDFS ...

2015-09-01 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1084

[FLINK-2583] Add Stream Sink For Rolling HDFS Files

Note: The rolling sink is not yet integrated with 
checkpointing/fault-tolerance.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink hdfs-sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1084.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1084


commit 8c852c2abf3a1e597dd1d139197d3420861a516c
Author: Robert Metzger 
Date:   2015-08-27T16:13:08Z

[FLINK-2584] Check for unshaded classes in fat jar and shade curator

This closes #1076

commit f4a48c23a30c170a5a2c08c27e1f01f7827eefd2
Author: chengxiang li 
Date:   2015-08-31T06:02:27Z

[FLINK-2596] Remove non-transitive comparator in random sampler test.

This closes #1080

commit ab14f90142fd69426bb695cbdb641f0a5a0c46f7
Author: Martin Junghanns 
Date:   2015-08-29T20:51:19Z

[FLINK-2590] fixing DataSetUtils.zipWithUniqueId() and 
DataSetUtils.zipWithIndex()

* modified algorithm as explained in the issue
* updated method documentation

[FLINK-2590] reducing required bit shift size

* maximum bit size is changed to getNumberOfParallelSubTasks() - 1

This closes #1075.

commit 6a58aadec15657a7da60c58ef6d5dbbf7e5ca14b
Author: Till Rohrmann 
Date:   2015-09-01T10:04:23Z

[FLINK-2590] Fixes Scala's DataSetUtilsITCase

commit 81276ff88bb7185d93bbf92392b82b25ece7aff1
Author: Aljoscha Krettek 
Date:   2015-08-31T08:01:38Z

[FLINK-2583] Add Stream Sink For Rolling HDFS Files

The rolling sink is not yet integrated with
checkpointing/fault-tolerance.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725568#comment-14725568
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1084

[FLINK-2583] Add Stream Sink For Rolling HDFS Files

Note: The rolling sink is not yet integrated with 
checkpointing/fault-tolerance.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink hdfs-sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1084.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1084


commit 8c852c2abf3a1e597dd1d139197d3420861a516c
Author: Robert Metzger 
Date:   2015-08-27T16:13:08Z

[FLINK-2584] Check for unshaded classes in fat jar and shade curator

This closes #1076

commit f4a48c23a30c170a5a2c08c27e1f01f7827eefd2
Author: chengxiang li 
Date:   2015-08-31T06:02:27Z

[FLINK-2596] Remove non-transitive comparator in random sampler test.

This closes #1080

commit ab14f90142fd69426bb695cbdb641f0a5a0c46f7
Author: Martin Junghanns 
Date:   2015-08-29T20:51:19Z

[FLINK-2590] fixing DataSetUtils.zipWithUniqueId() and 
DataSetUtils.zipWithIndex()

* modified algorithm as explained in the issue
* updated method documentation

[FLINK-2590] reducing required bit shift size

* maximum bit size is changed to getNumberOfParallelSubTasks() - 1

This closes #1075.

commit 6a58aadec15657a7da60c58ef6d5dbbf7e5ca14b
Author: Till Rohrmann 
Date:   2015-09-01T10:04:23Z

[FLINK-2590] Fixes Scala's DataSetUtilsITCase

commit 81276ff88bb7185d93bbf92392b82b25ece7aff1
Author: Aljoscha Krettek 
Date:   2015-08-31T08:01:38Z

[FLINK-2583] Add Stream Sink For Rolling HDFS Files

The rolling sink is not yet integrated with
checkpointing/fault-tolerance.




> Add Stream Sink For Rolling HDFS Files
> --
>
> Key: FLINK-2583
> URL: https://issues.apache.org/jira/browse/FLINK-2583
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>
> In addition to having configurable file-rolling behavior, the Sink should also 
> integrate with checkpointing to make it possible to have exactly-once 
> semantics throughout the topology.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Framesize fix

2015-09-01 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/934#issuecomment-136765025
  
Hi @kl0u. I think something went wrong during rebasing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

