A trivial update on README

2018-04-27 Thread 陈梓立
Hi guys,

Recently I pushed a PR on the apache/flink repo (https://github.com/apache/flink/pull/5924).
It is a trivial update to README.md, raised after I was surprised to find the
build failing with Java 9.

It would be fine if someone simply told me that it is not worthwhile so that I
could close it, but there have been no replies, which makes me sad.

This is my first PR on Flink, and having it accepted would be a good start for
me and would give me more motivation for contributing to Flink.

Looking forward to your reply.

Alex.


Odd job failure

2018-04-27 Thread Elias Levy
We had a job on a Flink 1.4.2 cluster with three TMs experience an odd
failure the other day.  It seems that it started as some sort of network
event.

It began with the 3rd TM starting to warn every 30 seconds about socket
timeouts while sending metrics to DataDog.  This lasted for the whole
outage.

Twelve minutes later, all TMs reported at nearly the same time that they
had marked the Kafka coordinator as dead ("Marking the coordinator XXX (id:
2147482640 rack: null) dead for group ZZZ").  The job terminated and the
system attempted to recover it.  Then things got into a weird state.

The following repeated six or seven times over a period of about 40
minutes:

   1. The TMs attempt to restart the job, but only the first and second TMs
   show signs of doing so.
   2. The disk begins to fill up on TMs 1 and 2.
   3. TMs 1 & 2 both report java.lang.NoClassDefFoundError:
   org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on
   this list earlier this month.  It is unclear if they are benign.
   4. The job dies when the disks finally fill up on TMs 1 and 2.


Looking at the backtrace logged when the disk fills up, I gather that Flink
is buffering data coming from Kafka into one of my operators as a result of
a checkpoint barrier.  The job has a two-input operator, with one input being
the primary data and a secondary input for control commands.  It would appear
that for whatever reason the barrier for the control stream is not making it
to the operator, thus leading to the buffering and the full disks.  Maybe
Flink scheduled the source operator of the control stream on the 3rd TM,
which does not seem to have been scheduling tasks?
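
For context, the relevant part of the topology is roughly the sketch below;
types and names are made up for illustration and are not the actual job:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// primary data and a low-volume control stream (stand-ins for the Kafka sources)
DataStream<Tuple2<String, Long>> primary = env.fromElements(Tuple2.of("key", 1L));
DataStream<Tuple2<String, String>> control = env.fromElements(Tuple2.of("key", "ENABLE"));

primary.connect(control)
    .keyBy(0, 0)  // key both inputs by the first tuple field
    .process(new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, String>, String>() {
        @Override
        public void processElement1(Tuple2<String, Long> data, Context ctx, Collector<String> out) {
            out.collect(data.f0 + ":" + data.f1);  // primary data path
        }

        @Override
        public void processElement2(Tuple2<String, String> cmd, Context ctx, Collector<String> out) {
            // control path: in the real job this updates state used by the primary path
        }
    })
    .print();

env.execute("two-input sketch");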

Finally, the JM records that it received 13 late messages for already expired
checkpoints (could they be from the 3rd TM?), the job is restored one more
time, and it works.  All TMs report at nearly the same time that they can now
find the Kafka coordinator.


Seems like the 3rd TM had some connectivity issue, but then all TMs seemed
to have a problem communicating with the Kafka coordinator at the same time
and recovered at the same time.  The TMs are hosted in AWS across AZs, so
all of them having connectivity issues at the same time is suspect.  The
Kafka node in question was up, and other clients in our infrastructure seemed
to be able to communicate with it without trouble.  Also, the Flink job
itself seemed to be talking to the Kafka cluster while restarting, as it was
spilling data coming from Kafka to disk.  And the JM did not report any
reduction in available task slots, which would have indicated connectivity
issues between the JM and the 3rd TM.  Yet the logs on the 3rd TM do not show
any record of trying to restore the job during the intermediate attempts.

What do folks make of it?


And a question for Flink devs, is there some reason why Flink does not stop
spilling messages to disk when the disk is going to fill up?  Seems like
there should be a configurable limit to how much data can be spilled before
back-pressure is applied to slow down or stop the source.
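
The closest setting I have found so far is the one below; if I read it right
it aborts the checkpoint rather than applying back-pressure, and I have not
verified that the key exists or behaves this way in 1.4.2:

# flink-conf.yaml (assumed key name; -1, the default, means no limit)
# abort a checkpoint whose alignment buffers more than ~128 MB
task.checkpoint.alignment.max-size: 134217728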


Re: Class loading issues when using Remote Execution Environment

2018-04-27 Thread kedar mhaswade
Thanks again!

This is strange. With both Flink 1.3.3 and Flink 1.6.0-SNAPSHOT and
1) copying gradoop-demo-shaded.jar to /lib, and
2) using RemoteEnvironment with just jmHost and jmPort (no jarFiles)

I get the same exception [1], caused by:
*Caused by: com.typesafe.config.ConfigException$Missing: No configuration
setting found for key 'akka.remote.log-received-messages'.*

This key is not documented anywhere, so I am confused. Also, with the copying
described above, even though the JM and TM are running, the Flink dashboard on
http://localhost:8081 is *unavailable*!

With Flink 1.3.3 and Flink 1.6.0-SNAPSHOT
1) NOT copying gradoop-shaded.jar in /lib, and
2) using RemoteEnvironment with jmHost, jmPort and jarFiles =
{}

I get the same exception; however, the Flink dashboard on
http://localhost:8081 is *available*! This makes me believe that this is
somehow an insidious classloading issue :(.
I am really perplexed by this behavior. Let me stick with the Flink 1.3.3
installation as you suggested for now.

If you have any other debugging tips, please let me know. But I am running
out of ideas to make it run with a non-local environment.
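
For reference, the remote environment is created roughly like this (a sketch;
the jar path is a placeholder for the actual shaded jar location):

import org.apache.flink.api.java.ExecutionEnvironment;

// jarFiles ships the shaded jar with this job only, instead of relying on lib/
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
        "localhost",                         // jmHost
        6123,                                // jmPort
        "target/gradoop-demo-shaded.jar");   // placeholder path to the fat jar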

Regards,
Kedar




[1] Gradoop shaded jar in /lib -- exception on the web-app:
org.apache.flink.client.program.ProgramInvocationException: Could not start
the ActorSystem needed to talk to the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:461)
at
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:429)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
at
org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
at
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
at
org.gradoop.demo.server.RequestHandler.getResponse(RequestHandler.java:447)
at
org.gradoop.demo.server.RequestHandler.createResponse(RequestHandler.java:430)
at
org.gradoop.demo.server.RequestHandler.executeCypher(RequestHandler.java:121)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60)
at
com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205)
at
com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75)
at
com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302)
at
com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
at
com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108)
at
com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147)
at
com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84)
at
com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542)
at
com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473)
at
com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419)
at
com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409)
at
com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer._service(GrizzlyContainer.java:222)
at
com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer.service(GrizzlyContainer.java:192)
at
org.glassfish.grizzly.http.server.HttpHandler.doHandle(HttpHandler.java:164)
at
org.glassfish.grizzly.http.server.HttpHandlerChain.service(HttpHandlerChain.java:196)
at
org.glassfish.grizzly.http.server.HttpHandler.doHandle(HttpHandler.java:164)
at
org.glassfish.grizzly.http.server.HttpServerFilter.handleRead(HttpServerFilter.java:175)
at
org.glassfish.grizzly.filterchain.ExecutorResolver$9.execute(ExecutorResolver.java:119)
at
org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter(DefaultFilterChain.java:265)
at
org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart(DefaultFilterChain.java:200)
at
org.glassfish.grizzly.filterchain.DefaultFilterChain.execute(DefaultFilterChain.java:134)
at
org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:112)
at
org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:78)
at

Re: Setting the parallelism in a cluster of machines properly

2018-04-27 Thread m@xi
Hi Michael!

Seems that you were correct. It is weird that I could not set parallelism =
136.

I have not been able to configure the cluster properly so far. I do everything
as described here [1], but it seems that the JobManager is not reachable.
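
For reference, this is roughly the kind of flink-conf.yaml I understand [1]
expects on every machine (host names are placeholders):

# minimal sketch, assuming a standalone cluster set up as in [1];
# the same file goes to every machine
jobmanager.rpc.address: master-host   # a host name/IP reachable from all workers, not localhost
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 8      # slots per TM; job parallelism cannot exceed total slots
parallelism.default: 1
# worker host names additionally go into conf/slaves, one per line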

Best,
Max



[1] --
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: RateLimit for Kinesis Producer

2018-04-27 Thread Tao Xia
Are you sure the rate limit is coming from the KinesisProducer?
If yes, Kinesis supports 1000 record writes per second per shard. If you hit
the limit, just increase your shard count.

On Fri, Apr 27, 2018 at 8:58 AM, Urs Schoenenberger <
urs.schoenenber...@tngtech.com> wrote:

> Hi all,
>
> we are struggling with RateLimitExceededExceptions with the Kinesis
> Producer. The Flink documentation claims that the Flink Producer
> overrides the RateLimit setting from Amazon's default of 150 to 100.
>
> I am wondering whether we'd need 100/($sink_parallelism) in order for it
> to work correctly. Since the shard partitioner works on a provided key,
> every parallel flink sink may use all the shards, right? And since the
> different parallel Flink sinks cannot coordinate this, every parallel
> sink will try to saturate every shard, thereby overestimating the
> capacity by $sink_parallelism.
>
> Does anyone else have experience with or knowledge about this?
>
> Best regards,
> Urs
>
> --
> Urs Schönenberger - urs.schoenenber...@tngtech.com
>
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Re: Flink flatMap to pass a tuple and get multiple tuple

2018-04-27 Thread TechnoMage
Any Iterable of Tuples will work for a for loop: List, Set, etc.

Michael

> On Apr 27, 2018, at 10:47 AM, Soheil Pourbafrani  
> wrote:
> 
> Thanks, what did you consider the return type of the parse method to be? An
> ArrayList of tuples?
> 
> On Friday, April 27, 2018, TechnoMage  > wrote:
> > it would look more like:
> > for (Tuple2<> t2 : parse(t.f3)) {
> > collector.collect(t2);
> > }
> > Michael
> >
> > On Apr 27, 2018, at 9:08 AM, Soheil Pourbafrani  > > wrote:
> > Hi, I want to use flatMap to pass to function namely 'parse' a tuple and it 
> > will return multiple tuple, that each should be a record in datastream 
> > object.
> > Something like this:
> >
> > DataStream> res = stream.flatMap(new 
> > FlatMapFunction, Tuple2>() {
> >
> > @Override
> > public void flatMap(Tuple3 t, 
> > Collector> collector) throws Exception {
> >
> > collector.collect(parse(t.f_3));
> > }
> > });
> >
> > that parse will return for example 6 tuples2 and I want them inserted into 
> > res datastream.
> >



RateLimit for Kinesis Producer

2018-04-27 Thread Urs Schoenenberger
Hi all,

we are struggling with RateLimitExceededExceptions with the Kinesis
Producer. The Flink documentation claims that the Flink Producer
overrides the RateLimit setting from Amazon's default of 150 to 100.

I am wondering whether we'd need 100/($sink_parallelism) in order for it
to work correctly. Since the shard partitioner works on a provided key,
every parallel flink sink may use all the shards, right? And since the
different parallel Flink sinks cannot coordinate this, every parallel
sink will try to saturate every shard, thereby overestimating the
capacity by $sink_parallelism.
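
A rough sketch of what I have in mind (assuming the "RateLimit" property is
simply forwarded to the KPL as the docs suggest; all names and values below
are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

final int sinkParallelism = 4;  // example value

Properties producerConfig = new Properties();
producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "eu-central-1");  // placeholder region
// dividing RateLimit by the sink parallelism is the idea under discussion,
// not something the connector does by itself
producerConfig.setProperty("RateLimit", Long.toString(100 / sinkParallelism));

FlinkKinesisProducer<String> producer =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
producer.setDefaultStream("my-stream");     // placeholder stream name
producer.setDefaultPartition("0");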

Does anyone else have experience with or knowledge about this?

Best regards,
Urs

-- 
Urs Schönenberger - urs.schoenenber...@tngtech.com

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Flink flatMap to pass a tuple and get multiple tuple

2018-04-27 Thread TechnoMage
it would look more like:

for (Tuple2<> t2 : parse(t.f3)) {
collector.collect(t2);
}

Michael

> On Apr 27, 2018, at 9:08 AM, Soheil Pourbafrani  wrote:
> 
> Hi, I want to use flatMap to pass to function namely 'parse' a tuple and it 
> will return multiple tuple, that each should be a record in datastream object.
> 
> Something like this:
> 
> DataStream> res = stream.flatMap(new 
> FlatMapFunction, Tuple2>() {
> 
> @Override
> public void flatMap(Tuple3 t, 
> Collector> collector) throws Exception {
> 
> collector.collect(parse(t.f_3));
> }
> });
> 
> that parse will return for example 6 tuples2 and I want them inserted into 
> res datastream.



Flink flatMap to pass a tuple and get multiple tuple

2018-04-27 Thread Soheil Pourbafrani
Hi, I want to use flatMap to pass a tuple to a function named 'parse', and it
will return multiple tuples, each of which should become a record in the
DataStream object.

Something like this:

DataStream> res = stream.flatMap(new
FlatMapFunction, Tuple2>() {

@Override
public void flatMap(Tuple3 t,
Collector> collector) throws Exception {

collector.collect(parse(t.f_3));
}
});


The parse function will return, for example, 6 Tuple2s, and I want them
inserted into the res DataStream.
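
A self-contained sketch of the pattern (the concrete tuple types and the
parse logic below are just assumptions for illustration, since the original
generics were lost in the archive):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapParseExample {

    // 'parse' turns the third field of the input tuple into several output tuples
    private static List<Tuple2<String, Integer>> parse(String raw) {
        List<Tuple2<String, Integer>> out = new ArrayList<>();
        String[] tokens = raw.split(",");          // placeholder parsing logic
        for (int i = 0; i < tokens.length; i++) {
            out.add(Tuple2.of(tokens[i], i));
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple3<Integer, Long, String>> stream =
                env.fromElements(Tuple3.of(1, 2L, "a,b,c,d,e,f"));

        DataStream<Tuple2<String, Integer>> res = stream.flatMap(
                new FlatMapFunction<Tuple3<Integer, Long, String>, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(Tuple3<Integer, Long, String> t,
                                        Collector<Tuple2<String, Integer>> collector) {
                        // emit each tuple returned by parse() as its own record
                        for (Tuple2<String, Integer> t2 : parse(t.f2)) {
                            collector.collect(t2);
                        }
                    }
                });

        res.print();
        env.execute("flatMap parse example");
    }
}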


Re: Multiple Streams Connect Watermark

2018-04-27 Thread Chengzhi Zhao
Got it, Thanks a lot Fabian. Looking forward to seeing your book.

Best,
Chengzhi

On Thu, Apr 26, 2018 at 4:02 PM, Fabian Hueske  wrote:

> You can also merge all three types into an n-ary Either type and union all
> three inputs together.
> However, Flink only supports a binary Either, so you'd have to implement a
> custom TypeInformation and TypeSerializer to make that work.
>
> Best, Fabian
>
> 2018-04-26 20:44 GMT+02:00 Chengzhi Zhao :
>
>> Thanks Fabian for the explanation.
>>
>> If I have data with different schemas, it seems the only option I have is
>> to use connect to perform joins (inner, outer). Are there any operators
>> that can put more than two streams together (all with different schemas)?
>>
>> Best,
>> Chengzhi
>>
>> On Thu, Apr 26, 2018 at 6:05 AM, Fabian Hueske  wrote:
>>
>>> Hi Chengzhi,
>>>
>>> Functions in Flink are implemented in a way to preserve the timestamps
>>> of elements or assign timestamps which are aligned with the existing
>>> watermarks.
>>> For example, the result of a time window aggregation has the end
>>> timestamp of the window as a timestamp and records emitted by the onTimer()
>>> method have the timestamp of the timer as a record timestamp.
>>> So unless you fiddle with internal APIs to reset the record timestamps
>>> of elements, you don't need to worry about generating new watermarks.
>>>
>>> Best, Fabian
>>>
>>> 2018-04-25 20:20 GMT+02:00 Chengzhi Zhao :
>>>
 Hi, everyone,

 I am trying to build a join-like pipeline using the Flink connect operator
 and CoProcessFunction, and I have a use case where I need to connect 3+
 streams.  So I have something like this:

 A \
    ==> C \
 B /       ==> E
        D /

 So streams A and B connect first, with the low watermark lagging by 3 hours;
 after data has been emitted (the output stream C), a new stream D connects
 to C and E is emitted as the final output.  I was wondering how the
 downstream watermark should be defined.  Should I give stream C a new
 watermark with a 3-hour delay again?  Or, when I connect stream D, will
 everything be 6 hours late on the low watermark?

 I am using BoundedOutOfOrdernessGenerator [1] with a maxOutOfOrderness of 3
 hours (a minimal sketch of such an assigner follows at the end of this
 thread).

 Thanks for your tips and help in advance.

 Best,
 Chengzhi

 [1]https://ci.apache.org/projects/flink/flink-docs-release-1
 .4/dev/event_timestamps_watermarks.html#with-periodic-watermarks

>>>
>>>
>>
>
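
For reference, a minimal sketch of the BoundedOutOfOrdernessGenerator-style
assigner mentioned in [1], applied once at each source; the input stream,
event type and timestamp field are placeholders:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// applied once per source (A, B and D); operators downstream of the connect
// keep these timestamps/watermarks, so C should not need a second 3-hour assigner
DataStream<Tuple2<String, Long>> withTimestamps = input.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.hours(3)) {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element) {
                return element.f1;  // assumed: f1 carries the event time in milliseconds
            }
        });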


Re: Regression: On Flink 1.5 CLI -m,--jobmanager option not working

2018-04-27 Thread Edward Alexander Rojas Clavijo
Thank you

2018-04-27 14:55 GMT+02:00 Chesnay Schepler :

> I've responded in the JIRA.
>
>
> On 27.04.2018 14:26, Edward Rojas wrote:
>
>> I'm preparing to migrate my environment from Flink 1.4 to 1.5 and I found
>> this issue.
>>
>> Every time I try to use the flink CLI with the -m option to specify the
>> jobmanager address, the CLI gets stuck on "Waiting for response..." and I
>> get the following error on the Jobmanager:
>>
>> WARN  akka.remote.transport.netty.NettyTransport-
>> Remote
>> connection to [/x.x.x.x:] failed with
>> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.f
>> rame.TooLongFrameException:
>> Adjusted frame length exceeds 10485760: 1195725860 - discarded
>>
>> I get the error even when I run it locally and try something like "flink
>> list -m localhost:6123". But "flink list" works as expected.
>>
>> I'm using the version from the "release-1.5" branch.
>>
>> I tested on the tag release 1.5.0-rc1 and it's working as expected.
>>
>> I did a /git bisect/ and it seems like the commit introducing the
>> regression
>> is  47909f4
>> > f94e9f6862a21b628817>
>>
>> I created a JIRA ticket for this:
>> https://issues.apache.org/jira/browse/FLINK-9255.
>>
>> Do you have any thoughts about it ?
>>
>> Regards,
>> Edward
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>>
>


-- 
*Edward Alexander Rojas Clavijo*



*Software EngineerHybrid CloudIBM France*


Re: Apache Flink Examples

2018-04-27 Thread Dhruv Kumar
Thanks. Tests and the example folder will help.

--
Dhruv Kumar
PhD Candidate
Department of Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me

> On Apr 27, 2018, at 06:47, Hung  wrote:
> 
> in my case I usually check the tests they write for each function I want to
> use. 
> 
> Take CountTrigger as an example, if I want to customize my own way of
> counting, I will have a look at 
> the test they write
> 
> https://github.com/apache/flink/blob/8dfb9d00653271ea4adbeb752da8f62d7647b6d8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java
> 
> Then I understand how this function is expected to work, and then I write my
> own test with my expected  result.
> 
> Test is the best documentation I would say. 
> 
> Also there is an example folder in github.
> https://github.com/apache/flink/tree/master/flink-examples
> 
> Best,
> 
> Sendoh
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Regression: On Flink 1.5 CLI -m,--jobmanager option not working

2018-04-27 Thread Chesnay Schepler

I've responded in the JIRA.

On 27.04.2018 14:26, Edward Rojas wrote:

I'm preparing to migrate my environment from Flink 1.4 to 1.5 and I found
this issue.

Every time I try to use the flink CLI with the -m option to specify the
jobmanager address, the CLI gets stuck on "Waiting for response..." and I
get the following error on the Jobmanager:

WARN  akka.remote.transport.netty.NettyTransport- Remote
connection to [/x.x.x.x:] failed with
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 1195725860 - discarded

I get the error even when I run it locally and try something like "flink
list -m localhost:6123". But "flink list" works as expected.

I'm using the version from the "release-1.5" branch.

I tested on the tag release 1.5.0-rc1 and it's working as expected.

I did a /git bisect/ and it seems like the commit introducing the regression
is  47909f4


I created a JIRA ticket for this:
https://issues.apache.org/jira/browse/FLINK-9255.

Do you have any thoughts about it ?

Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Regression: On Flink 1.5 CLI -m,--jobmanager option not working

2018-04-27 Thread Edward Rojas
I'm preparing to migrate my environment from Flink 1.4 to 1.5 and I found
this issue.

Every time I try to use the flink CLI with the -m option to specify the
jobmanager address, the CLI gets stuck on "Waiting for response..." and I
get the following error on the Jobmanager:

WARN  akka.remote.transport.netty.NettyTransport- Remote
connection to [/x.x.x.x:] failed with
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
Adjusted frame length exceeds 10485760: 1195725860 - discarded

I get the error even when I run it locally and try something like "flink
list -m localhost:6123". But "flink list" works as expected. 

I'm using the version from the "release-1.5" branch.

I tested on the tag release 1.5.0-rc1 and it's working as expected.

I did a /git bisect/ and it seems like the commit introducing the regression
is  47909f4

  

I created a JIRA ticket for this:
https://issues.apache.org/jira/browse/FLINK-9255.

Do you have any thoughts about it ?

Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Apache Flink Examples

2018-04-27 Thread Hung
in my case I usually check the tests they write for each function I want to
use. 

Take CountTrigger as an example, if I want to customize my own way of
counting, I will have a look at 
the test they write

https://github.com/apache/flink/blob/8dfb9d00653271ea4adbeb752da8f62d7647b6d8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java

Then I understand how this function is expected to work, and then I write my
own test with my expected  result.

Test is the best documentation I would say. 

Also there is an example folder on GitHub.
https://github.com/apache/flink/tree/master/flink-examples

Best,

Sendoh



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Application logs missing from jobmanager log

2018-04-27 Thread Juho Autio
Ah, found the place! In my case they seem to be going to
/home/hadoop/flink-1.5-SNAPSHOT/log/flink-hadoop-client-ip-10-0-10-29.log
(for example).

Any reason why these can't be shown in Flink UI, maybe in jobmanager log?

On Fri, Apr 27, 2018 at 12:13 PM, Juho Autio  wrote:

> The logs logged by my job jar before env.execute can't be found in
> jobmanager log. I can't find them anywhere else either.
>
> I can see all the usual logs by Flink components in the jobmanager log,
> though. And in taskmanager log I can see both Flink's internal & my
> application's logs from the execution time operations. But indeed missing
> the logs from before env.execute.
>
> I'm out of ideas how to debug this further. Any suggestions, or is this
> the known behaviour?
>
> I'm submitting my flink jobs in yarn cluster mode. Using Flink
> 1.5-SNAPSHOT.
>
>


Application logs missing from jobmanager log

2018-04-27 Thread Juho Autio
The logs logged by my job jar before env.execute can't be found in
jobmanager log. I can't find them anywhere else either.

I can see all the usual logs by Flink components in the jobmanager log,
though. And in taskmanager log I can see both Flink's internal & my
application's logs from the execution time operations. But indeed missing
the logs from before env.execute.

I'm out of ideas how to debug this further. Any suggestions, or is this the
known behaviour?

I'm submitting my flink jobs in yarn cluster mode. Using Flink 1.5-SNAPSHOT.
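
For context, the pattern is simply the following sketch (class and logger
names are placeholders); assuming the pre-execute code runs in the submitting
client process, as the log location mentioned above suggests, these lines end
up in the client log rather than in the jobmanager log:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyJob {
    private static final Logger LOG = LoggerFactory.getLogger(MyJob.class);

    public static void main(String[] args) throws Exception {
        LOG.info("configuring job");   // missing from the jobmanager log

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // logs emitted inside operators show up in the taskmanager logs instead
        env.fromElements(1, 2, 3).print();

        LOG.info("submitting job");    // also only visible in the client log
        env.execute("logging example");
    }
}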


Re: Counter and guage metric are not populated consistently.

2018-04-27 Thread Chesnay Schepler
If a job finishes very quickly it commonly happens that metrics are not 
exposed; the WebUI periodically polls metrics, but the polling only 
works while the job is running. Reporters that expose metrics 
periodically show the same behavior.
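
For reference, a minimal sketch of how such a custom counter is usually
registered in a user function (class and metric names are placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter recordCounter;

    @Override
    public void open(Configuration parameters) {
        // registered once per parallel instance; values are reported only
        // while the job is running, hence nothing shows up for very short jobs
        recordCounter = getRuntimeContext()
                .getMetricGroup()
                .counter("myRecordCounter");
    }

    @Override
    public String map(String value) {
        recordCounter.inc();
        return value;
    }
}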


On 27.04.2018 09:49, Ganesh Manal wrote:


Hi,

The custom counter/gauge metrics are generated only in a special case.

For example, if I execute the WordCount example with an HDFS file as input,
I am able to see the counters.


But when WordCount is executed without an HDFS file as input, i.e.
using the default DataSets, the metric counters are not generated.


I am using Flink 1.4.0.

Is there a specific configuration required for this?

Thanks,

Ganesh Manal





Assign JIRA issue permission

2018-04-27 Thread Sampath Bhat
Hello

I would like to know the procedure for getting a JIRA issue assigned. How can
I assign one to myself?

Thanks


Counter and gauge metric are not populated consistently.

2018-04-27 Thread Ganesh Manal
Hi,

The custom counter/gauge metrics are generated only in a special case.

For example, if I execute the WordCount example with an HDFS file as input, I am
able to see the counters.
But when WordCount is executed without an HDFS file as input, i.e. using the
default DataSets, the metric counters are not generated.

I am using Flink 1.4.0.
Is there a specific configuration required for this?

Thanks,
Ganesh Manal




Re: Class loading issues when using Remote Execution Environment

2018-04-27 Thread Chesnay Schepler

First, a small correction for my previous mail:

I could reproduce your problems locally when submitting the fat-jar.
Turns out I never submitted the fat-jar, as I didn't pass the jar file
argument to RemoteEnvironment.


Now on to your questions:

What version of Flink are you trying with?
I got it working once with 1.6-SNAPSHOT, but I would recommend
sticking with 1.3.1 since that is the version gradoop depends on. (I
haven't tried it with this version yet, but that's the next thing on my
list.)


Are there other config changes (flink-conf.yaml) that you made in your
cluster?

It was the standard config.

Is org.apache.flink.api.common.io.FileOutputFormat a good alternative
to LocalCollectionOutputFormat?

It can be used, but if the result is small you could also use accumulators.

Do you think it is better to use the jarFiles argument on
createRemoteEnvironment?

Yes, once we get it working this is the way to go.

On 26.04.2018 18:42, kedar mhaswade wrote:

Thanks Chesnay for your incredible help!

I will try out the suggestions again. A few questions:
- What version of Flink are you trying with? I have had issues when I
placed the gradoop-demo-shaded.jar in the lib folder of the Flink
installation (1.4 even refused to start!).
- Are there other config changes (flink-conf.yaml) that you made in 
your cluster?
- Is org.apache.flink.api.common.io.FileOutputFormat a good 
alternative to LocalCollectionOutputFormat, or should I use 
HadoopOutputFormatCommonBase (I do want to run the cluster on YARN 
later; at the moment I am trying on a standalone cluster).
- Do you think it is better to use jarFiles argument on 
createRemoteEnvironment (which deploys the JAR only for this job and 
not mess with the entire Flink cluster) a better option than placing 
the JAR(s) in the lib folder?


Thanks again,
Regards,
Kedar


On Thu, Apr 26, 2018 at 3:14 AM, Chesnay Schepler > wrote:


Small update:

I could reproduce your problems locally when submitting the fat-jar.
I could get the job to run after placing the
gradoop-demo-shaded.jar into the lib folder.
I have not tried yet placing only the gradoop jars into lib (but
my guess is you missed a gradoop jar)

Note that the job fails to run since you use
"LocalCollectionOutputFormat" which can only be used for local
execution, i.e. when the job submission and execution happen in
the same JVM.


On 25.04.2018 14:23, kedar mhaswade wrote:

Thank you for your response!

I have not tried the flink run app.jar route because the way the
app is set up does not allow me to do it. Basically, the app is a
web application which serves the UI and also submits a Flink job
for running Cypher queries. It is a proof-of-concept app, but
IMO, a very useful one.

Here's how you can reproduce:
1) git clone g...@github.com:kedarmhaswade/gradoop_demo.git
 (this is
my fork of gradoop_demo)
2) cd gradoop_demo
3) git checkout dev => dev is the branch where my changes to make
gradoop work with remote environment go.
4) mvn clean package => should bring the gradoop JARs that this
app needs; these JARs should then be placed in /lib.
5) cp
~/.m2/repository/org/gradoop/gradoop-common/0.3.2/gradoop-common-0.3.2.jar
/lib, cp
~/.m2/repository/org/gradoop/gradoop-flink/0.3.2/gradoop-flink-0.3.2.jar
/lib, cp target/gradoop-demo-0.2.0.jar
/lib.
6) start the local flink cluster (I have tried with latest
(built-from-source) 1.6-SNAPSHOT, or 1.4)
/bin/start-cluster.sh -- note the JM host and port
7) /start.sh --jmhost  --jmport 6123 (adjust
host and port per your cluster) => this is now configured to talk
to the RemoteEnvironment at given JM host and port.
8) open a browser at:
http://localhost:2342/gradoop/html/cypher.html

9) hit the query button => this would throw the exception
10) Ctrl C the process in 7 and just restart it as java -cp
target/classes:target/gradoop-demo-shaded.jar
org.gradoop.demo.server.Server => starts LocalEnvironment
11) do 9 again and see the results shown nicely in the browser.

Here is the relevant code:
1) Choosing between a Remote or a Local Environment.

The instructions are correct to my knowledge. Thanks for your
willingness to try. I have tried everything I can. With different
Flink versions, I get different results (I have also tried on
1.6-SNAPSHOT with class loading config being parent-first, or
child-first).

Regards,
Kedar


On Wed, Apr 25, 2018 at 1:08 AM, Chesnay Schepler
> wrote:

I couldn't spot any error in 

Re: jobmanager rpc inside kubernetes

2018-04-27 Thread Sampath Bhat
It would be helpful if you provided the complete CLI logs, because even I'm
using the flink run command to submit jobs to a Flink jobmanager running on K8s
and it's working fine. For remote execution using the Flink CLI you should
provide a flink-conf.yaml file which contains the jobmanager address, port, and
SSL/HA-related config if the same is enabled in the Flink cluster to which
the job is to be submitted.
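
Something along these lines on the client side should work (a sketch, reusing
the jobmanager address shown in your log):

# client-side flink-conf.yaml sketch; the address must match what the
# jobmanager itself advertises, otherwise akka drops the message as a
# "non-local recipient" as in the log below
jobmanager.rpc.address: sis-flink-cluster-jobmanager.platform.svc.k8s.local
jobmanager.rpc.port: 6123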

On Fri, Apr 27, 2018 at 7:24 AM, Chris Latko  wrote:

> when trying to submit a job to my k8s cluster using `flink run -m
> localhost:6123 app.jar` i get the following error in jobmanager log:
>
> -
> dropping message [class akka.actor.ActorSelectionMessage] for
> non-local recipient [Actor[akka.tcp://flink@localhost:6123/]] arriving
> at [akka.tcp://flink@localhost:6123] inbound addresses are
> [akka.tcp://flink@sis-flink-cluster-jobmanager.platform.
> svc.k8s.local:6123]
> -
>
> there are several workarounds but i'm absolutely not going to bundle
> my jar into a taskmanager pod just so i can deploy. i would rather
> have the jar on ceph and deploy from tm or jm:
> - kubectl exec -ti pod-replica-version bash
> - flink run blah blah blah...
>
> i could also run akka in unsafe mode, what are the majority of people
> doing regarding this issue?
>
> i have an up to date helm chart that is nearing production readiness
> that i'm willing to share if people are interested (and my company
> lets me).
>