[jira] [Created] (FLINK-3151) YARN kills Flink TM containers due to memory overuse (outside heap/offheap)

2015-12-09 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-3151:
-

 Summary: YARN kills Flink TM containers due to memory overuse 
(outside heap/offheap)
 Key: FLINK-3151
 URL: https://issues.apache.org/jira/browse/FLINK-3151
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 0.10.1, 1.0.0
 Environment: 48 cores
Reporter: Robert Metzger
Priority: Blocker


A Flink user running Flink on YARN with 1 processing slot and 2 GB of TM memory, on a machine 
with 48 reported CPU cores, is running into issues with TM containers being killed by YARN due 
to memory overuse.

Setting the YARN memory cutoff to 0.5 resolves the problem, but it is not really a feasible 
approach.
Another workaround was downgrading Netty from 4.0.31.Final back to 4.0.27.Final, which also 
resolved the issue.
We upgraded Netty between 0.9 and 0.10.

Most likely, Netty changed its behavior between these releases.
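
For reference, a minimal sketch of the cutoff workaround, assuming the configuration key 
{{yarn.heap-cutoff-ratio}} used by the YARN client around 0.10 (please verify the key and its 
default against your version's flink-conf.yaml):

{code}
# flink-conf.yaml: reserve 50% of the requested container memory for
# off-heap/native usage (Netty buffers, JVM overhead, etc.)
yarn.heap-cutoff-ratio: 0.5
{code}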





[jira] [Created] (FLINK-3152) Support all comparisons for Date type

2015-12-09 Thread Timo Walther (JIRA)
Timo Walther created FLINK-3152:
---

 Summary: Support all comparisons for Date type
 Key: FLINK-3152
 URL: https://issues.apache.org/jira/browse/FLINK-3152
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Reporter: Timo Walther


Currently, the Table API does not support comparisons like "DATE < DATE", "DATE 
>= DATE". The ExpressionCodeGenerator needs to be adapted for that.





Community choice for Hadoop Summit Europe 2016

2015-12-09 Thread Kostas Tzoumas
Hi everyone,

Just a reminder, the community vote for the Hadoop Summit Europe 2016 talks
in Dublin is still open until December 15.

A good number of talks around Flink have been submitted; here are the
ones that mention "flink" in their abstract:
https://hadoopsummit.uservoice.com/search?filter=merged=flink

Vote away :-)

Best,
Kostas


[jira] [Created] (FLINK-3153) Support all comparisons for String type

2015-12-09 Thread Timo Walther (JIRA)
Timo Walther created FLINK-3153:
---

 Summary: Support all comparisons for String type
 Key: FLINK-3153
 URL: https://issues.apache.org/jira/browse/FLINK-3153
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Reporter: Timo Walther


Currently, the Table API does not support comparisons like "STRING < STRING" or 
"STRING >= STRING". The ExpressionCodeGenerator needs to be adapted for that. 
It might be necessary to define the ordering in the TableConfig, as it may 
depend on the Locale. We should implement it consistently with common SQL implementations.
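
To illustrate why the ordering may need to live in the {{TableConfig}}: binary 
({{String.compareTo}}) order and locale-aware order can disagree, as this self-contained 
Java sketch shows (illustrative only):

{code}
import java.text.Collator;
import java.util.Locale;

public class StringOrderSketch {
    public static void main(String[] args) {
        String a = "apple";
        String b = "Banana";

        // Binary code-point order: 'a' (97) > 'B' (66), so "apple" sorts after "Banana".
        System.out.println("compareTo:    " + a.compareTo(b));

        // Locale-aware order: at primary strength 'a' < 'b', so "apple" sorts first.
        Collator en = Collator.getInstance(Locale.ENGLISH);
        System.out.println("Collator(en): " + en.compare(a, b));
    }
}
{code}

Most SQL engines default to a defined collation, so following their behavior (as suggested 
above) is probably the safest route.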





New Blog Post Draft

2015-12-09 Thread Matthias J. Sax
Hi,

after talking to several people and getting some feedback already, I
would like to suggest a new blog post for the project web site about the
Storm compatibility layer.

You can find the draft here:
https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/_posts/2015-12-07-storm-compatibility.md

The missing (just not rendered) picture is this one:
https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/img/blog/flink-storm.png

Looking forward to your feedback!


-Matthias





[jira] [Created] (FLINK-3150) Make YARN container invocation configurable

2015-12-09 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-3150:
-

 Summary: Make YARN container invocation configurable
 Key: FLINK-3150
 URL: https://issues.apache.org/jira/browse/FLINK-3150
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Reporter: Robert Metzger


Currently, the JVM invocation command for YARN containers is hardcoded.

With this change, I would like to make the command configurable, using a template string 
such as
"%java% %memopts% %jvmopts% ..."

Also, we should respect {{java.env.home}} if it is set.
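
A minimal sketch of the kind of placeholder substitution this could use (the first three 
placeholder names are taken from the example above; the extra %mainclass% placeholder and 
all concrete values are illustrative):

{code}
import java.util.HashMap;
import java.util.Map;

public class LaunchCommandSketch {
    public static void main(String[] args) {
        // Template as it might appear in the configuration.
        String template = "%java% %memopts% %jvmopts% %mainclass%";

        Map<String, String> values = new HashMap<>();
        values.put("%java%", "$JAVA_HOME/bin/java");
        values.put("%memopts%", "-Xms1448m -Xmx1448m");
        values.put("%jvmopts%", "-XX:+UseG1GC");
        values.put("%mainclass%", "org.apache.flink.yarn.YarnTaskManager");

        // Replace each placeholder with its configured value.
        String command = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            command = command.replace(e.getKey(), e.getValue());
        }
        System.out.println(command);
    }
}
{code}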





[jira] [Created] (FLINK-3155) Update Flink docker version to latest stable Flink version

2015-12-09 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-3155:
-

 Summary: Update Flink docker version to latest stable Flink version
 Key: FLINK-3155
 URL: https://issues.apache.org/jira/browse/FLINK-3155
 Project: Flink
  Issue Type: Task
  Components: flink-contrib
Affects Versions: 1.0.0
Reporter: Maximilian Michels
 Fix For: 1.0.0


It would be nice if the Flink binary URL in the Docker setup always pointed to the latest 
stable Flink version. Until that is automated, this JIRA keeps track of the updates needed 
for each release.





Re: New Blog Post Draft

2015-12-09 Thread Slim Baltagi
Matthias,

This is a great blog post!

I would like to suggest the following:
- Change the title to: How to run your existing Storm applications on Apache Flink stream 
  processing engine?
- Fix the few typos:
  For this reasons -> For these reasons
  Storm compatibility package which allows users -> Storm compatibility package that allows users
  we need to translated it -> we need to translate it
  in not available -> is not available
  eg, StormWordCount.jar -> e.g., StormWordCount.jar
- Provide some benchmarks on running a Storm application as-is versus running it on Flink.
Thanks

Slim Baltagi

On Dec 9, 2015, at 5:10 AM, Robert Metzger  wrote:

> Great, thank you for writing the article.
> 
> I like the general idea, but I've found some small typos.
> Can you open a pull request against the "flink-web" repo to make reviewing
> it easier?
> 
> On Wed, Dec 9, 2015 at 11:32 AM, Matthias J. Sax  wrote:
> 
>> Hi,
>> 
>> after talking to several people and getting some feedback already, I
>> would like to suggest a new blog post for the project web site about the
>> Storm compatibility layer.
>> 
>> You can find the draft here:
>> 
>> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/_posts/2015-12-07-storm-compatibility.md
>> 
>> The missing (just not rendered) picture is this one:
>> 
>> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/img/blog/flink-storm.png
>> 
>> Looking forward to your feedback!
>> 
>> 
>> -Matthias
>> 
>> 



[jira] [Created] (FLINK-3154) Update Kryo version from 2.24.0 to 3.0.3

2015-12-09 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-3154:
-

 Summary: Update Kryo version from 2.24.0 to 3.0.3
 Key: FLINK-3154
 URL: https://issues.apache.org/jira/browse/FLINK-3154
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.0.0
Reporter: Maximilian Michels
Priority: Minor
 Fix For: 1.0.0


Flink's Kryo version is outdated and could be updated to a newer version, e.g. 
kryo-3.0.3.





Re: flink-dist packaging including unshaded classes

2015-12-09 Thread Stephan Ewen
Hi!

Did you change anything in the POM files, with respect to Guava, or add
another dependency that might transitively pull Guava?

Stephan


On Tue, Dec 8, 2015 at 9:25 PM, Nick Dimiduk  wrote:

> Hi there,
>
> I'm attempting to build locally a flink based on release-0.10.0 +
> FLINK-3147. When I build from this sandbox, the resulting flink-dist.jar
> contains both shanded and unshaded jars. In my case, this results in a
> runtime conflict in my application, where com.google.common.base.Stopwatch
> from both Guava-12 and Guava-18 are in my classpath.
>
> Is there some additional profile required to build a dist package with only
> the shaded jars?
>
> Thanks,
> Nick
>
> $ tar xvzf flink-0.10.0-bin-hadoop27-scala_2.11.tgz
> $ cd flink-0.10.0
> $ $ unzip -t lib/flink-dist_2.11-0.10.0.jar | grep Stopwatch
> testing:
> org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> testing: org/apache/flink/shaded/com/google/common/base/Stopwatch.class
>   OK
> testing:
>
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
>   OK
> testing:
>
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
>   OK
> testing:
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
>  OK
> testing:
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class   OK
> testing: com/google/inject/internal/util/$Stopwatch.class   OK
>
> vs.
>
> $ git status
> HEAD detached from release-0.10.0
> $ git log --decorate=short --oneline | head -n3
> dccdbd8 (HEAD) [FLINK-3147] HadoopOutputFormatBase should expose fields as
> protected
> ab2cca4 (tag: release-0.10.0, origin/release-0.10.0-rc8,
> ndimiduk/release-0.10.0-rc8) Commit for release 0.10.0
> c0fe305 [FLINK-2992] Remove use of SerializationUtils
> $ mvn clean install -DskipTests
> ...
> $ cd flink-dist/target/flink-0.10.0-bin/flink-0.10.0
> $ unzip -t lib/flink-dist-0.10.0.jar | grep Stopwatch
> testing:
> org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> testing: org/apache/flink/shaded/com/google/common/base/Stopwatch.class
>   OK
> testing:
>
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
>   OK
> testing:
>
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
>   OK
> testing:
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
>  OK
> testing:
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class   OK
> testing: com/google/inject/internal/util/$Stopwatch.class   OK
> testing: com/google/common/base/Stopwatch$1.class   OK
> testing: com/google/common/base/Stopwatch.class   OK
> testing:
> com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
>  OK
> testing:
> com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class   OK
>


Re: New Blog Post Draft

2015-12-09 Thread Maximilian Michels
Hi Matthias,

Thank you for the blog post. You had already shared a first draft with
me. This one looks even better!

I've made some minor comments. +1 to merge if these are addressed.

Cheers,
Max

On Wed, Dec 9, 2015 at 1:20 PM, Matthias J. Sax  wrote:
> Just updated the draft (thanks to Till and Slim for feedback) and opened
> a PR.
>
> https://github.com/apache/flink-web/pull/15
>
> @Slim: we discussed about benchmark result beforehand and decided to do
> a second blog post later on
>
>
> -Matthias
>
> On 12/09/2015 12:14 PM, Slim Baltagi wrote:
>> Matthias,
>>
>> This is great blog!
>>
>> I would like to suggest the following:
>> Change the title to: How to run your existing Storm applications on Apache 
>> Flink stream processing engine?
>> Fixing the few typos
>> For this reasons -> For these reasons
>> Storm compatibility package which allows users -> Storm compatibility 
>> package that allows users
>>  we need to translated it  -> we need to translate it
>> in not available -> is not available
>> eg, StormWordCount.jar  -> e.g., StormWordCount.jar
>> Provide some benchmarks on running a storm application as it is versus 
>> running it on Flink.
>> Thanks
>>
>> Slim Baltagi
>>
>> On Dec 9, 2015, at 5:10 AM, Robert Metzger  wrote:
>>
>>> Great, thank you for writing the article.
>>>
>>> I like the general idea, but I've found some small typos.
>>> Can you open a pull request against the "flink-web" repo to make reviewing
>>> it easier?
>>>
>>> On Wed, Dec 9, 2015 at 11:32 AM, Matthias J. Sax  wrote:
>>>
 Hi,

 after talking to several people and getting some feedback already, I
 would like to suggest a new blog post for the project web site about the
 Storm compatibility layer.

 You can find the draft here:

 https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/_posts/2015-12-07-storm-compatibility.md

 The missing (just not rendered) picture is this one:

 https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/img/blog/flink-storm.png

 Looking forward to your feedback!


 -Matthias


>>
>>
>


Re: New Blog Post Draft

2015-12-09 Thread Robert Metzger
Great, thank you for writing the article.

I like the general idea, but I've found some small typos.
Can you open a pull request against the "flink-web" repo to make reviewing
it easier?

On Wed, Dec 9, 2015 at 11:32 AM, Matthias J. Sax  wrote:

> Hi,
>
> after talking to several people and getting some feedback already, I
> would like to suggest a new blog post for the project web site about the
> Storm compatibility layer.
>
> You can find the draft here:
>
> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/_posts/2015-12-07-storm-compatibility.md
>
> The missing (just not rendered) picture is this one:
>
> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/img/blog/flink-storm.png
>
> Looking forward to your feedback!
>
>
> -Matthias
>
>


Re: New Blog Post Draft

2015-12-09 Thread Matthias J. Sax
Just updated the draft (thanks to Till and Slim for feedback) and opened
a PR.

https://github.com/apache/flink-web/pull/15

@Slim: we discussed benchmark results beforehand and decided to do
a second blog post later on.


-Matthias

On 12/09/2015 12:14 PM, Slim Baltagi wrote:
> Matthias,
> 
> This is great blog!
> 
> I would like to suggest the following: 
> Change the title to: How to run your existing Storm applications on Apache 
> Flink stream processing engine?
> Fixing the few typos
> For this reasons -> For these reasons
> Storm compatibility package which allows users -> Storm compatibility package 
> that allows users 
>  we need to translated it  -> we need to translate it 
> in not available -> is not available
> eg, StormWordCount.jar  -> e.g., StormWordCount.jar
> Provide some benchmarks on running a storm application as it is versus 
> running it on Flink. 
> Thanks
> 
> Slim Baltagi
> 
> On Dec 9, 2015, at 5:10 AM, Robert Metzger  wrote:
> 
>> Great, thank you for writing the article.
>>
>> I like the general idea, but I've found some small typos.
>> Can you open a pull request against the "flink-web" repo to make reviewing
>> it easier?
>>
>> On Wed, Dec 9, 2015 at 11:32 AM, Matthias J. Sax  wrote:
>>
>>> Hi,
>>>
>>> after talking to several people and getting some feedback already, I
>>> would like to suggest a new blog post for the project web site about the
>>> Storm compatibility layer.
>>>
>>> You can find the draft here:
>>>
>>> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/_posts/2015-12-07-storm-compatibility.md
>>>
>>> The missing (just not rendered) picture is this one:
>>>
>>> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/img/blog/flink-storm.png
>>>
>>> Looking forward to your feedback!
>>>
>>>
>>> -Matthias
>>>
>>>
> 
> 





Re: Task Parallelism in a Cluster

2015-12-09 Thread Kashmar, Ali
Hi Stephan,

That was my original understanding, until I realized that I was not using
a parallel socket source. I had a custom source that extended
SourceFunction which always runs with parallelism = 1. I looked through
the API and found the ParallelSourceFunction interface so I implemented
that and voila, now all 3 nodes in the cluster are actually receiving
traffic on socket connections.

Now that I’m running it successfully end to end, I’m trying to improve the
performance. Can you take a look at the attached screen shot and tell me
if the distribution of work amongst the pipelines is normal? I feel like
some pipelines are a lot lazier than others, even though the cluster nodes
are exactly the same.

By the way, here’s the class I wrote. It would be useful to have this
available in the Flink distro:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Socket source that can run with parallelism > 1: every subtask opens its
 * own connection to the given host/port and emits one record per line.
 */
public class ParallelSocketSource implements ParallelSourceFunction<String> {

    private static final long serialVersionUID = -271094428915640892L;
    private static final Logger LOG =
            LoggerFactory.getLogger(ParallelSocketSource.class);

    private volatile boolean running = true;
    private final String host;
    private final int port;

    public ParallelSocketSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Read lines from the socket until cancel() flips the running flag
        // or the server closes the connection.
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            String line = null;
            while (running && ((line = reader.readLine()) != null)) {
                ctx.collect(line);
            }
        } catch (IOException ex) {
            LOG.error("error reading from socket", ex);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
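
For reference, here is roughly how the source gets wired into a job; the host name, port, 
and the parallelism of 16 below are just placeholder values:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelSocketJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(16);

        // Each parallel subtask opens its own connection, so every
        // TaskManager in the cluster receives traffic on a socket.
        DataStream<String> lines =
                env.addSource(new ParallelSocketSource("data-host", 9999));

        lines.print();
        env.execute("parallel socket source sketch");
    }
}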

Regards,
Ali
 

On 2015-12-08, 3:35 PM, "Stephan Ewen"  wrote:

>Hi Ali!
>
>In the case you have, the sequence of source-map-filter ... forms a
>pipeline.
>
>You mentioned that you set the parallelism to 16, so there should be 16
>pipelines. These pipelines should be completely independent.
>
>Looking at the way the scheduler is implemented, independent pipelines
>should be spread across machines. But when you execute that in parallel,
>you say all 16 pipelines end up on the same machine?
>
>Can you share with us the rough code of your program? Or a Screenshot from
>the runtime dashboard that shows the program graph?
>
>
>If your cluster is basically for that one job only, you could try and set
>the number of slots to 4 for each machine. Then you have 16 slots in total
>and each node would run one of the 16 pipelines.
>
>
>Greetings,
>Stephan
>
>
>On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali  wrote:
>
>> There is no shuffle operation in my flow. Mine actually looks like this:
>>
>> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map
>>->
>> Map, Filter)
>>
>>
>> Maybe it’s treating this whole flow as one pipeline and assigning it to
>>a
>> slot. What I really wanted was to have the custom source I built to have
>> running instances on all nodes. I’m not really sure if that’s the right
>> approach, but if we could add this as a feature that’d be great, since
>> having more than one node running the same pipeline guarantees the
>> pipeline is never offline.
>>
>> -Ali
>>
>> On 2015-12-02, 4:39 AM, "Till Rohrmann"  wrote:
>>
>> >If I'm not mistaken, then the scheduler has already a preference to
>>spread
>> >independent pipelines out across the cluster. At least he uses a queue
>>of
>> >instances from which it pops the first element if it allocates a new
>>slot.
>> >This instance is then appended to the queue again, if it has some
>> >resources
>> >(slots) left.
>> >
>> >I would assume that you have a shuffle operation involved in your job
>>such
>> >that it makes sense for the scheduler to deploy all pipelines to the
>>same
>> >machine.
>> >
>> >Cheers,
>> >Till
>> >On Dec 1, 2015 4:01 PM, "Stephan Ewen"  wrote:
>> >
>> >> Slots are like "resource groups" which execute entire pipelines. They
>> >> frequently have more than one operator.
>> >>
>> >> What you can try as a workaround is decrease the number of slots per
>> >> machine to cause the operators to be spread across more machines.
>> >>
>> >> If this is a crucial issue for your use case, it should be simple to
>> >>add a
>> >> "preference to spread out" to the scheduler...
>> >>
>> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali 
>> >>wrote:
>> >>
>> >> > Is there a way to make a task cluster-parallelizable? I.e. Make
>>sure
>> >>the
>> >> > parallel instances of the task are distributed across the cluster.
>> >>When I
>> >> > run my flink job with a parallelism of 16, all the 

Re: New Blog Post Draft

2015-12-09 Thread Vasiliki Kalavri
Thanks Matthias! This is a very nice blog post and reads easily.

On 9 December 2015 at 19:21, Ufuk Celebi  wrote:

> Great post! Thanks!
>
> I have also made some comments in the commit.
>
> – Ufuk
>
> > On 09 Dec 2015, at 14:19, Maximilian Michels  wrote:
> >
> > Hi Matthias,
> >
> > Thank you for the blog post. You had already shared a first draft with
> > me. This one looks even better!
> >
> > I've made some minor comments. +1 to merge if these are addressed.
> >
> > Cheers,
> > Max
> >
> > On Wed, Dec 9, 2015 at 1:20 PM, Matthias J. Sax 
> wrote:
> >> Just updated the draft (thanks to Till and Slim for feedback) and opened
> >> a PR.
> >>
> >> https://github.com/apache/flink-web/pull/15
> >>
> >> @Slim: we discussed about benchmark result beforehand and decided to do
> >> a second blog post later on
> >>
> >>
> >> -Matthias
> >>
> >> On 12/09/2015 12:14 PM, Slim Baltagi wrote:
> >>> Matthias,
> >>>
> >>> This is great blog!
> >>>
> >>> I would like to suggest the following:
> >>> Change the title to: How to run your existing Storm applications on
> Apache Flink stream processing engine?
> >>> Fixing the few typos
> >>> For this reasons -> For these reasons
> >>> Storm compatibility package which allows users -> Storm compatibility
> package that allows users
> >>> we need to translated it  -> we need to translate it
> >>> in not available -> is not available
> >>> eg, StormWordCount.jar  -> e.g., StormWordCount.jar
> >>> Provide some benchmarks on running a storm application as it is versus
> running it on Flink.
> >>> Thanks
> >>>
> >>> Slim Baltagi
> >>>
> >>> On Dec 9, 2015, at 5:10 AM, Robert Metzger 
> wrote:
> >>>
>  Great, thank you for writing the article.
> 
>  I like the general idea, but I've found some small typos.
>  Can you open a pull request against the "flink-web" repo to make
> reviewing
>  it easier?
> 
>  On Wed, Dec 9, 2015 at 11:32 AM, Matthias J. Sax 
> wrote:
> 
> > Hi,
> >
> > after talking to several people and getting some feedback already, I
> > would like to suggest a new blog post for the project web site about
> the
> > Storm compatibility layer.
> >
> > You can find the draft here:
> >
> >
> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/_posts/2015-12-07-storm-compatibility.md
> >
> > The missing (just not rendered) picture is this one:
> >
> >
> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/img/blog/flink-storm.png
> >
> > Looking forward to your feedback!
> >
> >
> > -Matthias
> >
> >
> >>>
> >>>
> >>
>
>


Re: New Blog Post Draft

2015-12-09 Thread Ufuk Celebi
Great post! Thanks!

I have also made some comments in the commit.

– Ufuk

> On 09 Dec 2015, at 14:19, Maximilian Michels  wrote:
> 
> Hi Matthias,
> 
> Thank you for the blog post. You had already shared a first draft with
> me. This one looks even better!
> 
> I've made some minor comments. +1 to merge if these are addressed.
> 
> Cheers,
> Max
> 
> On Wed, Dec 9, 2015 at 1:20 PM, Matthias J. Sax  wrote:
>> Just updated the draft (thanks to Till and Slim for feedback) and opened
>> a PR.
>> 
>> https://github.com/apache/flink-web/pull/15
>> 
>> @Slim: we discussed about benchmark result beforehand and decided to do
>> a second blog post later on
>> 
>> 
>> -Matthias
>> 
>> On 12/09/2015 12:14 PM, Slim Baltagi wrote:
>>> Matthias,
>>> 
>>> This is great blog!
>>> 
>>> I would like to suggest the following:
>>> Change the title to: How to run your existing Storm applications on Apache 
>>> Flink stream processing engine?
>>> Fixing the few typos
>>> For this reasons -> For these reasons
>>> Storm compatibility package which allows users -> Storm compatibility 
>>> package that allows users
>>> we need to translated it  -> we need to translate it
>>> in not available -> is not available
>>> eg, StormWordCount.jar  -> e.g., StormWordCount.jar
>>> Provide some benchmarks on running a storm application as it is versus 
>>> running it on Flink.
>>> Thanks
>>> 
>>> Slim Baltagi
>>> 
>>> On Dec 9, 2015, at 5:10 AM, Robert Metzger  wrote:
>>> 
 Great, thank you for writing the article.
 
 I like the general idea, but I've found some small typos.
 Can you open a pull request against the "flink-web" repo to make reviewing
 it easier?
 
 On Wed, Dec 9, 2015 at 11:32 AM, Matthias J. Sax  wrote:
 
> Hi,
> 
> after talking to several people and getting some feedback already, I
> would like to suggest a new blog post for the project web site about the
> Storm compatibility layer.
> 
> You can find the draft here:
> 
> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/_posts/2015-12-07-storm-compatibility.md
> 
> The missing (just not rendered) picture is this one:
> 
> https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/img/blog/flink-storm.png
> 
> Looking forward to your feedback!
> 
> 
> -Matthias
> 
> 
>>> 
>>> 
>> 



Re: flink-dist packaging including unshaded classes

2015-12-09 Thread Nick Dimiduk
mvn dependency:tree from the flink-dist module does not include any mention of
Guava. When I build (mvn clean package -DskipTests) against master (fc8be1c) I
see the same packaging problem.
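
For what it's worth, a quick way to check at runtime which Guava copy actually wins on the
classpath (plain Java, nothing Flink-specific):

import com.google.common.base.Stopwatch;

public class WhichGuavaSketch {
    public static void main(String[] args) {
        // Prints the jar that Stopwatch was loaded from, i.e. whether the
        // shaded or the unshaded Guava copy is being picked up.
        System.out.println(
                Stopwatch.class.getProtectionDomain().getCodeSource().getLocation());
    }
}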

On Wed, Dec 9, 2015 at 9:29 AM, Stephan Ewen  wrote:

> Usually, no command line magic is needed. Simple "mvn clean package" does
> it.
>
> May be that this is not related to your building at all. Can you check
> whether the same is already the case on the master branch with Hadoop 2.7
> Scala 2.11 ?
>
> Also, simple way to diagnose where these dependencies come from is to do
> inside the "flink-dist" project a "mvn dependency:tree" run. That shows how
> the unshaded Guava was pulled in.
>
> Greetings,
> Stephan
>
>
> On Wed, Dec 9, 2015 at 6:22 PM, Nick Dimiduk  wrote:
>
> > I did not. All I did was apply the PR from FLINK-3147. I thought perhaps
> > there's some command line incantation I'm missing.
> >
> > On Wed, Dec 9, 2015 at 3:29 AM, Stephan Ewen  wrote:
> >
> > > Hi!
> > >
> > > Did you change anything in the POM files, with respect to Guava, or add
> > > another dependency that might transitively pull Guava?
> > >
> > > Stephan
> > >
> > >
> > > On Tue, Dec 8, 2015 at 9:25 PM, Nick Dimiduk 
> > wrote:
> > >
> > > > Hi there,
> > > >
> > > > I'm attempting to build locally a flink based on release-0.10.0 +
> > > > FLINK-3147. When I build from this sandbox, the resulting
> > flink-dist.jar
> > > > contains both shanded and unshaded jars. In my case, this results in
> a
> > > > runtime conflict in my application, where
> > > com.google.common.base.Stopwatch
> > > > from both Guava-12 and Guava-18 are in my classpath.
> > > >
> > > > Is there some additional profile required to build a dist package
> with
> > > only
> > > > the shaded jars?
> > > >
> > > > Thanks,
> > > > Nick
> > > >
> > > > $ tar xvzf flink-0.10.0-bin-hadoop27-scala_2.11.tgz
> > > > $ cd flink-0.10.0
> > > > $ $ unzip -t lib/flink-dist_2.11-0.10.0.jar | grep Stopwatch
> > > > testing:
> > > > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> > > > testing:
> > > org/apache/flink/shaded/com/google/common/base/Stopwatch.class
> > > >   OK
> > > > testing:
> > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> > > >   OK
> > > > testing:
> > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> > > >   OK
> > > > testing:
> > > >
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
> > > >  OK
> > > > testing:
> > > > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class
> > >  OK
> > > > testing: com/google/inject/internal/util/$Stopwatch.class   OK
> > > >
> > > > vs.
> > > >
> > > > $ git status
> > > > HEAD detached from release-0.10.0
> > > > $ git log --decorate=short --oneline | head -n3
> > > > dccdbd8 (HEAD) [FLINK-3147] HadoopOutputFormatBase should expose
> fields
> > > as
> > > > protected
> > > > ab2cca4 (tag: release-0.10.0, origin/release-0.10.0-rc8,
> > > > ndimiduk/release-0.10.0-rc8) Commit for release 0.10.0
> > > > c0fe305 [FLINK-2992] Remove use of SerializationUtils
> > > > $ mvn clean install -DskipTests
> > > > ...
> > > > $ cd flink-dist/target/flink-0.10.0-bin/flink-0.10.0
> > > > $ unzip -t lib/flink-dist-0.10.0.jar | grep Stopwatch
> > > > testing:
> > > > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> > > > testing:
> > > org/apache/flink/shaded/com/google/common/base/Stopwatch.class
> > > >   OK
> > > > testing:
> > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> > > >   OK
> > > > testing:
> > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> > > >   OK
> > > > testing:
> > > >
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
> > > >  OK
> > > > testing:
> > > > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class
> > >  OK
> > > > testing: com/google/inject/internal/util/$Stopwatch.class   OK
> > > > testing: com/google/common/base/Stopwatch$1.class   OK
> > > > testing: com/google/common/base/Stopwatch.class   OK
> > > > testing:
> > > >
> com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> > > >  OK
> > > > testing:
> > > > com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> > >  OK
> > > >
> > >
> >
>


[jira] [Created] (FLINK-3157) Web frontend json files contain author attribution

2015-12-09 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-3157:
--

 Summary: Web frontend json files contain author attribution 
 Key: FLINK-3157
 URL: https://issues.apache.org/jira/browse/FLINK-3157
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 0.10.1
Reporter: Ufuk Celebi
Priority: Trivial


http://mail-archives.apache.org/mod_mbox/jakarta-jmeter-dev/200402.mbox/%3c4039f65e.7020...@atg.com%3E

{quote}
author tags are officially discouraged. these create difficulties in
establishing the proper ownership and the protection of our
committers. there are other social issues dealing with collaborative
development, but the Board is concerned about the legal ramifications
around the use of author tags
{quote}

Files: {{flink-runtime-web/web-dashboard/*.json}}





Re: flink-dist packaging including unshaded classes

2015-12-09 Thread Robert Metzger
I can confirm that Guava is part of the fat jar for the Hadoop 2.7.0, Scala 2.11
distribution.

I'll look into the issue tomorrow

On Wed, Dec 9, 2015 at 7:58 PM, Nick Dimiduk  wrote:

> mvn dependency:tree from flink-dist module does not include any mention of
> guava. When I build (mvn clean package -DskipTests) vs master (fc8be1c) I
> see the same packaging problem.
>
> On Wed, Dec 9, 2015 at 9:29 AM, Stephan Ewen  wrote:
>
> > Usually, no command line magic is needed. Simple "mvn clean package" does
> > it.
> >
> > May be that this is not related to your building at all. Can you check
> > whether the same is already the case on the master branch with Hadoop 2.7
> > Scala 2.11 ?
> >
> > Also, simple way to diagnose where these dependencies come from is to do
> > inside the "flink-dist" project a "mvn dependency:tree" run. That shows
> how
> > the unshaded Guava was pulled in.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Wed, Dec 9, 2015 at 6:22 PM, Nick Dimiduk  wrote:
> >
> > > I did not. All I did was apply the PR from FLINK-3147. I thought
> perhaps
> > > there's some command line incantation I'm missing.
> > >
> > > On Wed, Dec 9, 2015 at 3:29 AM, Stephan Ewen  wrote:
> > >
> > > > Hi!
> > > >
> > > > Did you change anything in the POM files, with respect to Guava, or
> add
> > > > another dependency that might transitively pull Guava?
> > > >
> > > > Stephan
> > > >
> > > >
> > > > On Tue, Dec 8, 2015 at 9:25 PM, Nick Dimiduk 
> > > wrote:
> > > >
> > > > > Hi there,
> > > > >
> > > > > I'm attempting to build locally a flink based on release-0.10.0 +
> > > > > FLINK-3147. When I build from this sandbox, the resulting
> > > flink-dist.jar
> > > > > contains both shanded and unshaded jars. In my case, this results
> in
> > a
> > > > > runtime conflict in my application, where
> > > > com.google.common.base.Stopwatch
> > > > > from both Guava-12 and Guava-18 are in my classpath.
> > > > >
> > > > > Is there some additional profile required to build a dist package
> > with
> > > > only
> > > > > the shaded jars?
> > > > >
> > > > > Thanks,
> > > > > Nick
> > > > >
> > > > > $ tar xvzf flink-0.10.0-bin-hadoop27-scala_2.11.tgz
> > > > > $ cd flink-0.10.0
> > > > > $ $ unzip -t lib/flink-dist_2.11-0.10.0.jar | grep Stopwatch
> > > > > testing:
> > > > > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class
>  OK
> > > > > testing:
> > > > org/apache/flink/shaded/com/google/common/base/Stopwatch.class
> > > > >   OK
> > > > > testing:
> > > > >
> > > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> > > > >   OK
> > > > > testing:
> > > > >
> > > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> > > > >   OK
> > > > > testing:
> > > > >
> > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
> > > > >  OK
> > > > > testing:
> > > > >
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class
> > > >  OK
> > > > > testing: com/google/inject/internal/util/$Stopwatch.class   OK
> > > > >
> > > > > vs.
> > > > >
> > > > > $ git status
> > > > > HEAD detached from release-0.10.0
> > > > > $ git log --decorate=short --oneline | head -n3
> > > > > dccdbd8 (HEAD) [FLINK-3147] HadoopOutputFormatBase should expose
> > fields
> > > > as
> > > > > protected
> > > > > ab2cca4 (tag: release-0.10.0, origin/release-0.10.0-rc8,
> > > > > ndimiduk/release-0.10.0-rc8) Commit for release 0.10.0
> > > > > c0fe305 [FLINK-2992] Remove use of SerializationUtils
> > > > > $ mvn clean install -DskipTests
> > > > > ...
> > > > > $ cd flink-dist/target/flink-0.10.0-bin/flink-0.10.0
> > > > > $ unzip -t lib/flink-dist-0.10.0.jar | grep Stopwatch
> > > > > testing:
> > > > > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class
>  OK
> > > > > testing:
> > > > org/apache/flink/shaded/com/google/common/base/Stopwatch.class
> > > > >   OK
> > > > > testing:
> > > > >
> > > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> > > > >   OK
> > > > > testing:
> > > > >
> > > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> > > > >   OK
> > > > > testing:
> > > > >
> > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
> > > > >  OK
> > > > > testing:
> > > > >
> org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class
> > > >  OK
> > > > > testing: com/google/inject/internal/util/$Stopwatch.class   OK
> > > > > testing: com/google/common/base/Stopwatch$1.class   OK
> > > > > testing: com/google/common/base/Stopwatch.class   OK
> > > > > testing:
> > > > >
> > 

Re: flink-dist packaging including unshaded classes

2015-12-09 Thread Nick Dimiduk
Thanks, I appreciate it.

On Wed, Dec 9, 2015 at 12:50 PM, Robert Metzger  wrote:

> I can confirm that guava is part of the fat jar for the 2.7.0, scala 2.11
> distribution.
>
> I'll look into the issue tomorrow
>
> On Wed, Dec 9, 2015 at 7:58 PM, Nick Dimiduk  wrote:
>
> > mvn dependency:tree from flink-dist module does not include any mention
> of
> > guava. When I build (mvn clean package -DskipTests) vs master (fc8be1c) I
> > see the same packaging problem.
> >
> > On Wed, Dec 9, 2015 at 9:29 AM, Stephan Ewen  wrote:
> >
> > > Usually, no command line magic is needed. Simple "mvn clean package"
> does
> > > it.
> > >
> > > May be that this is not related to your building at all. Can you check
> > > whether the same is already the case on the master branch with Hadoop
> 2.7
> > > Scala 2.11 ?
> > >
> > > Also, simple way to diagnose where these dependencies come from is to
> do
> > > inside the "flink-dist" project a "mvn dependency:tree" run. That shows
> > how
> > > the unshaded Guava was pulled in.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Wed, Dec 9, 2015 at 6:22 PM, Nick Dimiduk 
> wrote:
> > >
> > > > I did not. All I did was apply the PR from FLINK-3147. I thought
> > perhaps
> > > > there's some command line incantation I'm missing.
> > > >
> > > > On Wed, Dec 9, 2015 at 3:29 AM, Stephan Ewen 
> wrote:
> > > >
> > > > > Hi!
> > > > >
> > > > > Did you change anything in the POM files, with respect to Guava, or
> > add
> > > > > another dependency that might transitively pull Guava?
> > > > >
> > > > > Stephan
> > > > >
> > > > >
> > > > > On Tue, Dec 8, 2015 at 9:25 PM, Nick Dimiduk 
> > > > wrote:
> > > > >
> > > > > > Hi there,
> > > > > >
> > > > > > I'm attempting to build locally a flink based on release-0.10.0 +
> > > > > > FLINK-3147. When I build from this sandbox, the resulting
> > > > flink-dist.jar
> > > > > > contains both shanded and unshaded jars. In my case, this results
> > in
> > > a
> > > > > > runtime conflict in my application, where
> > > > > com.google.common.base.Stopwatch
> > > > > > from both Guava-12 and Guava-18 are in my classpath.
> > > > > >
> > > > > > Is there some additional profile required to build a dist package
> > > with
> > > > > only
> > > > > > the shaded jars?
> > > > > >
> > > > > > Thanks,
> > > > > > Nick
> > > > > >
> > > > > > $ tar xvzf flink-0.10.0-bin-hadoop27-scala_2.11.tgz
> > > > > > $ cd flink-0.10.0
> > > > > > $ $ unzip -t lib/flink-dist_2.11-0.10.0.jar | grep Stopwatch
> > > > > > testing:
> > > > > > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class
> >  OK
> > > > > > testing:
> > > > > org/apache/flink/shaded/com/google/common/base/Stopwatch.class
> > > > > >   OK
> > > > > > testing:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> > > > > >   OK
> > > > > > testing:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> > > > > >   OK
> > > > > > testing:
> > > > > >
> > > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
> > > > > >  OK
> > > > > > testing:
> > > > > >
> > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class
> > > > >  OK
> > > > > > testing: com/google/inject/internal/util/$Stopwatch.class
>  OK
> > > > > >
> > > > > > vs.
> > > > > >
> > > > > > $ git status
> > > > > > HEAD detached from release-0.10.0
> > > > > > $ git log --decorate=short --oneline | head -n3
> > > > > > dccdbd8 (HEAD) [FLINK-3147] HadoopOutputFormatBase should expose
> > > fields
> > > > > as
> > > > > > protected
> > > > > > ab2cca4 (tag: release-0.10.0, origin/release-0.10.0-rc8,
> > > > > > ndimiduk/release-0.10.0-rc8) Commit for release 0.10.0
> > > > > > c0fe305 [FLINK-2992] Remove use of SerializationUtils
> > > > > > $ mvn clean install -DskipTests
> > > > > > ...
> > > > > > $ cd flink-dist/target/flink-0.10.0-bin/flink-0.10.0
> > > > > > $ unzip -t lib/flink-dist-0.10.0.jar | grep Stopwatch
> > > > > > testing:
> > > > > > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class
> >  OK
> > > > > > testing:
> > > > > org/apache/flink/shaded/com/google/common/base/Stopwatch.class
> > > > > >   OK
> > > > > > testing:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> > > > > >   OK
> > > > > > testing:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> > > > > >   OK
> > > > > > testing:
> > > > > >
> > > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
> > > > > >  OK
> > > > > >  

Re: Apache Tinkerpop & Geode Integration?

2015-12-09 Thread Vasiliki Kalavri
Hello squirrels,

I have been discussing with the Apache Tinkerpop [1] community regarding an
integration with Flink/Gelly.
You can read our discussion in [2].

Tinkerpop has a graph traversal machine called Gremlin, which supports many
high-level graph processing languages and runs on top of different systems
(e.g. Giraph, Spark, Graph DBs). You can read more in this great blog post
[3].

The idea is to provide a FlinkGraphComputer implementation, which will add
Gremlin support to Flink.

I believe Tinkerpop is a great project and I would love to see an
integration with Gelly.
Before we move forward, I would like your input!
To me, it seems that this addition would nicely fit in flink-contrib, where
we also have connectors to other projects.
If you agree, I will go ahead and open a JIRA about it.

Thank you!
-Vasia.

[1]: https://tinkerpop.incubator.apache.org/
[2]:
https://mail-archives.apache.org/mod_mbox/incubator-tinkerpop-dev/201511.mbox/%3ccanva_a390l7g169r8sn+ej1-yfkbudlnd4td6atwnp0uza-...@mail.gmail.com%3E
[3]:
http://www.datastax.com/dev/blog/the-benefits-of-the-gremlin-graph-traversal-machine

On 25 November 2015 at 16:54, Vasiliki Kalavri 
wrote:

> Hi James,
>
> I've just subscribed to the Tinkerpop dev mailing list. Could you please
> send a reply to the thread, so then I can reply to it?
> I'm not sure how I can reply to the thread otherwise...
> I also saw that there is a grafos.ml project thread. I could also provide
> some input there :)
>
> Thanks!
> -Vasia.
>
> On 25 November 2015 at 15:09, James Thornton 
> wrote:
>
>> Hi Vasia -
>>
>> Yes, a FlinkGraphComputer should be a straight-forward first step. Also,
>> on
>> the Apache Tinkerpop dev mailing list, Marko thought it might be cool if
>> there was a "Graph API" similar to the "Table API" -- hooking in Gremlin
>> to
>> Flink's fluent API would give Flink users a full graph query language.
>>
>> Stephen Mallette is a TinkerPop core contributor, and he has already
>> started working on a FlinkGraphComputer. There is a Flink/Tinkerpop thread
>> on the TinkerPop dev list -- it would be great to have you part of the
>> conversation there too as we work on the integration:
>>
>>http://mail-archives.apache.org/mod_mbox/incubator-tinkerpop-dev/
>>
>> Thanks, Vasia.
>>
>> - James
>>
>>
>> On Mon, Nov 23, 2015 at 10:28 AM, Vasiliki Kalavri <
>> vasilikikala...@gmail.com> wrote:
>>
>> > Hi James,
>> >
>> > thank you for your e-mail and your interest in Flink :)
>> >
>> > I've recently taken a _quick_ look into Apache TinkerPop and I think
>> it'd
>> > be very interesting to integrate with Flink/Gelly.
>> > Are you thinking about something like a Flink GraphComputer, similar to
>> > Giraph and Spark GraphComputer's?
>> > I believe such an integration should be straight-forward to implement.
>> You
>> > can start by looking into Flink iteration operators [1] and Gelly
>> iteration
>> > abstractions [2].
>> >
>> > Regarding Apache Geode, I'm not familiar with project, but I'll try to
>> take
>> > a look in the following days!
>> >
>> > Cheers,
>> > -Vasia.
>> >
>> >
>> > [1]:
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#iteration-operators
>> > [2]:
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/libs/gelly_guide.html#iterative-graph-processing
>> >
>> >
>> > On 20 November 2015 at 08:32, James Thornton 
>> > wrote:
>> >
>> > > Hi -
>> > >
>> > > This is James Thornton (espeed) from the Apache Tinkerpop project (
>> > > http://tinkerpop.incubator.apache.org/).
>> > >
>> > > The Flink iterators should pair well with Gremlin's Graph Traversal
>> > Machine
>> > > (
>> > >
>> > >
>> >
>> http://www.datastax.com/dev/blog/the-benefits-of-the-gremlin-graph-traversal-machine
>> > > )
>> > > -- it would be good to coordinate on creating an integration.
>> > >
>> > > Also, Apache Geode made a splash today on HN (
>> > > https://news.ycombinator.com/item?id=10596859) -- connecting Geode
>> and
>> > > Flink would be killer. Here's the Geode/Spark connector for
>> refefference:
>> > >
>> > >
>> > >
>> > >
>> >
>> https://github.com/apache/incubator-geode/tree/develop/gemfire-spark-connector
>> > >
>> > > - James
>> > >
>> >
>>
>
>


Re: Task Parallelism in a Cluster

2015-12-09 Thread Kashmar, Ali
Hi Stephan,

Here’s a link to the screenshot I tried to attach earlier:

https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28

It looks to me like the distribution is fairly skewed across the nodes,
even though they’re executing the same pipeline.

Thanks,
Ali


On 2015-12-09, 12:36 PM, "Stephan Ewen"  wrote:

>Hi!
>
>The parallel socket source looks good.
>I think you forgot to attach the screenshot, or the mailing list dropped
>the attachment...
>
>Not sure if I can diagnose that without more details. The sources all do
>the same. Assuming that the server distributes the data evenly across all
>connected sockets, and that the network bandwidth ends up being divided in
>a fair way, all pipelines should run be similarly "eager".
>
>Greetings,
>Stephan
>
>
>On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali  wrote:
>
>> Hi Stephan,
>>
>> That was my original understanding, until I realized that I was not
>>using
>> a parallel socket source. I had a custom source that extended
>> SourceFunction which always runs with parallelism = 1. I looked through
>> the API and found the ParallelSourceFunction interface so I implemented
>> that and voila, now all 3 nodes in the cluster are actually receiving
>> traffic on socket connections.
>>
>> Now that I’m running it successfully end to end, I’m trying to improve
>>the
>> performance. Can you take a look at the attached screen shot and tell me
>> if the distribution of work amongst the pipelines is normal? I feel like
>> some pipelines are lot lazier than others, even though the cluster nodes
>> are exactly the same.
>>
>> By the way, here’s the class I wrote. It would be useful to have this
>> available in Flink distro:
>>
>> public class ParallelSocketSource implements
>> ParallelSourceFunction {
>>
>> private static final long serialVersionUID =
>>-271094428915640892L;
>> private static final Logger LOG =
>> LoggerFactory.getLogger(ParallelSocketSource.class);
>>
>> private volatile boolean running = true;
>> private String host;
>> private int port;
>>
>> public ParallelSocketSource(String host, int port) {
>> this.host = host;
>> this.port = port;
>> }
>>
>> @Override
>> public void run(SourceContext ctx) throws Exception {
>> try (Socket socket = new Socket(host, port);
>> BufferedReader reader = new BufferedReader(new
>> InputStreamReader(socket.getInputStream( {
>> String line  = null;
>> while(running && ((line = reader.readLine()) !=
>> null)) {
>> ctx.collect(line);
>> }
>> } catch(IOException ex) {
>> LOG.error("error reading from socket", ex);
>> }
>> }
>>
>> @Override
>> public void cancel() {
>> running = false;
>> }
>> }
>>
>> Regards,
>> Ali
>>
>>
>> On 2015-12-08, 3:35 PM, "Stephan Ewen"  wrote:
>>
>> >Hi Ali!
>> >
>> >In the case you have, the sequence of source-map-filter ... forms a
>> >pipeline.
>> >
>> >You mentioned that you set the parallelism to 16, so there should be 16
>> >pipelines. These pipelines should be completely independent.
>> >
>> >Looking at the way the scheduler is implemented, independent pipelines
>> >should be spread across machines. But when you execute that in
>>parallel,
>> >you say all 16 pipelines end up on the same machine?
>> >
>> >Can you share with us the rough code of your program? Or a Screenshot
>>from
>> >the runtime dashboard that shows the program graph?
>> >
>> >
>> >If your cluster is basically for that one job only, you could try and
>>set
>> >the number of slots to 4 for each machine. Then you have 16 slots in
>>total
>> >and each node would run one of the 16 pipelines.
>> >
>> >
>> >Greetings,
>> >Stephan
>> >
>> >
>> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali 
>>wrote:
>> >
>> >> There is no shuffle operation in my flow. Mine actually looks like
>>this:
>> >>
>> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map ->
>>Map
>> >>->
>> >> Map, Filter)
>> >>
>> >>
>> >> Maybe it’s treating this whole flow as one pipeline and assigning it
>>to
>> >>a
>> >> slot. What I really wanted was to have the custom source I built to
>>have
>> >> running instances on all nodes. I’m not really sure if that’s the
>>right
>> >> approach, but if we could add this as a feature that’d be great,
>>since
>> >> having more than one node running the same pipeline guarantees the
>> >> pipeline is never offline.
>> >>
>> >> -Ali
>> >>
>> >> On 2015-12-02, 4:39 AM, "Till Rohrmann"  wrote:
>> >>
>> >> >If I'm not mistaken, then the scheduler has already a preference to
>> >>spread
>> >> >independent pipelines out across the cluster. At least he uses a
>>queue
>> 

[jira] [Created] (FLINK-3156) FlinkKafkaConsumer fails with NPE on notifyCheckpointComplete

2015-12-09 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3156:


 Summary: FlinkKafkaConsumer fails with NPE on 
notifyCheckpointComplete
 Key: FLINK-3156
 URL: https://issues.apache.org/jira/browse/FLINK-3156
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.0.0
Reporter: Till Rohrmann
Assignee: Robert Metzger


The {{FlinkKafkaConsumer}} fails with a {{NullPointerException}} when a 
checkpoint is completed. The stack trace is

{code}
 java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:908)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.commitOffsets(FlinkKafkaConsumer.java:664)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.notifyCheckpointComplete(FlinkKafkaConsumer.java:578)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:176)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:546)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:904)
... 5 more
{code}

Apparently, one of the {{KafkaTopicPartitionLeaders}} from the 
{{subscribedPartitions}} was {{null}}.





Re: flink-dist packaging including unshaded classes

2015-12-09 Thread Nick Dimiduk
I did not. All I did was apply the PR from FLINK-3147. I thought perhaps
there's some command line incantation I'm missing.

On Wed, Dec 9, 2015 at 3:29 AM, Stephan Ewen  wrote:

> Hi!
>
> Did you change anything in the POM files, with respect to Guava, or add
> another dependency that might transitively pull Guava?
>
> Stephan
>
>
> On Tue, Dec 8, 2015 at 9:25 PM, Nick Dimiduk  wrote:
>
> > Hi there,
> >
> > I'm attempting to build locally a flink based on release-0.10.0 +
> > FLINK-3147. When I build from this sandbox, the resulting flink-dist.jar
> > contains both shanded and unshaded jars. In my case, this results in a
> > runtime conflict in my application, where
> com.google.common.base.Stopwatch
> > from both Guava-12 and Guava-18 are in my classpath.
> >
> > Is there some additional profile required to build a dist package with
> only
> > the shaded jars?
> >
> > Thanks,
> > Nick
> >
> > $ tar xvzf flink-0.10.0-bin-hadoop27-scala_2.11.tgz
> > $ cd flink-0.10.0
> > $ $ unzip -t lib/flink-dist_2.11-0.10.0.jar | grep Stopwatch
> > testing:
> > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> > testing:
> org/apache/flink/shaded/com/google/common/base/Stopwatch.class
> >   OK
> > testing:
> >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> >   OK
> > testing:
> >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> >   OK
> > testing:
> > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
> >  OK
> > testing:
> > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class
>  OK
> > testing: com/google/inject/internal/util/$Stopwatch.class   OK
> >
> > vs.
> >
> > $ git status
> > HEAD detached from release-0.10.0
> > $ git log --decorate=short --oneline | head -n3
> > dccdbd8 (HEAD) [FLINK-3147] HadoopOutputFormatBase should expose fields
> as
> > protected
> > ab2cca4 (tag: release-0.10.0, origin/release-0.10.0-rc8,
> > ndimiduk/release-0.10.0-rc8) Commit for release 0.10.0
> > c0fe305 [FLINK-2992] Remove use of SerializationUtils
> > $ mvn clean install -DskipTests
> > ...
> > $ cd flink-dist/target/flink-0.10.0-bin/flink-0.10.0
> > $ unzip -t lib/flink-dist-0.10.0.jar | grep Stopwatch
> > testing:
> > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> > testing:
> org/apache/flink/shaded/com/google/common/base/Stopwatch.class
> >   OK
> > testing:
> >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> >   OK
> > testing:
> >
> >
> org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
> >   OK
> > testing:
> > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class
> >  OK
> > testing:
> > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class
>  OK
> > testing: com/google/inject/internal/util/$Stopwatch.class   OK
> > testing: com/google/common/base/Stopwatch$1.class   OK
> > testing: com/google/common/base/Stopwatch.class   OK
> > testing:
> > com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class
> >  OK
> > testing:
> > com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class
>  OK
> >
>


Re: Task Parallelism in a Cluster

2015-12-09 Thread Stephan Ewen
Hi!

The parallel socket source looks good.
I think you forgot to attach the screenshot, or the mailing list dropped
the attachment...

Not sure if I can diagnose that without more details. The sources all do
the same. Assuming that the server distributes the data evenly across all
connected sockets, and that the network bandwidth ends up being divided in
a fair way, all pipelines should be similarly "eager".

Greetings,
Stephan


On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali  wrote:

> Hi Stephan,
>
> That was my original understanding, until I realized that I was not using
> a parallel socket source. I had a custom source that extended
> SourceFunction which always runs with parallelism = 1. I looked through
> the API and found the ParallelSourceFunction interface so I implemented
> that and voila, now all 3 nodes in the cluster are actually receiving
> traffic on socket connections.
>
> Now that I’m running it successfully end to end, I’m trying to improve the
> performance. Can you take a look at the attached screen shot and tell me
> if the distribution of work amongst the pipelines is normal? I feel like
> some pipelines are lot lazier than others, even though the cluster nodes
> are exactly the same.
>
> By the way, here’s the class I wrote. It would be useful to have this
> available in Flink distro:
>
> public class ParallelSocketSource implements
> ParallelSourceFunction {
>
> private static final long serialVersionUID = -271094428915640892L;
> private static final Logger LOG =
> LoggerFactory.getLogger(ParallelSocketSource.class);
>
> private volatile boolean running = true;
> private String host;
> private int port;
>
> public ParallelSocketSource(String host, int port) {
> this.host = host;
> this.port = port;
> }
>
> @Override
> public void run(SourceContext ctx) throws Exception {
> try (Socket socket = new Socket(host, port);
> BufferedReader reader = new BufferedReader(new
> InputStreamReader(socket.getInputStream( {
> String line  = null;
> while(running && ((line = reader.readLine()) !=
> null)) {
> ctx.collect(line);
> }
> } catch(IOException ex) {
> LOG.error("error reading from socket", ex);
> }
> }
>
> @Override
> public void cancel() {
> running = false;
> }
> }
>
> Regards,
> Ali
>
>
> On 2015-12-08, 3:35 PM, "Stephan Ewen"  wrote:
>
> >Hi Ali!
> >
> >In the case you have, the sequence of source-map-filter ... forms a
> >pipeline.
> >
> >You mentioned that you set the parallelism to 16, so there should be 16
> >pipelines. These pipelines should be completely independent.
> >
> >Looking at the way the scheduler is implemented, independent pipelines
> >should be spread across machines. But when you execute that in parallel,
> >you say all 16 pipelines end up on the same machine?
> >
> >Can you share with us the rough code of your program? Or a Screenshot from
> >the runtime dashboard that shows the program graph?
> >
> >
> >If your cluster is basically for that one job only, you could try and set
> >the number of slots to 4 for each machine. Then you have 16 slots in total
> >and each node would run one of the 16 pipelines.
> >
> >
> >Greetings,
> >Stephan
> >
> >
> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali  wrote:
> >
> >> There is no shuffle operation in my flow. Mine actually looks like this:
> >>
> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map
> >>->
> >> Map, Filter)
> >>
> >>
> >> Maybe it’s treating this whole flow as one pipeline and assigning it to
> >>a
> >> slot. What I really wanted was to have the custom source I built to have
> >> running instances on all nodes. I’m not really sure if that’s the right
> >> approach, but if we could add this as a feature that’d be great, since
> >> having more than one node running the same pipeline guarantees the
> >> pipeline is never offline.
> >>
> >> -Ali
> >>
> >> On 2015-12-02, 4:39 AM, "Till Rohrmann"  wrote:
> >>
> >> >If I'm not mistaken, then the scheduler has already a preference to
> >>spread
> >> >independent pipelines out across the cluster. At least he uses a queue
> >>of
> >> >instances from which it pops the first element if it allocates a new
> >>slot.
> >> >This instance is then appended to the queue again, if it has some
> >> >resources
> >> >(slots) left.
> >> >
> >> >I would assume that you have a shuffle operation involved in your job
> >>such
> >> >that it makes sense for the scheduler to deploy all pipelines to the
> >>same
> >> >machine.
> >> >
> >> >Cheers,
> >> >Till
> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen"  wrote:
> >> >