[jira] [Created] (FLINK-3151) YARN kills Flink TM containers due to memory overuse (outside heap/offheap)
Robert Metzger created FLINK-3151: - Summary: YARN kills Flink TM containers due to memory overuse (outside heap/offheap) Key: FLINK-3151 URL: https://issues.apache.org/jira/browse/FLINK-3151 Project: Flink Issue Type: Bug Components: TaskManager Affects Versions: 0.10.1, 1.0.0 Environment: 48 cores Reporter: Robert Metzger Priority: Blocker A Flink user running Flink on YARN with 1 processing slot and 2 GB of TM memory on a machine with 48 reported CPU cores is running into issues with TM containers being killed due to memory overuse. Setting the YARN memory cutoff to 0.5 resolves the problem, but it's not really a feasible approach. Downgrading Netty from 4.0.31.Final back to 4.0.27.Final also resolved the issue. We upgraded Netty between 0.9 and 0.10, so most likely Netty changed its behavior between those releases. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
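For context, the "memory cutoff" mentioned above is the fraction of the YARN container's memory that Flink reserves for non-heap usage before sizing the JVM heap. Assuming the configuration key `yarn.heap-cutoff-ratio` (the name used in Flink 0.10; check the docs for your version), the reporter's workaround would look like this in `flink-conf.yaml`:

```yaml
# Workaround sketch, not a recommendation: reserve 50% of the container
# memory for off-heap usage (direct buffers, Netty arenas, JVM overhead).
# The default ratio is considerably lower.
yarn.heap-cutoff-ratio: 0.5
```

This trades a much smaller heap for headroom against the container limit, which is why the reporter considers it infeasible as a general fix.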
[jira] [Created] (FLINK-3152) Support all comparisons for Date type
Timo Walther created FLINK-3152: --- Summary: Support all comparisons for Date type Key: FLINK-3152 URL: https://issues.apache.org/jira/browse/FLINK-3152 Project: Flink Issue Type: Improvement Components: Table API Reporter: Timo Walther Currently, the Table API does not support comparisons like "DATE < DATE", "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.
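To illustrate what the code generator would need to emit (this is a self-contained sketch, not Flink's actual generated code): `java.util.Date` implements `Comparable`, so "DATE < DATE" and "DATE >= DATE" reduce to `compareTo()` calls.

```java
import java.util.Date;

public class DateComparison {

    // Sketch of the comparison semantics the ExpressionCodeGenerator
    // would have to produce for Date operands.
    public static boolean lessThan(Date a, Date b) {
        return a.compareTo(b) < 0;
    }

    public static boolean greaterOrEqual(Date a, Date b) {
        return a.compareTo(b) >= 0;
    }

    public static void main(String[] args) {
        Date earlier = new Date(1000L); // 1 second after the epoch
        Date later = new Date(2000L);   // 2 seconds after the epoch
        System.out.println(lessThan(earlier, later));       // true
        System.out.println(greaterOrEqual(earlier, later)); // false
    }
}
```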
Community choice for Hadoop Summit Europe 2016
Hi everyone, Just a reminder, the community vote for the Hadoop Summit Europe 2016 talks in Dublin is still open until December 15. A good number of Flink-related talks have been submitted; here are the ones that mention "flink" in their abstract: https://hadoopsummit.uservoice.com/search?filter=merged=flink Vote away :-) Best, Kostas
[jira] [Created] (FLINK-3153) Support all comparisons for String type
Timo Walther created FLINK-3153: --- Summary: Support all comparisons for String type Key: FLINK-3153 URL: https://issues.apache.org/jira/browse/FLINK-3153 Project: Flink Issue Type: Improvement Components: Table API Reporter: Timo Walther Currently, the Table API does not support comparisons like "STRING < STRING", "STRING >= STRING". The ExpressionCodeGenerator needs to be adapted for that. It might be necessary to define the order in the TableConfig, as this may depend on the Locale. We should implement it according to SQL implementations.
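The Locale concern can be made concrete with standard JDK classes (a self-contained illustration, not Flink code): plain `String.compareTo()` orders by UTF-16 code units, while `java.text.Collator` gives the locale-aware dictionary order a TableConfig setting would have to choose between.

```java
import java.text.Collator;
import java.util.Locale;

public class StringComparison {
    public static void main(String[] args) {
        // Code-unit ordering: every uppercase letter sorts before
        // every lowercase letter, so "Z" < "a".
        System.out.println("Z".compareTo("a") < 0); // true

        // Locale-aware ordering: base letters compare first,
        // so "a" sorts before "Z" in an English collation.
        Collator collator = Collator.getInstance(Locale.ENGLISH);
        System.out.println(collator.compare("Z", "a") < 0); // false
    }
}
```

The two orderings disagree, which is why the issue suggests pinning the behavior down (e.g. to what SQL databases do) rather than relying on the default.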
New Blog Post Draft
Hi, after talking to several people and getting some feedback already, I would like to suggest a new blog post for the project web site about the Storm compatibility layer. You can find the draft here: https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/_posts/2015-12-07-storm-compatibility.md The missing (just not rendered) picture is this one: https://github.com/mjsax/flink-web/blob/stormCompatibilityBlogPost/img/blog/flink-storm.png Looking forward to your feedback! -Matthias
[jira] [Created] (FLINK-3150) Make YARN container invocation configurable
Robert Metzger created FLINK-3150: - Summary: Make YARN container invocation configurable Key: FLINK-3150 URL: https://issues.apache.org/jira/browse/FLINK-3150 Project: Flink Issue Type: Improvement Components: YARN Client Reporter: Robert Metzger Currently, the JVM invocation call of YARN containers is hardcoded. With this change, I would like to make the call configurable, using a string such as "%java% %memopts% %jvmopts% ...". Also, we should respect {{java.env.home}} if it's set.
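A minimal sketch of the proposed placeholder substitution (the template syntax follows the "%java% %memopts% %jvmopts%" example in the issue; the class and method names here are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ContainerCommandTemplate {

    // Replace each %name% placeholder in the user-supplied template
    // with its configured value.
    public static String render(String template, Map<String, String> values) {
        String command = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            command = command.replace("%" + e.getKey() + "%", e.getValue());
        }
        return command;
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("java", "$JAVA_HOME/bin/java");
        values.put("memopts", "-Xmx2048m");
        values.put("jvmopts", "-Dlog.file=taskmanager.log");
        System.out.println(render("%java% %memopts% %jvmopts%", values));
    }
}
```

A user could then reorder or extend the invocation (e.g. to wrap the JVM in a profiler) purely through configuration, instead of patching the hardcoded call.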
[jira] [Created] (FLINK-3155) Update Flink docker version to latest stable Flink version
Maximilian Michels created FLINK-3155: - Summary: Update Flink docker version to latest stable Flink version Key: FLINK-3155 URL: https://issues.apache.org/jira/browse/FLINK-3155 Project: Flink Issue Type: Task Components: flink-contrib Affects Versions: 1.0.0 Reporter: Maximilian Michels Fix For: 1.0.0 It would be nice to always set the Docker Flink binary URL to point to the latest Flink version. Until then, this JIRA keeps track of the updates for releases.
Re: New Blog Post Draft
Matthias, This is a great blog post! I would like to suggest the following:

Change the title to: How to run your existing Storm applications on Apache Flink stream processing engine?

Fixing a few typos:
For this reasons -> For these reasons
Storm compatibility package which allows users -> Storm compatibility package that allows users
we need to translated it -> we need to translate it
in not available -> is not available
eg, StormWordCount.jar -> e.g., StormWordCount.jar

Provide some benchmarks on running a Storm application as-is versus running it on Flink.

Thanks,
Slim Baltagi
[jira] [Created] (FLINK-3154) Update Kryo version from 2.24.0 to 3.0.3
Maximilian Michels created FLINK-3154: - Summary: Update Kryo version from 2.24.0 to 3.0.3 Key: FLINK-3154 URL: https://issues.apache.org/jira/browse/FLINK-3154 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 1.0.0 Reporter: Maximilian Michels Priority: Minor Fix For: 1.0.0 Flink's Kryo version is outdated and could be updated to a newer version, e.g. kryo-3.0.3.
Re: flink-dist packaging including unshaded classes
Hi! Did you change anything in the POM files, with respect to Guava, or add another dependency that might transitively pull Guava? Stephan

On Tue, Dec 8, 2015 at 9:25 PM, Nick Dimiduk wrote:
> Hi there,
>
> I'm attempting to build Flink locally based on release-0.10.0 + FLINK-3147. When I build from this sandbox, the resulting flink-dist.jar contains both shaded and unshaded classes. In my case, this results in a runtime conflict in my application, where com.google.common.base.Stopwatch from both Guava-12 and Guava-18 are in my classpath.
>
> Is there some additional profile required to build a dist package with only the shaded jars?
>
> Thanks,
> Nick
>
> $ tar xvzf flink-0.10.0-bin-hadoop27-scala_2.11.tgz
> $ cd flink-0.10.0
> $ unzip -t lib/flink-dist_2.11-0.10.0.jar | grep Stopwatch
> testing: org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class OK
> testing: org/apache/flink/shaded/com/google/common/base/Stopwatch.class OK
> testing: org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class OK
> testing: org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class OK
> testing: org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class OK
> testing: org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class OK
> testing: com/google/inject/internal/util/$Stopwatch.class OK
>
> vs.
>
> $ git status
> HEAD detached from release-0.10.0
> $ git log --decorate=short --oneline | head -n3
> dccdbd8 (HEAD) [FLINK-3147] HadoopOutputFormatBase should expose fields as protected
> ab2cca4 (tag: release-0.10.0, origin/release-0.10.0-rc8, ndimiduk/release-0.10.0-rc8) Commit for release 0.10.0
> c0fe305 [FLINK-2992] Remove use of SerializationUtils
> $ mvn clean install -DskipTests
> ...
> $ cd flink-dist/target/flink-0.10.0-bin/flink-0.10.0 > $ unzip -t lib/flink-dist-0.10.0.jar | grep Stopwatch > testing: > org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class OK > testing: org/apache/flink/shaded/com/google/common/base/Stopwatch.class > OK > testing: > > org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class > OK > testing: > > org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class > OK > testing: > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class > OK > testing: > org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class OK > testing: com/google/inject/internal/util/$Stopwatch.class OK > testing: com/google/common/base/Stopwatch$1.class OK > testing: com/google/common/base/Stopwatch.class OK > testing: > com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class > OK > testing: > com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class OK >
Re: New Blog Post Draft
Hi Matthias, Thank you for the blog post. You had already shared a first draft with me. This one looks even better! I've made some minor comments. +1 to merge if these are addressed. Cheers, Max
Re: New Blog Post Draft
Great, thank you for writing the article. I like the general idea, but I've found some small typos. Can you open a pull request against the "flink-web" repo to make reviewing it easier?
Re: New Blog Post Draft
Just updated the draft (thanks to Till and Slim for feedback) and opened a PR.

https://github.com/apache/flink-web/pull/15

@Slim: we discussed benchmark results beforehand and decided to do a second blog post later on.

-Matthias
Re: Task Parallelism in a Cluster
Hi Stephan, That was my original understanding, until I realized that I was not using a parallel socket source. I had a custom source that extended SourceFunction, which always runs with parallelism = 1. I looked through the API and found the ParallelSourceFunction interface, so I implemented that and voila, now all 3 nodes in the cluster are actually receiving traffic on socket connections. Now that I'm running it successfully end to end, I'm trying to improve the performance. Can you take a look at the attached screen shot and tell me if the distribution of work amongst the pipelines is normal? I feel like some pipelines are a lot lazier than others, even though the cluster nodes are exactly the same. By the way, here's the class I wrote. It would be useful to have this available in the Flink distro:

public class ParallelSocketSource implements ParallelSourceFunction<String> {

    private static final long serialVersionUID = -271094428915640892L;
    private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);

    private volatile boolean running = true;
    private String host;
    private int port;

    public ParallelSocketSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            String line;
            while (running && ((line = reader.readLine()) != null)) {
                ctx.collect(line);
            }
        } catch (IOException ex) {
            LOG.error("error reading from socket", ex);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Regards, Ali

On 2015-12-08, 3:35 PM, "Stephan Ewen" wrote:
>Hi Ali!
>
>In the case you have, the sequence of source-map-filter ... forms a
>pipeline.
>
>You mentioned that you set the parallelism to 16, so there should be 16
>pipelines. These pipelines should be completely independent.
> >Looking at the way the scheduler is implemented, independent pipelines >should be spread across machines. But when you execute that in parallel, >you say all 16 pipelines end up on the same machine? > >Can you share with us the rough code of your program? Or a Screenshot from >the runtime dashboard that shows the program graph? > > >If your cluster is basically for that one job only, you could try and set >the number of slots to 4 for each machine. Then you have 16 slots in total >and each node would run one of the 16 pipelines. > > >Greetings, >Stephan > > >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali wrote: > >> There is no shuffle operation in my flow. Mine actually looks like this: >> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map >>-> >> Map, Filter) >> >> >> Maybe it’s treating this whole flow as one pipeline and assigning it to >>a >> slot. What I really wanted was to have the custom source I built to have >> running instances on all nodes. I’m not really sure if that’s the right >> approach, but if we could add this as a feature that’d be great, since >> having more than one node running the same pipeline guarantees the >> pipeline is never offline. >> >> -Ali >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" wrote: >> >> >If I'm not mistaken, then the scheduler has already a preference to >>spread >> >independent pipelines out across the cluster. At least he uses a queue >>of >> >instances from which it pops the first element if it allocates a new >>slot. >> >This instance is then appended to the queue again, if it has some >> >resources >> >(slots) left. >> > >> >I would assume that you have a shuffle operation involved in your job >>such >> >that it makes sense for the scheduler to deploy all pipelines to the >>same >> >machine. >> > >> >Cheers, >> >Till >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" wrote: >> > >> >> Slots are like "resource groups" which execute entire pipelines. They >> >> frequently have more than one operator. 
>> >> >> >> What you can try as a workaround is decrease the number of slots per >> >> machine to cause the operators to be spread across more machines. >> >> >> >> If this is a crucial issue for your use case, it should be simple to >> >>add a >> >> "preference to spread out" to the scheduler... >> >> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali >> >>wrote: >> >> >> >> > Is there a way to make a task cluster-parallelizable? I.e. Make >>sure >> >>the >> >> > parallel instances of the task are distributed across the cluster. >> >>When I >> >> > run my flink job with a parallelism of 16, all the
Re: New Blog Post Draft
Thanks Matthias! This is a very nice blog post and reads easily.
Re: New Blog Post Draft
Great post! Thanks! I have also made some comments in the commit. – Ufuk
Re: flink-dist packaging including unshaded classes
mvn dependency:tree from the flink-dist module does not include any mention of Guava. When I build (mvn clean package -DskipTests) vs master (fc8be1c) I see the same packaging problem.

On Wed, Dec 9, 2015 at 9:29 AM, Stephan Ewen wrote:
> Usually, no command line magic is needed. Simple "mvn clean package" does it.
>
> May be that this is not related to your building at all. Can you check whether the same is already the case on the master branch with Hadoop 2.7 Scala 2.11?
>
> Also, a simple way to diagnose where these dependencies come from is to do a "mvn dependency:tree" run inside the "flink-dist" project. That shows how the unshaded Guava was pulled in.
>
> Greetings,
> Stephan
>
> On Wed, Dec 9, 2015 at 6:22 PM, Nick Dimiduk wrote:
> > I did not. All I did was apply the PR from FLINK-3147. I thought perhaps there's some command line incantation I'm missing.
[jira] [Created] (FLINK-3157) Web frontend json files contain author attribution
Ufuk Celebi created FLINK-3157: -- Summary: Web frontend json files contain author attribution Key: FLINK-3157 URL: https://issues.apache.org/jira/browse/FLINK-3157 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 0.10.1 Reporter: Ufuk Celebi Priority: Trivial http://mail-archives.apache.org/mod_mbox/jakarta-jmeter-dev/200402.mbox/%3c4039f65e.7020...@atg.com%3E {quote} author tags are officially discouraged. these create difficulties in establishing the proper ownership and the protection of our committers. there are other social issues dealing with collaborative development, but the Board is concerned about the legal ramifications around the use of author tags {quote} Files: {{flink-runtime-web/web-dashboard/*.json}}
Re: flink-dist packaging including unshaded classes
I can confirm that Guava is part of the fat jar for the Hadoop 2.7.0, Scala 2.11 distribution. I'll look into the issue tomorrow.
Re: flink-dist packaging including unshaded classes
Thanks, I appreciate it.

On Wed, Dec 9, 2015 at 12:50 PM, Robert Metzger wrote:
> I can confirm that guava is part of the fat jar for the 2.7.0, scala 2.11
> distribution.
>
> I'll look into the issue tomorrow.
Re: Apache Tinkerpop & Geode Integration?
Hello squirrels,

I have been discussing with the Apache Tinkerpop [1] community regarding an integration with Flink/Gelly. You can read our discussion in [2].

Tinkerpop has a graph traversal machine called Gremlin, which supports many high-level graph processing languages and runs on top of different systems (e.g. Giraph, Spark, graph DBs). You can read more in this great blog post [3]. The idea is to provide a FlinkGraphComputer implementation, which will add Gremlin support to Flink.

I believe Tinkerpop is a great project and I would love to see an integration with Gelly. Before we move forward, I would like your input! To me, it seems that this addition would nicely fit in flink-contrib, where we also have connectors to other projects. If you agree, I will go ahead and open a JIRA about it.

Thank you!
-Vasia.

[1]: https://tinkerpop.incubator.apache.org/
[2]: https://mail-archives.apache.org/mod_mbox/incubator-tinkerpop-dev/201511.mbox/%3ccanva_a390l7g169r8sn+ej1-yfkbudlnd4td6atwnp0uza-...@mail.gmail.com%3E
[3]: http://www.datastax.com/dev/blog/the-benefits-of-the-gremlin-graph-traversal-machine

On 25 November 2015 at 16:54, Vasiliki Kalavri wrote:
> Hi James,
>
> I've just subscribed to the Tinkerpop dev mailing list. Could you please
> send a reply to the thread, so that I can reply to it?
> I'm not sure how I can reply to the thread otherwise...
> I also saw that there is a grafos.ml project thread. I could also provide
> some input there :)
>
> Thanks!
> -Vasia.
>
> On 25 November 2015 at 15:09, James Thornton wrote:
>
>> Hi Vasia -
>>
>> Yes, a FlinkGraphComputer should be a straight-forward first step. Also, on
>> the Apache Tinkerpop dev mailing list, Marko thought it might be cool if
>> there was a "Graph API" similar to the "Table API" -- hooking in Gremlin to
>> Flink's fluent API would give Flink users a full graph query language.
>> Stephen Mallette is a TinkerPop core contributor, and he has already
>> started working on a FlinkGraphComputer. There is a Flink/TinkerPop thread
>> on the TinkerPop dev list -- it would be great to have you as part of the
>> conversation there too as we work on the integration:
>>
>> http://mail-archives.apache.org/mod_mbox/incubator-tinkerpop-dev/
>>
>> Thanks, Vasia.
>>
>> - James
>>
>> On Mon, Nov 23, 2015 at 10:28 AM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
>>
>> > Hi James,
>> >
>> > thank you for your e-mail and your interest in Flink :)
>> >
>> > I've recently taken a _quick_ look into Apache TinkerPop and I think it'd
>> > be very interesting to integrate with Flink/Gelly.
>> > Are you thinking about something like a Flink GraphComputer, similar to
>> > the Giraph and Spark GraphComputers?
>> > I believe such an integration should be straight-forward to implement. You
>> > can start by looking into the Flink iteration operators [1] and the Gelly
>> > iteration abstractions [2].
>> >
>> > Regarding Apache Geode, I'm not familiar with the project, but I'll try to
>> > take a look in the following days!
>> >
>> > Cheers,
>> > -Vasia.
>> >
>> > [1]: https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#iteration-operators
>> > [2]: https://ci.apache.org/projects/flink/flink-docs-master/libs/gelly_guide.html#iterative-graph-processing
>> >
>> > On 20 November 2015 at 08:32, James Thornton wrote:
>> >
>> > > Hi -
>> > >
>> > > This is James Thornton (espeed) from the Apache Tinkerpop project (
>> > > http://tinkerpop.incubator.apache.org/).
>> > >
>> > > The Flink iterators should pair well with Gremlin's Graph Traversal Machine (
>> > > http://www.datastax.com/dev/blog/the-benefits-of-the-gremlin-graph-traversal-machine
>> > > )
>> > > -- it would be good to coordinate on creating an integration.
>> > > Also, Apache Geode made a splash today on HN (
>> > > https://news.ycombinator.com/item?id=10596859) -- connecting Geode and
>> > > Flink would be killer. Here's the Geode/Spark connector for reference:
>> > >
>> > > https://github.com/apache/incubator-geode/tree/develop/gemfire-spark-connector
>> > >
>> > > - James
Re: Task Parallelism in a Cluster
Hi Stephan,

Here’s a link to the screenshot I tried to attach earlier:
https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28

It looks to me like the distribution is fairly skewed across the nodes, even though they’re executing the same pipeline.

Thanks,
Ali

On 2015-12-09, 12:36 PM, "Stephan Ewen" wrote:

>Hi!
>
>The parallel socket source looks good.
>I think you forgot to attach the screenshot, or the mailing list dropped
>the attachment...
>
>Not sure if I can diagnose that without more details. The sources all do
>the same. Assuming that the server distributes the data evenly across all
>connected sockets, and that the network bandwidth ends up being divided in
>a fair way, all pipelines should be similarly "eager".
>
>Greetings,
>Stephan
>
>On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali wrote:
>
>> Hi Stephan,
>>
>> That was my original understanding, until I realized that I was not using
>> a parallel socket source. I had a custom source that extended
>> SourceFunction, which always runs with parallelism = 1. I looked through
>> the API and found the ParallelSourceFunction interface, so I implemented
>> that and voila, now all 3 nodes in the cluster are actually receiving
>> traffic on socket connections.
>>
>> Now that I’m running it successfully end to end, I’m trying to improve the
>> performance. Can you take a look at the attached screenshot and tell me
>> if the distribution of work amongst the pipelines is normal? I feel like
>> some pipelines are a lot lazier than others, even though the cluster nodes
>> are exactly the same.
>>
>> By the way, here’s the class I wrote.
>> It would be useful to have this
>> available in the Flink distro:
>>
>> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>>
>>     private static final long serialVersionUID = -271094428915640892L;
>>     private static final Logger LOG =
>>             LoggerFactory.getLogger(ParallelSocketSource.class);
>>
>>     private volatile boolean running = true;
>>     private String host;
>>     private int port;
>>
>>     public ParallelSocketSource(String host, int port) {
>>         this.host = host;
>>         this.port = port;
>>     }
>>
>>     @Override
>>     public void run(SourceContext<String> ctx) throws Exception {
>>         try (Socket socket = new Socket(host, port);
>>              BufferedReader reader = new BufferedReader(new
>>                      InputStreamReader(socket.getInputStream()))) {
>>             String line = null;
>>             while (running && ((line = reader.readLine()) != null)) {
>>                 ctx.collect(line);
>>             }
>>         } catch (IOException ex) {
>>             LOG.error("error reading from socket", ex);
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         running = false;
>>     }
>> }
>>
>> Regards,
>> Ali
>>
>> On 2015-12-08, 3:35 PM, "Stephan Ewen" wrote:
>>
>> >Hi Ali!
>> >
>> >In the case you have, the sequence of source-map-filter ... forms a
>> >pipeline.
>> >
>> >You mentioned that you set the parallelism to 16, so there should be 16
>> >pipelines. These pipelines should be completely independent.
>> >
>> >Looking at the way the scheduler is implemented, independent pipelines
>> >should be spread across machines. But when you execute that in parallel,
>> >you say all 16 pipelines end up on the same machine?
>> >
>> >Can you share with us the rough code of your program? Or a screenshot from
>> >the runtime dashboard that shows the program graph?
>> >
>> >If your cluster is basically for that one job only, you could try and set
>> >the number of slots to 4 for each machine. Then you have 16 slots in total
>> >and each node would run one of the 16 pipelines.
>> > >> > >> >Greetings, >> >Stephan >> > >> > >> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali >>wrote: >> > >> >> There is no shuffle operation in my flow. Mine actually looks like >>this: >> >> >> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> >>Map >> >>-> >> >> Map, Filter) >> >> >> >> >> >> Maybe it’s treating this whole flow as one pipeline and assigning it >>to >> >>a >> >> slot. What I really wanted was to have the custom source I built to >>have >> >> running instances on all nodes. I’m not really sure if that’s the >>right >> >> approach, but if we could add this as a feature that’d be great, >>since >> >> having more than one node running the same pipeline guarantees the >> >> pipeline is never offline. >> >> >> >> -Ali >> >> >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" wrote: >> >> >> >> >If I'm not mistaken, then the scheduler has already a preference to >> >>spread >> >> >independent pipelines out across the cluster. At least he uses a >>queue >>
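The heart of the ParallelSocketSource discussed above is a cancellable read loop over a socket. Here is a self-contained sketch of just that loop, with no Flink dependency; the SocketReadLoop class name and the readLines helper are illustrative, not part of any Flink API:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class SocketReadLoop {

    private volatile boolean running = true;

    /**
     * Reads lines from the socket until cancelled or the peer closes the
     * connection, mirroring the run() loop of the quoted source function.
     */
    public List<String> readLines(String host, int port) {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                lines.add(line); // in the Flink source this would be ctx.collect(line)
            }
        } catch (IOException ex) {
            // in the Flink source this is logged via LOG.error
        }
        return lines;
    }

    /** Mirrors SourceFunction.cancel(): flips the flag the loop checks. */
    public void cancel() {
        running = false;
    }
}
```

The volatile flag is what makes cancellation safe across threads: the job manager thread flips it while the source thread polls it between lines.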
[jira] [Created] (FLINK-3156) FlinkKafkaConsumer fails with NPE on notifyCheckpointComplete
Till Rohrmann created FLINK-3156: Summary: FlinkKafkaConsumer fails with NPE on notifyCheckpointComplete Key: FLINK-3156 URL: https://issues.apache.org/jira/browse/FLINK-3156 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.0.0 Reporter: Till Rohrmann Assignee: Robert Metzger The {{FlinkKafkaConsumer}} fails with a {{NullPointerException}} when a checkpoint was completed. The stack trace is {code} java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:908) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.commitOffsets(FlinkKafkaConsumer.java:664) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.notifyCheckpointComplete(FlinkKafkaConsumer.java:578) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:176) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:546) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:904) ... 5 more {code} Apparently, one of the {{KafkaTopicPartitionLeaders}} from the {{subscribedPartitions}} was {{null}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
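The stack trace points at a null KafkaTopicPartitionLeader entry in subscribedPartitions being dereferenced during commitOffsets. A hedged sketch of the defensive pattern such a fix would use; the class and method names below are illustrative, not the actual Flink connector code:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCommitter {

    /**
     * Returns only the offsets whose partition leader is known, skipping
     * null entries instead of dereferencing them (the dereference is what
     * raised the NPE reported in FLINK-3156).
     */
    public static Map<String, Long> safeOffsetsToCommit(
            Map<String, Long> offsetsByPartition,
            Map<String, String> leaderByPartition) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> e : offsetsByPartition.entrySet()) {
            String leader = leaderByPartition.get(e.getKey());
            if (leader == null || e.getValue() == null) {
                continue; // no known leader (or no offset) for this partition: skip, don't NPE
            }
            toCommit.put(e.getKey(), e.getValue());
        }
        return toCommit;
    }
}
```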
Re: flink-dist packaging including unshaded classes
I did not. All I did was apply the PR from FLINK-3147. I thought perhaps there's some command line incantation I'm missing.

On Wed, Dec 9, 2015 at 3:29 AM, Stephan Ewen wrote:
> Hi!
>
> Did you change anything in the POM files, with respect to Guava, or add
> another dependency that might transitively pull Guava?
>
> Stephan
>
> On Tue, Dec 8, 2015 at 9:25 PM, Nick Dimiduk wrote:
>
> > Hi there,
> >
> > I'm attempting to build locally a Flink based on release-0.10.0 +
> > FLINK-3147. When I build from this sandbox, the resulting flink-dist.jar
> > contains both shaded and unshaded jars. In my case, this results in a
> > runtime conflict in my application, where com.google.common.base.Stopwatch
> > from both Guava-12 and Guava-18 is in my classpath.
> >
> > Is there some additional profile required to build a dist package with only
> > the shaded jars?
> >
> > Thanks,
> > Nick
> >
> > $ tar xvzf flink-0.10.0-bin-hadoop27-scala_2.11.tgz
> > $ cd flink-0.10.0
> > $ unzip -t lib/flink-dist_2.11-0.10.0.jar | grep Stopwatch
> >     testing: org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> >     testing: org/apache/flink/shaded/com/google/common/base/Stopwatch.class   OK
> >     testing: org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class   OK
> >     testing: org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class   OK
> >     testing: org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class   OK
> >     testing: org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class   OK
> >     testing: com/google/inject/internal/util/$Stopwatch.class   OK
> >
> > vs.
> > $ git status
> > HEAD detached from release-0.10.0
> > $ git log --decorate=short --oneline | head -n3
> > dccdbd8 (HEAD) [FLINK-3147] HadoopOutputFormatBase should expose fields as protected
> > ab2cca4 (tag: release-0.10.0, origin/release-0.10.0-rc8, ndimiduk/release-0.10.0-rc8) Commit for release 0.10.0
> > c0fe305 [FLINK-2992] Remove use of SerializationUtils
> > $ mvn clean install -DskipTests
> > ...
> > $ cd flink-dist/target/flink-0.10.0-bin/flink-0.10.0
> > $ unzip -t lib/flink-dist-0.10.0.jar | grep Stopwatch
> >     testing: org/apache/flink/shaded/com/google/common/base/Stopwatch$1.class   OK
> >     testing: org/apache/flink/shaded/com/google/common/base/Stopwatch.class   OK
> >     testing: org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class   OK
> >     testing: org/apache/flink/shaded/com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class   OK
> >     testing: org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch$1.class   OK
> >     testing: org/apache/flink/hadoop/shaded/com/google/common/base/Stopwatch.class   OK
> >     testing: com/google/inject/internal/util/$Stopwatch.class   OK
> >     testing: com/google/common/base/Stopwatch$1.class   OK
> >     testing: com/google/common/base/Stopwatch.class   OK
> >     testing: com/google/common/util/concurrent/RateLimiter$SleepingStopwatch$1.class   OK
> >     testing: com/google/common/util/concurrent/RateLimiter$SleepingStopwatch.class   OK
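The packaging problem above can also be detected programmatically rather than by eyeballing unzip output. A sketch using only java.util.zip; the UnshadedClassChecker name and the prefix check are assumptions for illustration, not part of the Flink build:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class UnshadedClassChecker {

    /**
     * Returns jar entries under com/google/common that were NOT relocated
     * beneath a shaded package. Relocated copies live under paths like
     * org/apache/flink/shaded/... and are therefore not matched.
     */
    public static List<String> findUnshadedGuava(String jarPath) throws IOException {
        List<String> offenders = new ArrayList<>();
        try (ZipFile jar = new ZipFile(jarPath)) {
            Enumeration<? extends ZipEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.startsWith("com/google/common/") && name.endsWith(".class")) {
                    offenders.add(name);
                }
            }
        }
        return offenders;
    }
}
```

An empty result would correspond to the correctly packaged 0.10.0 release jar, while the locally built jar described above would list the leaked Guava classes.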
Re: Task Parallelism in a Cluster
Hi!

The parallel socket source looks good.
I think you forgot to attach the screenshot, or the mailing list dropped the attachment...

Not sure if I can diagnose that without more details. The sources all do the same. Assuming that the server distributes the data evenly across all connected sockets, and that the network bandwidth ends up being divided in a fair way, all pipelines should be similarly "eager".

Greetings,
Stephan

On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali wrote:
> Hi Stephan,
>
> That was my original understanding, until I realized that I was not using
> a parallel socket source. I had a custom source that extended
> SourceFunction, which always runs with parallelism = 1. I looked through
> the API and found the ParallelSourceFunction interface, so I implemented
> that and voila, now all 3 nodes in the cluster are actually receiving
> traffic on socket connections.
>
> Now that I’m running it successfully end to end, I’m trying to improve the
> performance. Can you take a look at the attached screenshot and tell me
> if the distribution of work amongst the pipelines is normal? I feel like
> some pipelines are a lot lazier than others, even though the cluster nodes
> are exactly the same.
>
> By the way, here’s the class I wrote.
> It would be useful to have this
> available in the Flink distro:
>
> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>
>     private static final long serialVersionUID = -271094428915640892L;
>     private static final Logger LOG =
>             LoggerFactory.getLogger(ParallelSocketSource.class);
>
>     private volatile boolean running = true;
>     private String host;
>     private int port;
>
>     public ParallelSocketSource(String host, int port) {
>         this.host = host;
>         this.port = port;
>     }
>
>     @Override
>     public void run(SourceContext<String> ctx) throws Exception {
>         try (Socket socket = new Socket(host, port);
>              BufferedReader reader = new BufferedReader(new
>                      InputStreamReader(socket.getInputStream()))) {
>             String line = null;
>             while (running && ((line = reader.readLine()) != null)) {
>                 ctx.collect(line);
>             }
>         } catch (IOException ex) {
>             LOG.error("error reading from socket", ex);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
>
> Regards,
> Ali
>
> On 2015-12-08, 3:35 PM, "Stephan Ewen" wrote:
>
> >Hi Ali!
> >
> >In the case you have, the sequence of source-map-filter ... forms a
> >pipeline.
> >
> >You mentioned that you set the parallelism to 16, so there should be 16
> >pipelines. These pipelines should be completely independent.
> >
> >Looking at the way the scheduler is implemented, independent pipelines
> >should be spread across machines. But when you execute that in parallel,
> >you say all 16 pipelines end up on the same machine?
> >
> >Can you share with us the rough code of your program? Or a screenshot from
> >the runtime dashboard that shows the program graph?
> >
> >If your cluster is basically for that one job only, you could try and set
> >the number of slots to 4 for each machine. Then you have 16 slots in total
> >and each node would run one of the 16 pipelines.
> >
> >Greetings,
> >Stephan
> >
> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali wrote:
> >
> >> There is no shuffle operation in my flow.
Mine actually looks like this: > >> > >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map > >>-> > >> Map, Filter) > >> > >> > >> Maybe it’s treating this whole flow as one pipeline and assigning it to > >>a > >> slot. What I really wanted was to have the custom source I built to have > >> running instances on all nodes. I’m not really sure if that’s the right > >> approach, but if we could add this as a feature that’d be great, since > >> having more than one node running the same pipeline guarantees the > >> pipeline is never offline. > >> > >> -Ali > >> > >> On 2015-12-02, 4:39 AM, "Till Rohrmann" wrote: > >> > >> >If I'm not mistaken, then the scheduler has already a preference to > >>spread > >> >independent pipelines out across the cluster. At least he uses a queue > >>of > >> >instances from which it pops the first element if it allocates a new > >>slot. > >> >This instance is then appended to the queue again, if it has some > >> >resources > >> >(slots) left. > >> > > >> >I would assume that you have a shuffle operation involved in your job > >>such > >> >that it makes sense for the scheduler to deploy all pipelines to the > >>same > >> >machine. > >> > > >> >Cheers, > >> >Till > >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" wrote: > >> >