Re: Need assistance : creating remote environment

2018-07-13 Thread Mohan mohan
Updated akka's 'reference.conf' file in my app shadow jar. Working fine now.

Thank you.

On Fri, Jul 13, 2018 at 10:46 PM Mohan mohan  wrote:

> Hi Chesnay,
>
> Now server and client/IDE both are using 1.4.0.
> When I submit job from IDE getting below exception,
>
> com.typesafe.config.ConfigException$Missing: No configuration setting
> found for key 'akka.remote.artery'
> I have used shadow jar. Thought problem is with merging jars. But I could
> not find above property 'akka.remote.artery' in any of akka jars.
> Please guide.
>
> On Fri, Jul 13, 2018 at 4:03 PM Chesnay Schepler 
> wrote:
>
>> FLIP-6  is/was an initiative to rework Flink distributed model.
>>
>> Contrary to my previous reply this wasn't introduced in 1.4, please
>> disregard explanation a).
>>
>> Thus, the only remaining explanation is that you're running 1.5 in the
>> client/IDE.
>> I heavily recommend to make sure that your client/IDE uses 1.4
>>
>> For completeness sake, to disable FLIP-6
>> * within the IDE, enable the "legacyCode" maven profile
>> * for the client, configure the mode
>> 
>> parameter.
>>
>> On 13.07.2018 12:20, Mohan mohan wrote:
>>
>> Hi,
>>  what is flip6? "Flink Improvement Proposal" ?
>> How to enable/disable flip mode in below cases,
>>   1. While starting cluster using ./start-cluster.sh
>>   2. While connecting using
>> ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "xyz.jar")
>>
>> Thank you.
>>
>> On Thu, Jul 12, 2018 at 9:32 PM Mohan mohan 
>> wrote:
>>
>>> Yep, that is due to version mismatch. Working fine now.
>>> Thank you Chesnay Schepler.
>>>
>>>
>>> On Wed, Jul 11, 2018 at 6:05 PM Chesnay Schepler 
>>> wrote:
>>>
 Based on the logs your client is using the RestClusterClient, which
 means that the client is either
 a) running 1.4 with the flip6 profile enabled
 b) running 1.5.

 Please ensure that both the flink versions match for client and server,
 and that both run/do not run in flip6 mode.

 On 11.07.2018 14:26, Mohan mohan wrote:

 Attached log file. (Log level : Trace)

 Is this the issue ? Trying with very minimal graph (execution plan is
 printed in log file)

 WARN  akka.remote.transport.netty.NettyTransport- 
 Remote connection to [/127.0.0.1:44322] failed with 
 org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
  Adjusted frame length exceeds 10485760: 1195725860 - discarded



 On Wed, Jul 11, 2018 at 5:44 PM Chesnay Schepler 
 wrote:

> Did/could you enable logging in the submitting code?
>
> On 11.07.2018 13:57, Mohan mohan wrote:
> > Hi,
> >
> > I have started flink in cluster mode.   ..flink1.4.2/bin/$
> > ./start-cluster.sh (no config changes ie., default settings)
> > And trying to connect to it,
> > ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
> > "xxx.jar");
> >
> > I am not seeing any response, did not find anything in jobmanager
> log.
> > Please guide how to trace the issue. Or Do we need any
> preconfiguration?
> > In local environment everything works fine.
> >
> > Using : flink-1.4.2-bin-scala_2.11.tgz
> >
> > Thanks in advance.
>
>
>

>>


flink 1.4.2 Ambari

2018-07-13 Thread antonio saldivar
Hello

I am trying to find a way to add the Flink 1.4.2 service to Ambari because it is
not listed in the stack. Does anyone have the steps to add this service
manually?

Thank you
Best regards


Re: Loading Rules from compacted Kafka Topic - open() vs Connected Streams

2018-07-13 Thread vijayakumar palaniappan
What I was trying to achieve above was similar to GlobalKTable in
Kafka Streams.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams
Also, the current Flink version I am using is 1.4.

Are there any other suggestions/guidance to achieve GlobalKTable functionality
in Flink?

Thanks.

On Thu, Jul 12, 2018 at 1:00 PM vijayakumar palaniappan <
vijayakuma...@gmail.com> wrote:

> Hello All,
> I can think of two options of implementing below requirement and request
> some guidance on choosing the option with pros and cons.
>
> Requirements:
> - A in memory rules cache to be loaded from log compacted kafka topic.
> This cache has to be loaded prior to arrival of events.
> - Updates to the log compacted kafka topic has to be tracked to keep the
> in memory rule cache up to date
>
> Additional properties of data:
> - On Job start/restart, this rule cache is always loaded from earliest
> available offset in the log. - No kafka offset store and restore required.
> - No checkpointing needed for the rule cache, as it is loaded afresh in
> event of crash and restore
> - No eventTime semantics required as we always want the latest rules to be
> loaded to cache
>
> Implementation Options:
>
> 1. Using a KafkaConsumer in open() doing a initial load, and continuously
> fetching rule updates and keeping the in memory cache up to date. This
> option is not using a DataStream for rules as we don't use any goodies of
> stream like state,checkpoint, event time etc.
> 2. Connected Stream approach. Using a KafkaConsumer in open() doing a
> initial load. Have a FlinkKafkaSource Stream connected with events. In this
> case have to take care of out of order updates to caches, since the rules
> updates are from open() and Rule DataStream.
>
> --
> Thanks,
> -Vijay
>


-- 
Thanks,
-Vijay
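
For reference, below is a minimal, self-contained sketch of the connected-streams idea (option 2 above), with small in-memory sources standing in for the Kafka topics. The initial load in open() and the out-of-order handling mentioned above are left out, and all names, types, and values are made up for illustration.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class RuleCacheSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the real Kafka sources: rules are (ruleKey, ruleValue) pairs,
        // events are strings of the form "ruleKey:payload".
        DataStream<Tuple2<String, String>> rules =
                env.fromElements(Tuple2.of("r1", "UPPER"), Tuple2.of("r2", "LOWER"));
        DataStream<String> events = env.fromElements("r1:hello", "r2:WORLD");

        events
            .connect(rules.broadcast())   // every parallel instance receives all rule updates
            .flatMap(new CoFlatMapFunction<String, Tuple2<String, String>, String>() {
                private final Map<String, String> ruleCache = new HashMap<>();

                @Override
                public void flatMap1(String event, Collector<String> out) {
                    String[] parts = event.split(":", 2);
                    String rule = ruleCache.get(parts[0]);
                    if (rule != null) {
                        out.collect("UPPER".equals(rule) ? parts[1].toUpperCase() : parts[1].toLowerCase());
                    }
                    // else: buffer or drop events that arrive before their rule
                }

                @Override
                public void flatMap2(Tuple2<String, String> rule, Collector<String> out) {
                    ruleCache.put(rule.f0, rule.f1);   // keep the in-memory rule cache up to date
                }
            })
            .print();

        env.execute("rule cache sketch");
    }
}

In a real job the two fromElements() sources would be Kafka consumers, with the rule consumer always starting from the earliest offset as described above.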


Re: SinkFunction invoke method signature

2018-07-13 Thread Ashwin Sinha
+1

We encountered the exact same issue today.

On Fri, Jul 13, 2018 at 10:51 PM Philip Doctor 
wrote:

> Dear Flink Users,
> I noticed my sink's `invoke` method was deprecated, so I went to update
> it. I'm a little surprised by the new method signature, especially the
> Context parameter (copy+pasted below for ease of discussion).  Shouldn't it be
> Context<IN> rather than Context<T>, based on the docs?  I'm having a hard time
> understanding what's getting sent to me here in Context.  Anyone have any
> insights on why these might be different?
>
>
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>
>     /**
>      * Context that {@link SinkFunction SinkFunctions} can use for getting
>      * additional data about an input record.
>      *
>      * <p>The context is only valid for the duration of a
>      * {@link SinkFunction#invoke(Object, Context)} call. Do not store the
>      * context and use afterwards!
>      *
>      * @param <T> The type of elements accepted by the sink.
>      */
>     @Public // Interface might be extended in the future with additional methods.
>     interface Context<T> {
>
>
>
> org.apache.flink/flink-streaming-java_2.11/1.5.0/784a58515da2194a2e74666e8d53d50fac8c03/flink-streaming-java_2.11-1.5.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
>


-- 
*Ashwin Sinha *| Data Engineer
ashwin.si...@go-mmt.com  | 9452075361
 




SinkFunction invoke method signature

2018-07-13 Thread Philip Doctor
Dear Flink Users,
I noticed my sink's `invoke` method was deprecated, so I went to update it. I'm
a little surprised by the new method signature, especially the Context parameter
(copy+pasted below for ease of discussion).  Shouldn't it be Context<IN> rather
than Context<T>, based on the docs?  I'm having a hard time understanding
what's getting sent to me here in Context.  Anyone have any insights on why
these might be different?


/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {


    /**
     * Context that {@link SinkFunction SinkFunctions} can use for getting
     * additional data about an input record.
     *
     * <p>The context is only valid for the duration of a
     * {@link SinkFunction#invoke(Object, Context)} call. Do not store the
     * context and use afterwards!
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface Context<T> {


org.apache.flink/flink-streaming-java_2.11/1.5.0/784a58515da2194a2e74666e8d53d50fac8c03/flink-streaming-java_2.11-1.5.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
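
For context, here is a minimal sketch of a sink written against the non-deprecated invoke(value, Context) signature in 1.5. The sink itself is made up, and note that the context parameter is used as the raw Context type, which is exactly the oddity the question is about.

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class LoggingSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Per-record metadata: the element's timestamp (may be null) and the
        // current processing time / watermark.
        Long elementTimestamp = context.timestamp();
        System.out.println(value + " @ " + elementTimestamp
                + " (watermark=" + context.currentWatermark() + ")");
    }
}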


Re: Need assistance : creating remote environment

2018-07-13 Thread Mohan mohan
Hi Chesnay,

Now the server and the client/IDE are both using 1.4.0.
When I submit a job from the IDE I get the below exception:

com.typesafe.config.ConfigException$Missing: No configuration setting found
for key 'akka.remote.artery'
I have used a shadow jar and thought the problem was with merging the jars, but I could
not find the above property 'akka.remote.artery' in any of the akka jars.
Please guide.

On Fri, Jul 13, 2018 at 4:03 PM Chesnay Schepler  wrote:

> FLIP-6  is/was an initiative to rework Flink distributed model.
>
> Contrary to my previous reply this wasn't introduced in 1.4, please
> disregard explanation a).
>
> Thus, the only remaining explanation is that you're running 1.5 in the
> client/IDE.
> I heavily recommend to make sure that your client/IDE uses 1.4
>
> For completeness sake, to disable FLIP-6
> * within the IDE, enable the "legacyCode" maven profile
> * for the client, configure the mode
> 
> parameter.
>
> On 13.07.2018 12:20, Mohan mohan wrote:
>
> Hi,
>  what is flip6? "Flink Improvement Proposal" ?
> How to enable/disable flip mode in below cases,
>   1. While starting cluster using ./start-cluster.sh
>   2. While connecting using
> ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "xyz.jar")
>
> Thank you.
>
> On Thu, Jul 12, 2018 at 9:32 PM Mohan mohan 
> wrote:
>
>> Yep, that is due to version mismatch. Working fine now.
>> Thank you Chesnay Schepler.
>>
>>
>> On Wed, Jul 11, 2018 at 6:05 PM Chesnay Schepler 
>> wrote:
>>
>>> Based on the logs your client is using the RestClusterClient, which
>>> means that the client is either
>>> a) running 1.4 with the flip6 profile enabled
>>> b) running 1.5.
>>>
>>> Please ensure that both the flink versions match for client and server,
>>> and that both run/do not run in flip6 mode.
>>>
>>> On 11.07.2018 14:26, Mohan mohan wrote:
>>>
>>> Attached log file. (Log level : Trace)
>>>
>>> Is this the issue ? Trying with very minimal graph (execution plan is
>>> printed in log file)
>>>
>>> WARN  akka.remote.transport.netty.NettyTransport- 
>>> Remote connection to [/127.0.0.1:44322] failed with 
>>> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>>>  Adjusted frame length exceeds 10485760: 1195725860 - discarded
>>>
>>>
>>>
>>> On Wed, Jul 11, 2018 at 5:44 PM Chesnay Schepler 
>>> wrote:
>>>
 Did/could you enable logging in the submitting code?

 On 11.07.2018 13:57, Mohan mohan wrote:
 > Hi,
 >
 > I have started flink in cluster mode.   ..flink1.4.2/bin/$
 > ./start-cluster.sh (no config changes ie., default settings)
 > And trying to connect to it,
 > ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
 > "xxx.jar");
 >
 > I am not seeing any response, did not find anything in jobmanager
 log.
 > Please guide how to trace the issue. Or Do we need any
 preconfiguration?
 > In local environment everything works fine.
 >
 > Using : flink-1.4.2-bin-scala_2.11.tgz
 >
 > Thanks in advance.



>>>
>


akka.stream.materializer exception

2018-07-13 Thread Rad Rad
Hi, 

I have a jar file that subscribes to streaming data from Kafka and then
executes some Flink queries.  When the program runs locally, it works fine.
But when I run the jar file, I get the exceptions below, which suggest that
akka.stream.materializer causes the problem. I added the akka.stream
dependencies but it still fails with the same problem. Can anyone help me fix
this?

org.apache.flink.streaming.api.datastream.DataStreamSink@35d019a3de
Exception in thread "main"
com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @
jar:file:/home/radsah/Desktop/pipline.jar!/reference.conf: 804: Could not
resolve substitution to a value: ${*akka.stream.materializer*}
at
com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
at
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at
com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at
com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at
com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at
com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at 
com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:231)
at 
com.typesafe.config.impl.SimpleConfig.resolveWith(SimpleConfig.java:74)
at com.typesafe.config.impl.SimpleConfig.resolve(SimpleConfig.java:64)
at com.typesafe.config.impl.SimpleConfig.resolve(SimpleConfig.java:59)
at com.typesafe.config.impl.SimpleConfig.resolve(SimpleConfig.java:37)
at com.typesafe.config.impl.ConfigImpl$1.call(ConfigImpl.java:374)
at com.typesafe.config.impl.ConfigImpl$1.call(ConfigImpl.java:367)
at
com.typesafe.config.impl.ConfigImpl$LoaderCache.getOrElseUpdate(ConfigImpl.java:65)
at
com.typesafe.config.impl.ConfigImpl.computeCachedConfig(ConfigImpl.java:92)
at
com.typesafe.config.impl.ConfigImpl.defaultReference(ConfigImpl.java:367)
at
com.typesafe.config.ConfigFactory.defaultReference(ConfigFactory.java:413)
at akka.actor.ActorSystem$Settings.(ActorSystem.scala:307)
at akka.actor.ActorSystemImpl.(ActorSystem.scala:683)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:245)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:288)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:263)
at akka.actor.ActorSystem$.create(ActorSystem.scala:191)
at
org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:106)
at
org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:300)
at
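
For what it's worth, a small sketch (class name made up) that can be run with the packaged jar on the classpath to check whether the merged reference.conf still resolves; a fat jar that dropped or overwrote akka's defaults fails here with the same kind of exception as above. The usual fix is to make the fat-jar build append/merge all reference.conf files instead of keeping only one of them.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ReferenceConfCheck {

    public static void main(String[] args) {
        // Loads and resolves every reference.conf on the classpath; throws a
        // ConfigException if substitutions such as ${akka.stream.materializer}
        // cannot be resolved.
        Config reference = ConfigFactory.defaultReference();
        System.out.println("akka.stream.materializer present: "
                + reference.hasPath("akka.stream.materializer"));
    }
}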

Using an Akka Actor System with Flink

2018-07-13 Thread Bhashit Parikh
I have a few libraries for doing IO, with large amounts of data, that use
akka-http. Now I have a Flink application where I need to use those
libraries.

How do I use a common actor-system across my flink app? I mean, AFAIU,
flink apps will be distributed and the same pipeline could end up creating
multiple actor systems on multiple JVMs. Also, the same JVM could be shared
between multiple pipelines too, and in that case, having multiple actor
systems would mean having a lot of threads.

Am I right in assuming that this is a problem? If so, how can I avoid that
problem? More specifically, is there a way to share an actor system between
the apps on the same JVM? I know Flink has a way to get the same execution
context using the Async IO. (
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/asyncio.html#async-io-api
).

Thanks,
Bhashit
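
One common pattern for this (not an official Flink API; the class and the reference-counting scheme below are made up for illustration, and it assumes Akka 2.4+ for terminate()) is to keep a single lazily created ActorSystem per JVM in a static holder and acquire/release it from open()/close() of the rich functions that need it:

import akka.actor.ActorSystem;

public final class SharedActorSystem {

    private static ActorSystem system;
    private static int refCount;

    private SharedActorSystem() {}

    // Called from open() of e.g. a RichAsyncFunction.
    public static synchronized ActorSystem acquire() {
        if (system == null) {
            system = ActorSystem.create("shared-io-system");
        }
        refCount++;
        return system;
    }

    // Called from close(); the last user shuts the system down.
    public static synchronized void release() {
        if (--refCount == 0 && system != null) {
            system.terminate();
            system = null;
        }
    }
}

With this, all operator instances scheduled into the same task manager JVM share one actor system, regardless of which pipeline they belong to.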


Re: TumblingProcessingTimeWindow emits extra results for a same window

2018-07-13 Thread Hequn Cheng
Hi Youjun,

The rowtime value in the udf EXTRACT(EPOCH FROM rowtime) is different from the
rowtime value of the window. The SQL will be parsed and translated into a chain of nodes,
Source -> Calc -> Window -> Sink. The Calc is the input node of the Window, and
the udf is part of the Calc instead of the Window. So max_ts and min_ts are
actually the times before entering the window, i.e., not the time in the window.

However, I still can't find anything valuable to solve the problem. It
seems the window has been triggered many times for the same key. I will
think more about it.

Best, Hequn.

On Fri, Jul 13, 2018 at 11:53 AM, Yuan,Youjun  wrote:

> Hi Hequn,
>
>
>
> I am using Flink 1.4. The job was running with  1 parallelism.
>
>
>
> I don’t think the extra records are caused by different keys, because:
>
>    1. I ran 2 jobs consuming the same source, jobA with a 2-minute window,
>    and jobB with a 4-minute window. If there were weird keys, then jobA would get
>    no more records than jobB, for the same period. But that is not true: *jobA
>    got 17* records while *jobB got 11*. Relevant results can be found
>    below.
>    2. For each window, I output the *min and max timestamp*, and found
>    that those extra records always start in the last few milliseconds of the 2-
>    or 4-minute windows, just before the window got closed. I also noticed the
>    windows did not have a clear cut between minutes, as we can see in jobA’s
>    output, ts *1531448399978* appears in 18 result records, either as
>    start, or end, or both.
>
>
>
> jobA(2-minute window) output
>
> {"timestamp":153144804,"cnt":1668052,"userId":"user01"
> ,"min_ts":1531448040003,"max_ts":1531448159985}
>
> {"timestamp":153144816,"cnt":1613188,"userId":"user01"
> ,"min_ts":1531448159985,"max_ts":1531448279979}
>
> {"timestamp":153144828,"cnt":1664652,"userId":"user01"
> ,"min_ts":1531448280004,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":4,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":*1531448399978*}
>
> {"timestamp":153144840,"cnt":1593435,"userId":"user01","min_ts":
> *1531448399978*,"max_ts":1531448519978}
>
>
>
> jobB(4-minute window) output
>
> {"timestamp":153144792,"cnt":3306838,"userId":"user01"
> ,"min_ts":1531447919981,"max_ts":1531448159975}
>
> {"timestamp":153144816,"cnt":3278178,"userId":"user01"
> ,"min_ts":1531448159098,"max_ts":1531448399977}
>
> {"timestamp":153144816,"cnt":4,"userId":"user01","min_
> ts":1531448399977,"max_ts":1531448399977}
>
> {"timestamp":153144816,"cnt":5,"userId":"user01","min_
> ts":1531448399977,"max_ts":1531448399977}
>
> {"timestamp":153144816,"cnt":8,"userId":"user01","min_
> ts":1531448399977,"max_ts":1531448399978}
>
> {"timestamp":153144816,"cnt":7,"userId":"user01","min_
> ts":1531448399978,"max_ts":1531448399978}
>
> {"timestamp":153144816,"cnt":2,"userId":"user01","min_
> ts":1531448399978,"max_ts":1531448399978}
>
> {"timestamp":153144816,"cnt":3,"userId":"user01","min_
> ts":1531448399978,"max_ts":1531448399978}
>
> {"timestamp":153144816,"cnt":3,"userId":"user01","min_
> ts":1531448399978,"max_ts":1531448399978}
>
> {"timestamp":153144816,"cnt":3,"userId":"user01","min_
> ts":1531448399978,"max_ts":1531448399978}
>
> {"timestamp":153144816,"cnt":3,"userId":"user01","min_
> 

Flink on Mesos: containers question

2018-07-13 Thread NEKRASSOV, ALEXEI
Can someone please clarify how Flink on Mesos is containerized?

On 5-node Mesos cluster I started Flink (1.4.2) with two Task Managers. Mesos 
shows "flink" task and two "taskmanager" tasks, all on the same VM.
On that VM I see one Docker container running a process that seems to be Mesos 
App Master:

$ docker ps -a
CONTAINER IDIMAGE COMMAND  
CREATED STATUS  PORTS   NAMES
97b6840466c0mesosphere/dcos-flink:1.4.2-1.0   "/bin/sh -c /sbin/..."   
41 hours agoUp 41 hours 
mesos-a0079d85-9ccb-4c43-8d31-e6b1ad750197
$ docker exec 97b6840466c0 /bin/ps -efww
UIDPID  PPID  C STIME TTY  TIME CMD
root 1 0  0 Jul11 ?00:00:00 /bin/sh -c /sbin/init.sh
root 7 1  0 Jul11 ?00:00:02 runsvdir -P /etc/service
root 8 7  0 Jul11 ?00:00:00 runsv flink
root   629 0  0 Jul12 pts/000:00:00 /bin/bash
root   789 8  1 Jul12 ?00:09:16 
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.address=localhost 
-Dmesos.resourcemanager.framework.role=* 
-Dsecurity.kerberos.login.use-ticket-cache=true
root  1027 0  0 12:54 ?00:00:00 /bin/ps -efww

Then on the VM itself I see another process with the same command line as the 
one in the container:

root 13276  9689  1 Jul12 ?00:09:18 
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -classpath 
/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar:/flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/flink-1.4.2/lib/log4j-1.2.17.jar:/flink-1.4.2/lib/slf4j-log4j12-1.7.7.jar:/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar::/etc/hadoop/conf/:
 
-Dlog.file=/mnt/mesos/sandbox/flink--mesos-appmaster-alex-tfc87d-private-agents-3.novalocal.log
 -Dlog4j.configuration=file:/flink-1.4.2/conf/log4j.properties 
-Dlogback.configurationFile=file:/flink-1.4.2/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner 
-Dblob.server.port=23170 -Djobmanager.heap.mb=256 -Djobmanager.rpc.port=23169 
-Djobmanager.web.port=23168 -Dmesos.artifact-server.port=23171 
-Dmesos.initial-tasks=2 -Dmesos.resourcemanager.tasks.cpus=2 
-Dmesos.resourcemanager.tasks.mem=2048 -Dtaskmanager.heap.mb=512 
-Dtaskmanager.memory.preallocate=true -Dtaskmanager.numberOfTaskSlots=1 
-Dparallelism.default=1 -Djobmanager.rpc.address=localhost 
-Dmesos.resourcemanager.framework.role=* 
-Dsecurity.kerberos.login.use-ticket-cache=true

And I see two processes on the VM that seem to be related to Task Managers:

root 13688 13687  0 Jul12 ?00:04:25 /docker-java-home/jre/bin/java 
-Xms1448m -Xmx1448m -classpath 
/mnt/mesos/sandbox/flink/lib/flink-python_2.11-1.4.2.jar:/mnt/mesos/sandbox/flink/lib/flink-shaded-hadoop2-uber-1.4.2.jar:/mnt/mesos/sandbox/flink/lib/log4j-1.2.17.jar:/mnt/mesos/sandbox/flink/lib/slf4j-log4j12-1.7.7.jar:/mnt/mesos/sandbox/flink/lib/flink-dist_2.11-1.4.2.jar:::
 -Dlog.file=flink-taskmanager.log 
-Dlog4j.configuration=file:/mnt/mesos/sandbox/flink/conf/log4j.properties 
-Dlogback.configurationFile=file:/mnt/mesos/sandbox/flink/conf/logback.xml 
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager 
-Dblob.server.port=23170 -Dmesos.artifact-server.port=23171 
-Djobmanager.heap.mb=256 -Djobmanager.rpc.address=localhost 
-Djobmanager.web.port=23168 -Dsecurity.kerberos.login.use-ticket-cache=true 
-Djobmanager.rpc.port=23169 -Dtaskmanager.memory.preallocate=true 
-Dtaskmanager.rpc.port=1027 -Dmesos.initial-tasks=2 
-Dmesos.resourcemanager.tasks.cpus=2 -Dtaskmanager.maxRegistrationDuration=5 
minutes -Dtaskmanager.data.port=1028 -Dparallelism.default=1 
-Dtaskmanager.numberOfTaskSlots=1 -Dmesos.resourcemanager.tasks.mem=2048 
-Dtaskmanager.heap.mb=512 -Dmesos.resourcemanager.framework.role=*
root 13892 13891  0 Jul12 ?00:04:15 /docker-java-home/jre/bin/java 
-Xms1448m -Xmx1448m -classpath 

Re: [ANNOUNCE] Apache Flink 1.5.1 released

2018-07-13 Thread Dawid Wysakowicz
Good job everyone and Chesnay for being the release manager!


On 13/07/18 14:34, Hequn Cheng wrote:
> Cool, thanks to Chesnay!
>
> Best, Hequn
>
> On Fri, Jul 13, 2018 at 8:25 PM, vino yang  > wrote:
>
> Thanks Chesnay, great job!
>
> Thanks,
> Vino
>
> 2018-07-13 20:20 GMT+08:00 Till Rohrmann  >:
>
> Great to hear. Big thank you to the community for the hard
> work and to Chesnay for being our release manager.
>
> Cheers,
> Till
>
> On Fri, Jul 13, 2018 at 12:05 PM Chesnay Schepler
> mailto:ches...@apache.org>> wrote:
>
> The Apache Flink community is very happy to announce the
> release of Apache Flink 1.5.1, which is the first bugfix
> release for the Apache Flink 1.5 series.
>
> Apache Flink® is an open-source stream processing
> framework for distributed, high-performing,
> always-available, and accurate data streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>  
>
> Please check out the release blog post for an overview of
> the improvements for this bugfix release:
> http://flink.apache.org/news/2018/07/12/release-1.5.1.html
> 
>
> The full release notes are available in Jira:
> 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12343053
> 
> 
>
> We would like to thank all contributors of the Apache
> Flink community who made this release possible!
>
> PS: A regression was identified where the CLI job
> submission fails when SSL is enabled. (FLINK-9842)
>
> Regards,
> Chesnay
>
>
>




Re: [ANNOUNCE] Apache Flink 1.5.1 released

2018-07-13 Thread Hequn Cheng
Cool, thanks to Chesnay!

Best, Hequn

On Fri, Jul 13, 2018 at 8:25 PM, vino yang  wrote:

> Thanks Chesnay, great job!
>
> Thanks,
> Vino
>
> 2018-07-13 20:20 GMT+08:00 Till Rohrmann :
>
>> Great to hear. Big thank you to the community for the hard work and to
>> Chesnay for being our release manager.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jul 13, 2018 at 12:05 PM Chesnay Schepler 
>> wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink 1.5.1, which is the first bugfix release for the Apache Flink
>>> 1.5 series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements for this bugfix release:
>>> http://flink.apache.org/news/2018/07/12/release-1.5.1.html
>>>
>>> The full release notes are available in Jira:
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?proje
>>> ctId=12315522=12343053
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> PS: A regression was identified where the CLI job submission fails when
>>> SSL is enabled. (FLINK-9842)
>>>
>>> Regards,
>>> Chesnay
>>>
>>>
>


Re: TimeWindow doesn't trigger reduce function

2018-07-13 Thread Hequn Cheng
Hi Soheil,

It seems your job stops within one second?
A processing-time window doesn't output data if the time hasn't reached the
window end, while an event-time window will output a final watermark during
close() to avoid this problem.
You can try to increase the running time of your job to get the output data.

Best, Hequn
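
For reference, a self-contained sketch of the event-time variant mentioned above (element values are made up; the poster's Tuple2 carries the timestamp in seconds in field f0). With a bounded source a final watermark is emitted when the source finishes, so the windows fire even if the job only runs for a moment.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeReduceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Stand-in for the real source: (timestamp in seconds, payload)
        DataStream<Tuple2<Long, String>> temp = env.fromElements(
                Tuple2.of(1L, "a"), Tuple2.of(1L, "b"), Tuple2.of(2L, "c"), Tuple2.of(3L, "d"));

        temp.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, String>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<Long, String> element) {
                    return element.f0 * 1000L;   // seconds -> milliseconds
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(1))
            .reduce(new ReduceFunction<Tuple2<Long, String>>() {
                @Override
                public Tuple2<Long, String> reduce(Tuple2<Long, String> a, Tuple2<Long, String> b) {
                    return new Tuple2<>(a.f0, a.f1 + "," + b.f1);
                }
            })
            .print();

        env.execute("event-time reduce sketch");
    }
}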

On Fri, Jul 13, 2018 at 6:37 PM, Soheil Pourbafrani 
wrote:

> Hi, My stream data is in a type of Tuple2 that contains the
> timestamp (in second) and data, respectively. The source will generate 120
> sample every second. Using the following code I want to get data in every
> second and then apply the reduce function on them.
>
> temp.keyBy(0).timeWindow(Time.seconds(1))
>     .reduce(new ReduceFunction<Tuple2<Long, String>>() {
>         @Override
>         public Tuple2<Long, String> reduce(Tuple2<Long, String> longStringTuple2,
>                 Tuple2<Long, String> t1) throws Exception {
>             System.out.println("reduced");
>             return new Tuple2<>(longStringTuple2.f0, longStringTuple2.f1 + "," + t1.f1);
>         }
>     }).print();
>
> I expected it print reduced data for every second, according to the reduce
> function, but it just print the test line
>
> System.out.println("reduced");
>
> that I put in reduce function to see if it enter the reduce function or
> not. I can confirm the data are entering in temp variable.
>
> What is the problem ? Should I implement a trigger function
>


Re: [ANNOUNCE] Apache Flink 1.5.1 released

2018-07-13 Thread vino yang
Thanks Chesnay, great job!

Thanks,
Vino

2018-07-13 20:20 GMT+08:00 Till Rohrmann :

> Great to hear. Big thank you to the community for the hard work and to
> Chesnay for being our release manager.
>
> Cheers,
> Till
>
> On Fri, Jul 13, 2018 at 12:05 PM Chesnay Schepler 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.5.1, which is the first bugfix release for the Apache Flink
>> 1.5 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> http://flink.apache.org/news/2018/07/12/release-1.5.1.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?
>> projectId=12315522=12343053
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> PS: A regression was identified where the CLI job submission fails when
>> SSL is enabled. (FLINK-9842)
>>
>> Regards,
>> Chesnay
>>
>>


Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-13 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

Earlier I thought the failed task triggered the cancel process; now I understand that
you cancel the task when it stops processing data.
I think you can jstack the process to find where the task thread is blocked instead
of canceling it; then we may find some hints.

In addition, the stack frame "DataOutputSerializer.resize" below indicates the
task is serializing a record, and there are extra byte buffers in the
serializer for copying data temporarily. If your record is too large, this
may cause OOM, and this overhead memory is not managed by the Flink
framework. You can also monitor the GC status to check the full GC delay.

Best,
Zhijiang
--
From: Gerard Garcia
Sent: 2018-07-13 (Friday) 16:22
To: wangzhijiang999
Cc: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Hi Zhijiang,

The problem is that no other task failed first. We have a task that sometimes
just stops processing data, and when we cancel it, we see log messages
saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is stuck in 
method: 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
 org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point and that is why it
stops processing data. I don't see any increase in memory use in the heap (I
guess because these buffers are managed by Flink), so I'm not sure if that is
really the problem.

Gerard
On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) 
 wrote:
Hi Gerard,

I think you can check the job manager log to find which task failed at first, 
and then trace the task manager log containing the failed task to find the 
initial reason.
The failed task will trigger canceling all the other tasks, and during 
canceling process, the blocked task that is waiting for output buffer can not 
be interrupted by the
canceler thread which is shown in your description. So I think the cancel 
process is not the key point and is in expectation. Maybe it did not cause OOM 
at all. 
If the task can not be interrupted during canceling, the task manager process will be
exited finally to trigger restarting the job.

Zhijiang
--
From: Gerard Garcia
Sent: 2018-07-02 (Monday) 18:29
To: wangzhijiang999
Cc: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong 
to the unresponsive task, that is why we suspect that at some point it did not 
have enough memory to serialize the message and it blocked. I've also found 
that when it hanged several output buffers were full (see attached image 
buffers.outPoolUsage.png) so I guess the traces just reflect that.

Probably the task hanged for some other reason and that is what filled the 
output buffers previous to the blocked operator. I'll have to continue 
investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) 
 wrote:
 Hi Gerard,

From the below stack, it can only indicate the task is canceled, which may be
triggered by the job manager because of another task's failure. If the task can not be
interrupted within the timeout config, the task manager process will be exited. Do
you see any OutOfMemory messages from the task manager log?  Normally the output
serialization buffer is managed by the task manager framework and will not cause
OOM, and on the input deserialization side, there will be a temp bytes array
on each channel for holding partial records which is not managed by the framework.
I think you can confirm whether and where the OOM was caused. Maybe check the task
failure logs.

Zhijiang

--
From: gerardg
Sent: 2018-06-30 (Saturday) 00:12
To: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any 
kind of log error while other tasks running in the same task manager continue 
without problems. When these tasks are restarted the task manager gets killed 
and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 
seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) 
java.nio.ByteBuffer.wrap(ByteBuffer.java:396) 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
 
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
 
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
 

TimeWindow doesn't trigger reduce function

2018-07-13 Thread Soheil Pourbafrani
Hi, my stream data is of type Tuple2<Long, String>, containing the
timestamp (in seconds) and the data, respectively. The source will generate 120
samples every second. Using the following code I want to get the data in every
second and then apply the reduce function on them.

temp.keyBy(0).timeWindow(Time.seconds(1))
    .reduce(new ReduceFunction<Tuple2<Long, String>>() {
        @Override
        public Tuple2<Long, String> reduce(Tuple2<Long, String> longStringTuple2,
                Tuple2<Long, String> t1) throws Exception {
            System.out.println("reduced");
            return new Tuple2<>(longStringTuple2.f0, longStringTuple2.f1 + "," + t1.f1);
        }
    }).print();

I expected it to print the reduced data for every second, according to the reduce
function, but it just prints the test line

System.out.println("reduced");

that I put in the reduce function to see whether it enters the reduce function or
not. I can confirm the data are arriving in the temp variable.

What is the problem? Should I implement a trigger function?


Re: Need assistance : creating remote environment

2018-07-13 Thread Chesnay Schepler

FLIP-6 is/was an initiative to rework Flink's distributed model.

Contrary to my previous reply this wasn't introduced in 1.4, please 
disregard explanation a).


Thus, the only remaining explanation is that you're running 1.5 in the 
client/IDE.

I strongly recommend making sure that your client/IDE uses 1.4.

For completeness' sake, to disable FLIP-6:
* within the IDE, enable the "legacyCode" maven profile
* for the client, configure the mode parameter.


On 13.07.2018 12:20, Mohan mohan wrote:

Hi,
 what is flip6? "Flink Improvement Proposal" ?
How to enable/disable flip mode in below cases,
  1. While starting cluster using ./start-cluster.sh
  2. While connecting using 
ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, 
"xyz.jar")


Thank you.

On Thu, Jul 12, 2018 at 9:32 PM Mohan mohan > wrote:


Yep, that is due to version mismatch. Working fine now.
Thank you Chesnay Schepler.


On Wed, Jul 11, 2018 at 6:05 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Based on the logs your client is using the RestClusterClient,
which means that the client is either
a) running 1.4 with the flip6 profile enabled
b) running 1.5.

Please ensure that both the flink versions match for client
and server, and that both run/do not run in flip6 mode.

On 11.07.2018 14:26, Mohan mohan wrote:

Attached log file. (Log level : Trace)

Is this the issue ? Trying with very minimal graph (execution
plan is printed in log file)

WARN  akka.remote.transport.netty.NettyTransport- Remote 
connection to [/127.0.0.1:44322 ] failed with 
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
 Adjusted frame length exceeds 10485760: 1195725860 - discarded


On Wed, Jul 11, 2018 at 5:44 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Did/could you enable logging in the submitting code?

On 11.07.2018 13:57, Mohan mohan wrote:
> Hi,
>
> I have started flink in cluster mode.  ..flink1.4.2/bin/$
> ./start-cluster.sh (no config changes ie., default
settings)
> And trying to connect to it,
>
ExecutionEnvironment.createRemoteEnvironment("localhost",
6123,
> "xxx.jar");
>
> I am not seeing any response, did not find anything in
jobmanager log.
> Please guide how to trace the issue. Or Do we need any
preconfiguration?
> In local environment everything works fine.
>
> Using : flink-1.4.2-bin-scala_2.11.tgz
>
> Thanks in advance.








Cannot configure akka.ask.timeout

2018-07-13 Thread Lukas Kircher
Hello,

I have problems setting configuration parameters for Akka in Flink 1.5.0. When 
I run a job I get the exception listed below which states that Akka timed out 
after 1ms. I tried to increase the timeout by following the Flink 
configuration documentation. Specifically I did the following:

1) Passed a configuration to the Flink execution environment with 
`akka.ask.timeout` set to a higher value. I started this in Intellij.
2) Passed program arguments via the run configuration in Intellij, e.g. 
`-Dakka.ask.timeout:100s`
3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local 
standalone cluster via start-cluster.sh. The setting is reflected in Flink's 
web interface.

However - despite explicit configuration the default setting seems to be used. 
The exception below states in each case that akka ask timed out after 1ms.
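
For reference, approach 1) above corresponds roughly to the following sketch (class name made up); it is shown only to make the setup concrete, since this is exactly what is reported as not taking effect in 1.5.0.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LocalTimeoutSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("akka.ask.timeout", "100 s");   // same key as in flink-conf.yaml

        // Local environment backed by a MiniCluster, as when running from the IDE.
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

        env.fromElements(1, 2, 3).print();
    }
}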

As my problem seems very basic I do not include an SSCCE for now but I can try 
to build one if this helps figuring out the issue.

--
[...]
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Could not retrieve 
JobResult.
[...]
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
at 
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
[...]
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]]
 after [1 ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:745)
[...]
--


Best regards and thanks for your help,
Lukas





Re: Need assistance : creating remote environment

2018-07-13 Thread Mohan mohan
Hi,
 What is flip6? "Flink Improvement Proposal"?
How do I enable/disable flip6 mode in the below cases:
  1. While starting cluster using ./start-cluster.sh
  2. While connecting using
ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "xyz.jar")

Thank you.

On Thu, Jul 12, 2018 at 9:32 PM Mohan mohan  wrote:

> Yep, that is due to version mismatch. Working fine now.
> Thank you Chesnay Schepler.
>
>
> On Wed, Jul 11, 2018 at 6:05 PM Chesnay Schepler 
> wrote:
>
>> Based on the logs your client is using the RestClusterClient, which means
>> that the client is either
>> a) running 1.4 with the flip6 profile enabled
>> b) running 1.5.
>>
>> Please ensure that both the flink versions match for client and server,
>> and that both run/do not run in flip6 mode.
>>
>> On 11.07.2018 14:26, Mohan mohan wrote:
>>
>> Attached log file. (Log level : Trace)
>>
>> Is this the issue ? Trying with very minimal graph (execution plan is
>> printed in log file)
>>
>> WARN  akka.remote.transport.netty.NettyTransport- Remote 
>> connection to [/127.0.0.1:44322] failed with 
>> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>>  Adjusted frame length exceeds 10485760: 1195725860 - discarded
>>
>>
>>
>> On Wed, Jul 11, 2018 at 5:44 PM Chesnay Schepler 
>> wrote:
>>
>>> Did/could you enable logging in the submitting code?
>>>
>>> On 11.07.2018 13:57, Mohan mohan wrote:
>>> > Hi,
>>> >
>>> > I have started flink in cluster mode.   ..flink1.4.2/bin/$
>>> > ./start-cluster.sh (no config changes ie., default settings)
>>> > And trying to connect to it,
>>> > ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
>>> > "xxx.jar");
>>> >
>>> > I am not seeing any response, did not find anything in jobmanager log.
>>> > Please guide how to trace the issue. Or Do we need any
>>> preconfiguration?
>>> > In local environment everything works fine.
>>> >
>>> > Using : flink-1.4.2-bin-scala_2.11.tgz
>>> >
>>> > Thanks in advance.
>>>
>>>
>>>
>>


[ANNOUNCE] Apache Flink 1.5.1 released

2018-07-13 Thread Chesnay Schepler

The Apache Flink community is very happy to announce the release of Apache 
Flink 1.5.1, which is the first bugfix release for the Apache Flink 1.5 series.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html  


Please check out the release blog post for an overview of the improvements for 
this bugfix release:
http://flink.apache.org/news/2018/07/12/release-1.5.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12343053

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

PS: A regression was identified where the CLI job submission fails when SSL is 
enabled. (FLINK-9842)

Regards,
Chesnay



Flink CLI properties with HA

2018-07-13 Thread Sampath Bhat
Hello

When HA is enabled in the Flink cluster and I have to submit a job via the Flink
CLI, then the flink-conf.yaml of the Flink CLI should contain these properties:
high-availability: zookeeper
high-availability.cluster-id: flink
high-availability.zookeeper.path.root: flink
high-availability.storageDir: 
high-availability.zookeeper.quorum: 

What is the need for high-availability.storageDir in the Flink CLI? Does this
mean that even the Flink client should be able to access the mentioned path, or
is some check being done on the property name?

Without these properties the Flink CLI will not be able to submit a job to the Flink
cluster when HA is enabled.


Re: Taskmanager SSL fails looking for Subject Alternative IP Address

2018-07-13 Thread Stephan Ewen
Thanks for reporting this.

Given that hostname verification seems to be the issue, I would assume that
the TaskManager somehow advertises a hostname in a form that is incompatible
with the verification in some setups.

While it would be interesting to dig deeper into why this happens, I think
we need to move away from hostname verification for internal communication
(rpc, TaskManager Netty, blob server) anyways for the following reasons:

  - Hostname verification is hard (or pretty much incompatible) between
containers in many container environments
  - The verification is mainly useful if you use a certificate in a
certification chain with some other trusted root certificates
  - For internal SSL between JM/TM and TM/TM, the recommended method is to
generate a single purpose certificate (may be self signed) and add a key
store and trust store with only that certificate. Given such a "single
certificate truststore", hostname verification does not add any additional
security (to my understanding).

For Flink 1.6, we are also adding transparent mutual authentication for
internal communication (RPC, blob server, netty data plane), which should
be an additional level of security. If this is used with dedicated (self-signed)
certificates, it should be very secure and not rely on hostname
verification.

That said, for external communication (REST calls against
JM/Dispatcher/...) clients should use hostname verification, because many
users use certificates in a certificate chain for these external endpoints.

Best,
Stephan



On Thu, Jul 12, 2018 at 11:02 PM, PACE, JAMES  wrote:

> I have the following SSL configuration for a 3 node HA flink cluster:
>
>
>
> #taskmanager.data.ssl.enabled: false
>
> security.ssl.enabled: true
>
> security.ssl.keystore: /opt/app/certificates/server-keystore.jks
>
> security.ssl.keystore-password: 
>
> security.ssl.key-password: 
>
> security.ssl.truststore: /opt/app/certificates/cacerts
>
> security.ssl.truststore-password: 
>
> security.ssl.verify-hostname: true
>
>
>
> The job we’re running is the sample WordCount.jar.  The running version of
> flink is 1.4.0.  It’s not the latest, but I didn’t see anything that looked
> like updating would solve this issue.
>
>
>
> If either security.ssl.verify-hostname is set to false or
> taskmanager.data.ssl.enabled is set to false, everything works fine.
>
>
>
> When flink is run in the above configuration above, with ssl fully enabled
> and security.ssl.verify-hostname: true, the flink job fails.  However, when
> going through the logs, SSL appears fine for akka, blob service, and
> jobmanager.
>
>
>
> The root cause looks to be Caused by: java.security.cert.CertificateException:
> No subject alternative names matching IP address xxx.xxx.xxx.xxx found.
>
> I have tried setting taskmanager.hostname to the FQDN of the host, but
> that did not change anything.
>
> We don’t generate certificates with SAN fields.
>
>
>
> Any thoughts would be appreciated.
>
>
>
> This is the full stack trace
>
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Sending the partition request failed.
>
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> Sending the partition request failed.
>
> at org.apache.flink.runtime.io.network.netty.
> PartitionRequestClient$1.operationComplete(PartitionRequestClient.java:
> 119)
>
> at org.apache.flink.runtime.io.network.netty.
> PartitionRequestClient$1.operationComplete(PartitionRequestClient.java:
> 111)
>
> at org.apache.flink.shaded.netty4.io.netty.util.
> concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>
> at org.apache.flink.shaded.netty4.io.netty.util.
> concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
>
> at org.apache.flink.shaded.netty4.io.netty.util.
> concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>
> at org.apache.flink.shaded.netty4.io.netty.channel.
> PendingWriteQueue.safeFail(PendingWriteQueue.java:252)
>
> at org.apache.flink.shaded.netty4.io.netty.channel.
> PendingWriteQueue.removeAndFailAll(PendingWriteQueue.java:112)
>
> at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.
> setHandshakeFailure(SslHandler.java:1256)
>
> at org.apache.flink.shaded.netty4.io.netty.handler.ssl.
> SslHandler.unwrap(SslHandler.java:1040)
>
> at org.apache.flink.shaded.netty4.io.netty.handler.ssl.
> SslHandler.decode(SslHandler.java:934)
>
> at org.apache.flink.shaded.netty4.io.netty.handler.codec.
> ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:315)
>
> at org.apache.flink.shaded.netty4.io.netty.handler.codec.
> ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229)
>
> at 

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-13 Thread Gerard Garcia
Hi Zhijiang,

The problem is that no other task failed first. We have a task that
sometimes just stops processing data, and when we cancel it, we see
log messages saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is
stuck in method:
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point and that is why
it stops processing data. I don't see any increase in memory use in the
heap (I guess because these buffers are managed by Flink), so I'm not sure
if that is really the problem.

Gerard

On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> Hi Gerard,
>
> I think you can check the job manager log to find which task failed at
> first, and then trace the task manager log containing the failed task to
> find the initial reason.
> The failed task will trigger canceling all the other tasks, and during
> canceling process, the blocked task that is waiting for output buffer can
> not be interrupted by the
> canceler thread which is shown in your description. So I think the cancel
> process is not the key point and is in expectation. Maybe it did not cause
> OOM at all.
> If the task can not be interrupted during canceling, the task manager process will be
> exited finally to trigger restarting the job.
>
> Zhijiang
>
> --
> From: Gerard Garcia
> Sent: 2018-07-02 (Monday) 18:29
> To: wangzhijiang999
> Cc: user
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces
> belong to the unresponsive task, that is why we suspect that at some point
> it did not have enough memory to serialize the message and it blocked. I've
> also found that when it hanged several output buffers were full (see
> attached image buffers.outPoolUsage.png) so I guess the traces just reflect
> that.
>
> Probably the task hanged for some other reason and that is what filled the
> output buffers previous to the blocked operator. I'll have to continue
> investigating to find the real cause.
>
> Gerard
>
>
>
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
>  Hi Gerard,
>
> From the below stack, it can only indicate the task is canceled, which
> may be triggered by the job manager because of another task's failure. If the task
> can not be interrupted within the timeout config, the task manager process will
> be exited. Do you see any OutOfMemory messages from the task manager log?
> Normally the output serialization buffer is managed by the task manager
> framework and will not cause OOM, and on the input deserialization side,
> there will be a temp bytes array on each channel for holding partial
> records which is not managed by the framework. I think you can confirm whether
> and where the OOM was caused. Maybe check the task failure logs.
>
> Zhijiang
>
> --
> From: gerardg
> Sent: 2018-06-30 (Saturday) 00:12
> To: user
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing
> any kind of log error while other tasks running in the same task manager
> continue without problems. When these tasks are restarted the task manager
> gets killed and shows several errors similar to these ones:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> 

Re: External checkpoint metadata in Flink 1.5.x

2018-07-13 Thread Gyula Fóra
Hi,
To be fair my "workaround" is to restore the previous behaviour in our
Flink build as it only takes a few lines of code :)

To explain why this is convenient I can describe how we use the
checkpoints/savepoints.

For every Streaming Application (not flink job) we have a unique name from
which the savepoint directory is derived. This means that all savepoints +
external metadata pointers should end up in this directory and we can
easily pick out the latest for the current application when we want to
restore.

This is extremely convenient compared to crawling through a bunch of nested
directories to find the latest and also makes migrating checkpoint
directories and such things more straightforward.

Cheers,
Gyula
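
For context, a minimal sketch of the job-side settings being discussed (paths and names are made up): with externalized checkpoints enabled, in 1.5 the metadata ends up under the state backend's checkpoint directory rather than under the global state.checkpoints.dir.

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // In 1.5 the externalized checkpoint metadata is written under this
        // job-configured directory, not under state.checkpoints.dir.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/my-app"));

        env.fromElements(1, 2, 3).print();
        env.execute("externalized checkpoint sketch");
    }
}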

Aljoscha Krettek wrote (on Fri, Jul 13, 2018, 8:29):

> Out of curiosity, how will you work around it? And how is it easier for
> your tooling if checkpoints are in a central location?
>
> Best,
> Aljoscha
>
>
> On 12. Jul 2018, at 17:55, Gyula Fóra  wrote:
>
> Hi!
>
> Well it depends on how we look at it FLINK-5627
>   is not necessarily
> the current behaviour. You still can't really specify the exact location
> from within the job as it now goes to a checkpoint specific place
> determined by the checkpoint dir. I will close the jira as 9114 covers it
> in a more generic way.
>
> I can probably work around the current behaviour in the meantime :)
>
> Cheers,
> Gyula
>
> Aljoscha Krettek wrote (on Thu, Jul 12, 2018, 17:35):
>
>> Hi,
>>
>> does that mean https://issues.apache.org/jira/browse/FLINK-5627 is no
>> longer relevant for you, since it seems to request the behaviour that we
>> have now?
>>
>> But yes, I think it's currently not possible (with out-of-box
>> functionality) to write externalized-checkpoint metadata to a central
>> location. There is this Jira issue which aims at implementing a solution:
>> https://issues.apache.org/jira/browse/FLINK-9114.
>>
>> I quickly talked to Stephan, it seems to be that the meta info about
>> externalized checkpoints is also written to the HA storage directory, maybe
>> that's helpful for you.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 12. Jul 2018, at 14:18, Gyula Fóra  wrote:
>>
>> Hi,
>> It seems that the behaviour to store the checkpoint metadata files for
>> externalized checkpoints changed from 1.4 to 1.5 and the docs seem to be
>> incorrectly saying that:
>> "state.checkpoints.dir: The target directory for meta data of externalized
>> checkpoints
>> 
>> "
>>
>> Now the external metadata files simply end up in the checkpoint dir as
>> configured in the state backend so basically the "state.checkpoints.dir"
>> param is overwritten by the job. This breaks a bunch of tooling around
>> checkpoints that rely on a common place for metadata files instead of
>> scattered around in subdirectories. I wonder is there any way to get the
>> previous behaviour back?
>>
>> Thanks,
>> Gyula
>>
>>
>>
>


Re: External checkpoint metadata in Flink 1.5.x

2018-07-13 Thread Aljoscha Krettek
Out of curiosity, how will you work around it? And how is it easier for your 
tooling if checkpoints are in a central location?

Best,
Aljoscha

> On 12. Jul 2018, at 17:55, Gyula Fóra  wrote:
> 
> Hi!
> 
> Well it depends on how we look at it FLINK-5627 
>   is not necessarily the 
> current behaviour. You still can't really specify the exact location from 
> within the job as it now goes to a checkpoint specific place determined by 
> the checkpoint dir. I will close the jira as 9114 covers it in a more generic 
> way.
> 
> I can probably work around the current behaviour in the meantime :)
> 
> Cheers,
> Gyula
> 
> Aljoscha Krettek wrote (on Thu, Jul 12, 2018, 17:35):
> Hi,
> 
> does that mean https://issues.apache.org/jira/browse/FLINK-5627 
>  is no longer relevant for 
> you, since it seems to request the behaviour that we have now?
> 
> But yes, I think it's currently not possible (with out-of-box functionality) 
> to write externalized-checkpoint metadata to a central location. There is 
> this Jira issue which aims at implementing a solution: 
> https://issues.apache.org/jira/browse/FLINK-9114 
> .
> 
> I quickly talked to Stephan, it seems to be that the meta info about 
> externalized checkpoints is also written to the HA storage directory, maybe 
> that's helpful for you.
> 
> Best,
> Aljoscha
> 
> 
>> On 12. Jul 2018, at 14:18, Gyula Fóra > > wrote:
>> 
>> Hi,
>> It seems that the behaviour to store the checkpoint metadata files for 
>> externalized checkpoints changed from 1.4 to 1.5 and the docs seem to be 
>> incorrectly saying that: 
>> "state.checkpoints.dir: The target directory for meta data of externalized 
>> checkpoints 
>> "
>> 
>> Now the external metadata files simply end up in the checkpoint dir as 
>> configured in the state backend so basically the "state.checkpoints.dir" 
>> param is overwritten by the job. This breaks a bunch of tooling around 
>> checkpoints that rely on a common place for metadata files instead of 
>> scattered around in subdirectories. I wonder is there any way to get the 
>> previous behaviour back?
>> 
>> Thanks,
>> Gyula
>