Re: Checkpointing not happening in Standalone HA mode

2018-07-25 Thread vino yang
Hi Vinay:

Did you call the specific config API, referring to this documentation [1]?

Can you share your job program and the JM log? Does the JM log contain a
message matching this pattern: "Triggering checkpoint {} @ {} for job {}"?

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
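
For reference, a minimal sketch of the configuration that documentation describes, against a remote cluster as in this thread (host, port, jar path and the 10-second interval are placeholder values, not taken from this thread):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment
    .createRemoteEnvironment("jobmanager-host", 6123, "path/to/job.jar");
// without this call, no checkpoints are ever triggered
env.enableCheckpointing(10000);
// exactly-once is the default mode; shown here for completeness
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);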

Thanks, vino.

2018-07-25 19:43 GMT+08:00 Chesnay Schepler :

> Can you provide us with the job code?
>
> I assume that checkpointing runs properly if you submit the same job to a
> normal cluster?
>
>
> On 25.07.2018 13:15, Vinay Patil wrote:
>
> No error in the logs. That is why I am not able to understand why
> checkpoints are not getting triggered.
>
> Regards,
> Vinay Patil
>
>
> On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil 
> wrote:
>
>> Hi Chesnay,
>>
>> No error in the logs. That is why I am not able to understand why
>> checkpoints are not getting triggered.
>>
>> Regards,
>> Vinay Patil
>>
>>
>> On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
>> wrote:
>>
>>> Please check the job- and taskmanager logs for anything suspicious.
>>>
>>> On 25.07.2018 12:33, Vinay Patil wrote:
>>>
>>> Hi,
>>>
>>> I am starting the cluster using bootstrap application where in I am
>>> calling Job Manager and Task Manager main class to form the cluster. The HA
>>> cluster is formed correctly and I am able to submit jobs to this cluster
>>> using RemoteExecutionEnvironment but when I enable checkpointing in code I
>>> do not see any checkpoints triggered on Flink UI.
>>>
>>> Am I missing any configurations to be set for the
>>> RemoteExecutionEnvironment for checkpointing to work.
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>>
>>>
>


How to connect more than 2 heterogeneous Streams!!

2018-07-25 Thread Puneet Kinra
Hi

Is there a way to connect more than 2 streams with different stream schemas?

-- 
Cheers

Puneet Kinra

Mobile: +918800167808 | Skype: puneet.ki...@customercentria.com

e-mail: puneet.ki...@customercentria.com


Re: override jvm params

2018-07-25 Thread vino yang
Hi Cussac,

Flink on YARN supports dynamic properties. Can you try this:
-yD <key>=<value>?

The implementation is here[1][2].

[1]:
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L151
[2]:
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L198

Thanks, vino.


2018-07-25 20:27 GMT+08:00 Cussac, Franck :

> Hi Hequn,
>
>
>
> Thanks for your answer. I just tested and it doesn’t work.
>
>
>
> I’m using PureConfig to parse my conf files. With java I can override any
> argument using -D<key>=<value> syntax. How can I do the same with flink in
> yarn mode ?
>
>
>
> Franck.
>
>
>
>
>
> From: Hequn Cheng [mailto:chenghe...@gmail.com]
> Sent: Wednesday, July 25, 2018 14:04
> To: Cussac, Franck 
> Cc: user 
> Subject: Re: override jvm params
>
>
>
> Hi Cussac,
>
> If I understand correctly, you want to pass rules.consumer.topic=test
> and rules.consumer.topic=test to flink jvm.
>
> I think you can try:
>
> flink run -m $HOSTPORT -yD rules.consumer.topic=test
> -yD rules.consumer.topic=test
>
>
>
> Hope this helps.
>
> Hequn
>
>
>
> On Wed, Jul 25, 2018 at 3:26 PM, Cussac, Franck <
> franck.cus...@ext.bleckwen.ai> wrote:
>
> Hi,
>
>
>
> Following the documentation I want to use –yD option to override some
> params in my conf like this :
>
>
>
> flink run -m $HOSTPORT -yD 
> "env.java.opts.taskmanager=-Drules.consumer.topic=test"
> -yD "env.java.opts.jobmanager=-Drules.consumer.topic=test" myjar mymain
>
>
>
> but it is just ignored. Nothing happened. But if I run with java on my IDE
> and :
>
> -Drules.consumer.topic=test
>
> in the JVM’s parameters it works perfectly.
>
>
>
> What do I have to do to override my params with yarn and flink ?
>
>
>
>
>
> Best regards,
>
> Franck Cussac.
>
>
>
>
>


Re: checkpoint always fails

2018-07-25 Thread vino yang
Hi Marvin,

Thanks for reporting this issue.

Can you share more details about the failed checkpoint, such as log,
exception stack trace, which statebackend used, HA configuration?

This information can help trace the issue.

Thanks, vino.

2018-07-26 10:12 GMT+08:00 Marvin777 :

> Hi, all:
>
> flink job can run normally, but checkpoint always fails, like this:
> [image: image.png]
>
> [image: image.png]
> checkpoint configuration:
>
> [image: image.png]
>
> thanks.
>
>


Re: RE: Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-25 Thread vino yang
Hi Youjun,

Thanks. You can try this, but I am not sure it works correctly, because for
the REST client there are quite a few changes from 1.4 to 1.5.

Maybe you can customize the 1.4 source code, referring to the specific
implementation in 1.5? Another option: upgrade your Flink version.

To Chesnay and Till:  any suggestion or opinion?

Thanks, vino.

2018-07-26 10:01 GMT+08:00 Yuan,Youjun :

> Thanks for the information. Forgot to mention, I am using Flink 1.4, the
> RestClusterClient seems not to have the ability to retrieve the leader
> address. I did notice there is webMonitorRetrievalService member in Flink
> 1.5.
>
>
>
> I wonder if I can use RestClusterClient@v1.5 on my client side, to
> retrieve the leader JM of Flink v1.4 Cluster.
>
>
>
> Thanks
>
> Youjun
>
>
>
> From: vino yang 
> Sent: Wednesday, July 25, 2018 7:11 PM
> To: Martin Eden 
> Cc: Yuan,Youjun ; user@flink.apache.org
> Subject: Re: Best way to find the current alive jobmanager with HA mode
> zookeeper
>
>
>
> Hi Martin,
>
>
>
>
>
> For a standalone cluster with multiple JM instances, if you do not
> use the REST API but use the Flink-provided cluster client, the client can
> determine which of the JM instances is the leader.
>
>
>
> For example, you can use the CLI to submit a Flink job from a non-leader node.
>
>
>
> But I did not verify this case for Flink on Mesos.
>
>
>
> Thanks, vino.
>
>
>
> 2018-07-25 17:22 GMT+08:00 Martin Eden :
>
> Hi,
>
>
>
> This is actually very relevant to us as well.
>
>
>
> We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of
> Mesos/DCOS, Flink HA runs only one JobManager which gets restarted on
> another node by Marathon in case of failure and reloads its state from
> Zookeeper.
>
>
>
> Yuan I am guessing you are using Flink in standalone mode and there it is
> actually running 3 instances of the Job Manager, 1 active and 2 stand-bys.
>
>
>
> Either way, in both cases there is the need to "discover" the hostname and
> port of the Job Manager at runtime. This is needed when you want to use the
> cli to submit jobs for instance. Is there an elegant way to submit jobs
> other than say just trying out all the possible nodes in your cluster?
>
>
>
> Grateful if anyone could clarify any of the above, thanks,
>
> M
>
>
>
> On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun 
> wrote:
>
> Hi all,
>
>
>
> I have a standalone cluster with 3 jobmanagers, and set *high-availability
> to zookeeper*. Our client submits job by REST API(POST /jars/:jarid/run),
> which means we need to know the host of the any of the current alive
> jobmanagers. The problem is that, how can we know which job manager is
> alive, or the host of current leader?  We don’t want to access a dead JM.
>
>
>
> Thanks.
>
> Youjun Yuan
>
>
>
>
>
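
Since the thread leaves "try the nodes" as the practical fallback, here is a minimal sketch of that probe, assuming the monitoring REST API's /overview endpoint (hosts, port and timeouts are placeholders):

import java.net.HttpURLConnection;
import java.net.URL;

public class JobManagerProbe {

    /** Returns the first host whose REST endpoint answers, or null if none do. */
    public static String findAliveJobManager(String[] hosts, int restPort) {
        for (String host : hosts) {
            try {
                URL url = new URL("http://" + host + ":" + restPort + "/overview");
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setConnectTimeout(2000);
                conn.setReadTimeout(2000);
                // a JM that is up and serving the web monitor answers with 200
                if (conn.getResponseCode() == 200) {
                    return host;
                }
            } catch (Exception e) {
                // node down or REST not served here; try the next candidate
            }
        }
        return null;
    }
}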


checkpoint always fails

2018-07-25 Thread Marvin777
Hi, all:

flink job can run normally, but checkpoint always fails, like this:
[image: image.png]

[image: image.png]
checkpoint configuration:

[image: image.png]

thanks.


RE: Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-25 Thread Yuan,Youjun
Thanks for the information. Forgot to mention, I am using Flink 1.4, the 
RestClusterClient seems not to have the ability to retrieve the leader address. 
I did notice there is webMonitorRetrievalService member in Flink 1.5.

I wonder if I can use RestClusterClient@v1.5 on 
my client side, to retrieve the leader JM of Flink v1.4 Cluster.

Thanks
Youjun

From: vino yang 
Sent: Wednesday, July 25, 2018 7:11 PM
To: Martin Eden 
Cc: Yuan,Youjun ; user@flink.apache.org
Subject: Re: Best way to find the current alive jobmanager with HA mode zookeeper

Hi Martin,


For a standalone cluster with multiple JM instances, if you do not use the 
REST API but use the Flink-provided cluster client, the client can determine 
which of the JM instances is the leader.

For example, you can use the CLI to submit a Flink job from a non-leader node.

But I did not verify this case for Flink on Mesos.

Thanks, vino.

2018-07-25 17:22 GMT+08:00 Martin Eden <martineden...@gmail.com>:
Hi,

This is actually very relevant to us as well.

We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of 
Mesos/DCOS, Flink HA runs only one JobManager which gets restarted on another 
node by Marathon in case of failure and reloads its state from Zookeeper.

Yuan I am guessing you are using Flink in standalone mode and there it is 
actually running 3 instances of the Job Manager, 1 active and 2 stand-bys.

Either way, in both cases there is the need to "discover" the hostname and port 
of the Job Manager at runtime. This is needed when you want to use the cli to 
submit jobs for instance. Is there an elegant way to submit jobs other than 
say just trying out all the possible nodes in your cluster?

Grateful if anyone could clarify any of the above, thanks,
M

On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun <yuanyou...@baidu.com> wrote:
Hi all,

I have a standalone cluster with 3 jobmanagers, and set high-availability to 
zookeeper. Our client submits job by REST API(POST /jars/:jarid/run), which 
means we need to know the host of the any of the current alive jobmanagers. The 
problem is that, how can we know which job manager is alive, or the host of 
current leader?  We don’t want to access a dead JM.

Thanks.
Youjun Yuan




Re: Implement Joins with Lookup Data

2018-07-25 Thread ashish pok
Hi Michael,
We are currently using 15 TMs with 4 cores and 4 slots each, 10GB of memory on 
each TM. We have 15 partitions on Kafka for stream and 6 for context/smaller 
stream. Heap is around 50%, GC is about 150ms and CPU loads are low. We may be 
able to reduce resources on this if need be. 
Thanks,


- Ashish

On Wednesday, July 25, 2018, 4:07 AM, Michael Gendelman  
wrote:

Hi Ashish,
We are planning for a similar use case and I was wondering if you can share the 
amount of resources you have allocated for this flow?
Thanks,
Michael

On Tue, Jul 24, 2018, 18:57 ashish pok  wrote:

BTW, 
We got around bootstrap problem for similar use case using a “nohup” topic as 
input stream. Our CICD pipeline currently passes an initialize option to app IF 
there is a need to bootstrap and waits for X minutes before taking a savepoint 
and restart app normally listening to right topic(s). I believe there is work 
underway to handle this gracefully using Side Input as well. Other than 
determining X minutes for initialization to complete, we haven't had any issue 
with this solution - we have over 40 million states refreshes daily and close 
to 200Mbps input streams being joined to states.
Hope this helps!


- Ashish

On Tuesday, July 24, 2018, 11:37 AM, Elias Levy  
wrote:

Alas, this suffers from the bootstrap problem.  At the moment Flink does not 
allow you to pause a source (the positions), so you can't fully consume and 
preload the accounts or products to perform the join before the positions start 
flowing.  Additionally, Flink SQL does not support materializing an upsert table 
for the accounts or products to perform the join, so you have to develop your 
own KeyedProcessFunction, maintain the state, and perform the join on your own 
if you only want to join against the latest value for each key.
On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann  wrote:

Yes, using Kafka which you initialize with the initial values and then feed 
changes to the Kafka topic from which you consume could be a solution.
On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal 
 wrote:

Hi Till,
How would we do the initial hydration of the Product and Account data since 
it’s currently in a relational DB? Do we have to copy over data to Kafka and 
then use them? 
Regards,
Harsh
On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:

Hi Harshvardhan,
I agree with Ankit that this problem could actually be solved quite elegantly 
with Flink's state. If you can ingest the product/account information changes 
as a stream, you can keep the latest version of it in Flink state by using a 
co-map function [1, 2]. One input of the co-map function would be the 
product/account update stream which updates the respective entries in Flink's 
state and the other input stream is the one to be enriched. When receiving 
input from this stream one would lookup the latest information contained in the 
operator's state and join it with the incoming event.
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
Cheers,
Till
On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal 
 wrote:

Hi,
Thanks for your responses.
There is no fixed interval for the data being updated. It’s more like whenever 
you onboard a new product, or there is a mandate change, that will trigger 
the reference data to change.
It’s not just the enrichment we are doing here. Once we have enriched the data 
we will be performing a bunch of aggregations using the enriched data. 
Which approach would you recommend?
Regards,
Harshvardhan
On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:


How often is the product db updated? Based on that you can store product 
metadata as state in Flink, maybe set up the state on cluster startup and then 
update daily etc.

 

Also, just based on this feature, flink doesn’t seem to add a lot of value on 
top of Kafka. As Jorn said below, you can very well store all the events in an 
external store and then periodically run a cron to enrich later since your 
processing doesn’t seem to require absolute real time.

 

Thanks

Ankit

 

From: Jörn Franke 
Date: Monday, July 23, 2018 at 10:10 PM
To: Harshvardhan Agrawal 
Cc: 
Subject: Re: Implement Joins with Lookup Data

 

For the first one (lookup of single entries) you could use a NoSQL db (eg key 
value store) - a relational database will not scale.

 

Depending on when you need to do the enrichment you could also first store the 
data and enrich it later as part of a batch process. 


On 24. Jul 2018, at 05:25, Harshvardhan Agrawal  
wrote:


Hi,

 

We are using Flink for financial data enrichment and aggregations. We have 
Positions data that we are currently receiving from Kafka. We want to enrich 
that data with reference data like Product and Account information that is 
present in a relational database. From my understanding of Flink so far I think 
t

Running a Python streaming job with Java dependencies

2018-07-25 Thread Joe Malt
Hi,

I'm trying to run a job with Flink's new Python streaming API but I'm
running into issues with Java imports.

I have a Jython project in IntelliJ with a lot of Java dependencies
configured through Maven. I can't figure out how to make Flink "see" these
dependencies.

An example script that exhibits the problem is the following (it's the
streaming example from the docs (
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html#streaming-program-example)
but with an extra import added)

from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction,
ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

# Added an extra import, this fails with an ImportError
import com.google.gson.GsonBuilder


class Generator(SourceFunction):
def __init__(self, num_iters):
self._running = True
self._num_iters = num_iters

# ... rest of the file is as in the documentation


This runs without any exceptions when run from IntelliJ (assuming
com.google.gson is added in the POM), but when I try to run it as a Flink
job with this command:

./pyflink-stream.sh ~/flink-python/MinimalExample.py - --local

it fails to find the dependency:

Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "", line 1, in 
  File
"/var/folders/t1/gcltcjcn5zdgqfqrc32xk90x85xkg9/T/flink_streaming_plan_0bfab09c-baeb-414f-a718-01a5c71b3507/MinimalExample.py",
line 7, in 
ImportError: No module named google

How can I point pyflink-stream.sh to these Maven dependencies? I've tried
modifying the script to add my .m2/ directory to the classpath (using flink
run -C), but that didn't make any difference:

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

"$FLINK_BIN_DIR"/flink run -C "file:///Users/jmalt/.m2/" --class
org.apache.flink.streaming.python.api.PythonStreamBinder -v
"$FLINK_ROOT_DIR"/opt/flink-streaming-python*.jar "$@"


Thanks,

Joe Malt

Engineering Intern, Stream Processing
Yelp


Re: Questions on Unbounded number of keys

2018-07-25 Thread Chang Liu
Hi Till,

Thanks for your reply. But I think maybe I did not make my question clear. My 
question is not about whether the States within each keyed operator instance 
will run out of memory. My question is about, whether the unlimited keyed 
operator instances themselves will run out of memory.

So to reply to your answers: neither using different State backends nor 
regularly cleaning up the States (which is exactly what I am doing) addresses 
the number of keyed operator instances.

I would like to know:
- Will the number of keyed operator instances (Java objects?) grow unbounded? 
If so, will they run out of memory? This is not actually related to the memory 
used by the keyed State inside.
- If not, how is Flink managing these multiple keyed operator instances?

I think this needs more knowledge about how Flink works internally to 
understand how keyed operator instances are created, maintained and destroyed. 
That’s why I would like your help understanding this.

Many Thanks.

Best regards/祝好,

Chang Liu 刘畅


> On 24 Jul 2018, at 14:31, Till Rohrmann  wrote:
> 
> Hi Chang Liu,
> 
> if you are dealing with an unlimited number of keys and keep state around for 
> every key, then your state size will keep growing with the number of keys. If 
> you are using the FileStateBackend which keeps state in memory, you will 
> eventually run into an OutOfMemoryException. One way to solve/mitigate this 
> problem is to use the RocksDBStateBackend which can go out of core.
> 
> Alternatively, you would need to clean up your state before you run out of 
> memory. One way to do this is to register for every key a timer which clears 
> the state. But this only works if you don't amass too much state data before 
> the timer is triggered. If you wish this solution is some kind of a poor 
> man's state TTL. The Flink community is currently developing a proper 
> implementation of it which does not rely on additional timers (which 
> increases the state footprint) [1].
> 
> [1] https://issues.apache.org/jira/browse/FLINK-9510 
> 
> 
> Cheers,
> Till
> 
> On Tue, Jul 24, 2018 at 10:11 AM Chang Liu  wrote:
> Dear All,
> 
> I have questions regarding the keys. In general, the questions are:
> what happens if I am doing keyBy based on an unlimited number of keys? How is 
> Flink managing each KeyedStream under the hood? Will I get memory overflow, for 
> example, if every KeyedStream associated with a specific key is taking a certain 
> amount of memory?
> BTW, I think it is fair to say that I have to clear my KeyedState so that 
> the memory used by these States is cleaned up regularly. But still, I am 
> wondering, even though I am regularly cleaning up State memory, what happens 
> to the memory used by the KeyedStream itself, if there is any? And will it 
> explode?
> 
> Let me give an example for understanding it clearly.  Let’s say we have a
> 
>   val requestStream: DataStream[HttpRequest]
> 
> which is a stream of HTTP requests. And by using the session ID as the key, 
> we can obtain a KeyedStream per single session, as following:
> 
> val streamPerSession: KeyedStream[HttpRequest] = 
> requestStream.keyBy(_.sessionId)
> 
> However, the session IDs are actually a hashcode generated randomly by the 
> Web service/application, so that means, the number of sessions are unlimited 
> (which is reasonable, because every time a user open the application or 
> login, he/she will get a new unique session). 
> 
> Then, the question is: will Flink eventually run out of memory because the 
> number of sessions are unlimited (and because we are keying by the session 
> ID)?
> If so, how can we properly manage this situation?
> If not, could you help me understand WHY?
> Let’s also assume that, we are regularly clearing the KeyedState, so the 
> memory used by the State will not explode. 
> 
> 
> Many Thanks and Looking forward to your reply :)
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
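
A minimal sketch of the timer-based cleanup Till describes, using the session example from this thread (HttpRequest is the thread's own example type, assumed to be a defined POJO; the 30-minute TTL and all names are illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionStateWithCleanup
        extends KeyedProcessFunction<String, HttpRequest, HttpRequest> {

    private static final long TTL_MS = 30 * 60 * 1000; // illustrative TTL

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(HttpRequest req, Context ctx,
            Collector<HttpRequest> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        // schedule a cleanup check for this key, TTL from now
        ctx.timerService().registerProcessingTimeTimer(now + TTL_MS);
        out.collect(req);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<HttpRequest> out) throws Exception {
        Long last = lastSeen.value();
        // clear only if the key has been idle for the whole TTL; otherwise a
        // later timer (registered by a newer element) will re-check
        if (last != null && timestamp >= last + TTL_MS) {
            lastSeen.clear();
        }
    }
}

As Till notes, each registered timer itself occupies state, which is exactly why the built-in TTL work in FLINK-9510 avoids extra timers.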



Regarding the use of JMX reporters for reporting custom (Gauge) metrics using notifications

2018-07-25 Thread Konstantinos Barmpis
I have been using the JMX reporter to gather data from a running flink
instance, more specifically to access a Gauge in one of my functions.

I am able to access this metric through:

JMXClientListener listener = new JMXClientListener();
// connect to the JMX server exposed by the Flink process (host/port elided here)
JMXServiceURL url = new
JMXServiceURL("service:jmx:rmi:///jndi/rmi://:/jmxrmi");
JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();

...
// typed proxy for the registered gauge MBean
JmxGaugeMBean mbeanProxy = JMX.newMBeanProxy(mbsc, name,
JmxGaugeMBean.class, true);

System.out.println(((Collection) mbeanProxy.getValue()).size());
---

This works as expected, giving the relevant output for this metric, every
time it runs.

Since JMX supports the use of notifications, I was wondering if flink
metrics are able to support this, a simple trial of:

mbsc.addNotificationListener(name, listener, null, null);
failed with:

java.lang.IllegalArgumentException:
The specified MBean [bean info] is not a NotificationBroadcaster object.
which leads me to believe that Gauge metrics do not support this interface.

As it would be very beneficial for my program to use a push-based system
such as these notifications, as opposed to a pull-based approach (such as
the one in the code shown above), I would like to first make sure whether
Flink supports JMX notifications or not, and if it does not, to get insight
into the best approach for solving this issue, for example by extending the
JMX reporter in Flink.
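
For reference, the standard JMX pattern for a notification-capable MBean is sketched below (NotifyingValue and its MBean interface are hypothetical illustrations, not Flink classes); Flink's gauge MBeans do not extend NotificationBroadcasterSupport, which is consistent with the error above:

import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;

interface NotifyingValueMBean {
    Object getValue();
}

// An MBean becomes a NotificationBroadcaster by extending this support class
// and calling sendNotification whenever something interesting happens.
public class NotifyingValue extends NotificationBroadcasterSupport
        implements NotifyingValueMBean {

    private long sequence = 0;
    private volatile Object value;

    public synchronized void setValue(Object newValue) {
        this.value = newValue;
        // push a notification to all registered listeners
        sendNotification(new Notification(
                "value.changed", this, ++sequence, "gauge value updated"));
    }

    @Override
    public Object getValue() {
        return value;
    }
}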

Thank you in advance for any insight provided into the matter,


-- 
Konstantinos Barmpis | Research Associate
White Rose Grid Enterprise Systems Group
Dept. of Computer Science
University of York
Tel: +44 (0) 1904-32 5653

Email Disclaimer:
http://www.york.ac.uk/docs/disclaimer/email.htm


Re: Flink 1.5 batch job fails to start

2018-07-25 Thread Alex Vinnik
Hi Vino,

Data is OK, I double-checked. Input is plain JSON and it can be processed by
the same code compiled and run on Flink 1.3.1. Thanks for the hint about avro
and parquet versions. Got my fat jar synced up with the Flink 1.5.1
avro/parquet versions. Hopes were high that it would help to resolve the
problem. And one run of the job actually was successful, but it started
failing after that with the same problem. Weird. Will continue to poke
around, feels like I am so close :)

Best,
-Alex

On Tue, Jul 24, 2018 at 9:08 PM vino yang  wrote:

> Hi Alex,
>
> Is it possible that the data has been corrupted?
>
> Or have you confirmed that the avro version is consistent in different
> Flink versions?
>
> Also, if you don't upgrade Flink and still use version 1.3.1, can it be
> recovered?
>
> Thanks, vino.
>
>
> 2018-07-25 8:32 GMT+08:00 Alex Vinnik :
>
>> Vino,
>>
>> Upgraded flink to Hadoop 2.8.1
>>
>> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
>> entrypoint | grep 'Hadoop version'
>> 2018-07-25T00:19:46.142+
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>> version: 2.8.1
>>
>> but job still fails to start
>>
>> Ideas?
>>
>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>> d84cccd3bffcba1f243352a5e5ef99a9.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:169)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>> ... 21 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>> initialize task 'DataSink
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
>> Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:298)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:151)
>> ... 26 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>> ... 31 more
>> Caused by: java.lang.IllegalStateException: unread block data
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>> at
>> java.io.ObjectInputStream.defaul

Re: Flink 1.5 batch job fails to start

2018-07-25 Thread Alex Vinnik
Hi Till,

Server start up entrypoint log

2018-07-25T12:19:12.268+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO

2018-07-25T12:19:12.271+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting
StandaloneSessionClusterEntrypoint (Version: , Rev:3488f8b,
Date:10.07.2018 @ 11:51:27 GMT)
2018-07-25T12:19:12.271+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current
user: flink
2018-07-25T12:19:18.599+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current
Hadoop/Kerberos user: flink
2018-07-25T12:19:18.607+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK
64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-25T12:19:18.607+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap
size: 1963 MiBytes
2018-07-25T12:19:18.607+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME:
/docker-java-home/jre
2018-07-25T12:19:18.615+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
version: 2.8.1
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  -Xms2048m
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  -Xmx2048m
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program
Arguments:
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--configDir
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
/opt/flink/conf
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--executionMode
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  cluster
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --host
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  cluster
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath:
/opt/flink/lib/flink-metrics-datadog-1.5.1.jar:/opt/flink/lib/flink-metrics-prometheus-1.5.1.jar:/opt/flink/lib/flink-python_2.11-1.5.1.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.5.1.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/peer-group-transform-all.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.1.jar:::
2018-07-25T12:19:18.616+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO

2018-07-25T12:19:18.620+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered
UNIX signal handlers for [TERM, HUP, INT]
2018-07-25T12:19:18.853+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting
StandaloneSessionClusterEntrypoint.
2018-07-25T12:19:18.854+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
default filesystem.
2018-07-25T12:19:19.045+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
security context.
2018-07-25T12:19:19.520+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing
cluster services.
2018-07-25T12:19:19.601+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to
start actor system at flink-jobmanager:6123
2018-07-25T12:19:24.768+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system
started at akka.tcp://flink@flink-jobmanager:6123

Below flink client log file
docker exec -it flink-jobmanager flink run /fat.jar --input-path
s3a://json-input --output-path s3a://parquet-output

2018-07-25 13:40:30,752 INFO  org.apache.flink.client.cli.CliFrontend
 -

2018-07-25 13:40:30,756 INFO  org.apache.flink.client.cli.CliFrontend
 -  S

Re: Re: downgrade Flink

2018-07-25 Thread Vino yang
Hi,

You are welcome. Hope for enhancing Flink ML in the future.

Vino yang
Thanks.

On 2018-07-25 20:02, Cederic Bosmans wrote:

Dear

I was able to fix the problem. Thank you very much for your support!

Kind regards
Cederic

On Wed, Jul 25, 2018 at 1:57 PM Cederic Bosmans  wrote:

Dear

I followed your instructions but after clean installing again, Maven gives as
error: Could not resolve dependencies for project
org.apache.flink:flink-examples-table_:jar:1.6-SNAPSHOT: Could not find
artifact io.radicalbit:flink-jpmml-scala:jar:0.7.0-SNAPSHOT in apache.snapshots
(https://repository.apache.org/snapshots)

Do I need to define a repository?

Kind regards
Cederic

On Wed, Jul 25, 2018 at 1:14 PM vino yang  wrote:

Hi Cederic,

Can you try to delete the specific dependency under this path:
"/io/radicalbit/...", then let maven re-download the jars?

Thanks, vino.

2018-07-25 19:05 GMT+08:00 Cederic Bosmans :

Dear Vino,

Thank you for your quick response, you were indeed correct. I changed it to
the specific version number and consequently, it does not give any error
warnings anymore in my POM file. But unfortunately, my IntelliJ still does
not want to import the library in my program. I tried to clean install it
again using Maven but then it gives this error: Could not find artifact
io.radicalbit:flink-jpmml-scala:jar:0.7.0-SNAPSHOT in apache.snapshots
(https://repository.apache.org/snapshots), which I can not solve. I really
hope you can help me!

Kind regards
Cederic

On Wed, Jul 25, 2018 at 12:58 PM vino yang  wrote:

Hi Cederic,

The README said the "latest" version is 1.3.2; it means as of that time. I
think for the latest Flink version, there is no guarantee. I suggest
replacing the version's value from LATEST with a specific version number.

Thanks, vino.

2018-07-25 15:49 GMT+08:00 Cederic Bosmans :

Dear Vino

If I understand you correctly, you mean that the jpmml should work on the
latest 1.7-snapshot version of Flink? I inserted this in my POM.xml file:

<dependency>
   <groupId>io.radicalbit</groupId>
   <artifactId>flink-jpmml-scala</artifactId>
   <version>LATEST</version>
</dependency>

but it keeps saying: 'failed to read artifact descriptor for
io.radicalbit:flink-jpmml-scala:jar:LATEST'. So I can not import the
functions in my project. Hopefully you can help me.

Kind regards
Cederic

On Wed, Jul 25, 2018 at 4:16 AM vino yang  wrote:

Hi Cederic,

I just read the project you gave, it includes the following statement in its
README file: "flink-jpmml is tested with the latest Flink (i.e. 1.3.2), but
any working Apache Flink version (repo) should work properly."

This project was born a year ago and should not rely on versions prior to
Flink 1.0. You can confirm it again.

Thanks, vino.

2018-07-25 6:44 GMT+08:00 Cederic Bosmans :

Dear

I am working on a streaming prediction model for which I want to try to use
the flink-jpmml extension (https://github.com/FlinkML/flink-jpmml).
Unfortunately, it supports only the 0.7.0-SNAPSHOT and 0.6.1 versions of
Flink and I am using the 1.7-SNAPSHOT version of Flink. How can I downgrade
my version? (The examples are written for sbt and I am using Maven.)

Thank you very much!

Kind regards
Cederic

Re: override jvm params

2018-07-25 Thread Hequn Cheng
Hi Cussac,
If I understand correctly, you want to pass rules.consumer.topic=test
and rules.consumer.topic=test
to flink jvm.
I think you can try:
flink run -m $HOSTPORT -yD rules.consumer.topic=test -yD
rules.consumer.topic=test

Hope this helps.
Hequn

On Wed, Jul 25, 2018 at 3:26 PM, Cussac, Franck <
franck.cus...@ext.bleckwen.ai> wrote:

> Hi,
>
>
>
> Following the documentation I want to use –yD option to override some
> params in my conf like this :
>
>
>
> flink run -m $HOSTPORT -yD 
> "env.java.opts.taskmanager=-Drules.consumer.topic=test"
> -yD "env.java.opts.jobmanager=-Drules.consumer.topic=test" myjar mymain
>
>
>
> but it is just ignored. Nothing happend. But if I run with java on my IDE
> and :
>
> -Drules.consumer.topic=test
>
> in JVM’s parameter it works eprfectly.
>
>
>
> What do I have to do to override my params with yarn and flink ?
>
>
>
>
>
> Best regards,
>
> Franck Cussac.
>
>
>


Re: Checkpointing not happening in Standalone HA mode

2018-07-25 Thread Vinay Patil
Hi Chesnay,

No error in the logs. That is why I am not able to understand why
checkpoints are not getting triggered.

Regards,
Vinay Patil


On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler  wrote:

> Please check the job- and taskmanager logs for anything suspicious.
>
> On 25.07.2018 12:33, Vinay Patil wrote:
>
> Hi,
>
> I am starting the cluster using bootstrap application where in I am
> calling Job Manager and Task Manager main class to form the cluster. The HA
> cluster is formed correctly and I am able to submit jobs to this cluster
> using RemoteExecutionEnvironment but when I enable checkpointing in code I
> do not see any checkpoints triggered on Flink UI.
>
> Am I missing any configurations to be set for the
> RemoteExecutionEnvironment for checkpointing to work.
>
>
> Regards,
> Vinay Patil
>
>
>


RE: override jvm params

2018-07-25 Thread Cussac, Franck
Hi Hequn,

Thanks for your answer. I just tested and it doesn’t work.

I’m using PureConfig to parse my conf files. With java I can override any 
argument using -D<key>=<value> syntax. How can I do the same with flink in 
yarn mode ?

Franck.


From: Hequn Cheng [mailto:chenghe...@gmail.com]
Sent: Wednesday, July 25, 2018 14:04
To: Cussac, Franck 
Cc: user 
Subject: Re: override jvm params

Hi Cussac,
If I understand correctly, you want to pass rules.consumer.topic=test and 
rules.consumer.topic=test to flink jvm.
I think you can try:
flink run -m $HOSTPORT -yD rules.consumer.topic=test -yD 
rules.consumer.topic=test

Hope this helps.
Hequn

On Wed, Jul 25, 2018 at 3:26 PM, Cussac, Franck 
mailto:franck.cus...@ext.bleckwen.ai>> wrote:
Hi,

Following the documentation I want to use –yD option to override some params in 
my conf like this :

flink run -m $HOSTPORT -yD 
"env.java.opts.taskmanager=-Drules.consumer.topic=test" -yD 
"env.java.opts.jobmanager=-Drules.consumer.topic=test" myjar mymain

but it is just ignored. Nothing happened. But if I run with java on my IDE and :
-Drules.consumer.topic=test
in the JVM’s parameters it works perfectly.

What do I have to do to override my params with yarn and flink ?


Best regards,
Franck Cussac.




"Futures timed out" when trying to cancel a job with savepoint

2018-07-25 Thread Julio Biason
Hey guys,

We just built a brand new Flink 1.4.0 cluster with HA and everything seems
to be working fine, but we are getting some errors with savepoints.

For example, I have a running job

-- Running/Restarting Jobs ---
25.07.2018 11:55:18 : e5280bad25a7f19122f98483f94aba26 : Mr Banks (RUNNING)
--

If I try to create a savepoint with

flink savepoint e5280bad25a7f19122f98483f94aba26

The command just stays there and never returns (I waited about 10 minutes,
with no response). Then I tried to cancel with savepoint:

flink cancel e5280bad25a7f19122f98483f94aba26 -s

And I got a

java.util.concurrent.TimeoutException: Futures timed out after [6
milliseconds]

I checked the jobmanager logs, but I can't see any problems; I checked the
Hadoop logs for any errors (believing the problem may be in the underlying
system), but it seems it did create the nodes properly -- at least, there
are no errors there too.

Is there anything else I should check?

PS: My state is not that big (my napkin calculations say it's less than
1Gb) so it doesn't seem it's a problem with the state size taking too long
to be saved.

-- 
Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554


Re: job status monitor

2018-07-25 Thread Renjie Liu
You can use rest api here
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/rest_api.html

On Wed, Jul 25, 2018 at 5:18 PM 从六品州同 <26304...@qq.com> wrote:

> dear all:
> Is there a notification mechanism for Flink? When job's status changes,
> such as restart, failure, notify other systems?
> Or
> I have a system to monitor the job state of Flink. What should I do?
>
> thanks
>
-- 
Liu, Renjie
Software Engineer, MVAD


Re: Checkpointing not happening in Standalone HA mode

2018-07-25 Thread Chesnay Schepler

Can you provide us with the job code?

I assume that checkpointing runs properly if you submit the same job to 
a normal cluster?


On 25.07.2018 13:15, Vinay Patil wrote:
No error in the logs. That is why I am not able to understand why 
checkpoints are not getting triggered.


Regards,
Vinay Patil


On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil  wrote:


Hi Chesnay,

No error in the logs. That is why I am not able to understand why
checkpoints are not getting triggered.

Regards,
Vinay Patil


On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler <ches...@apache.org> wrote:

Please check the job- and taskmanager logs for anything
suspicious.

On 25.07.2018 12:33, Vinay Patil wrote:

Hi,

I am starting the cluster using bootstrap application where
in I am calling Job Manager and Task Manager main class to
form the cluster. The HA cluster is formed correctly and I am
able to submit jobs to this cluster using
RemoteExecutionEnvironment but when I enable checkpointing in
code I do not see any checkpoints triggered on Flink UI.

Am I missing any configurations to be set for the
RemoteExecutionEnvironment for checkpointing to work.


Regards,
Vinay Patil







Re: Checkpointing not happening in Standalone HA mode

2018-07-25 Thread Vinay Patil
No error in the logs. That is why I am not able to understand why
checkpoints are not getting triggered.

Regards,
Vinay Patil


On Wed, Jul 25, 2018 at 4:44 PM Vinay Patil  wrote:

> Hi Chesnay,
>
> No error in the logs. That is why I am not able to understand why
> checkpoints are not getting triggered.
>
> Regards,
> Vinay Patil
>
>
> On Wed, Jul 25, 2018 at 4:36 PM Chesnay Schepler 
> wrote:
>
>> Please check the job- and taskmanager logs for anything suspicious.
>>
>> On 25.07.2018 12:33, Vinay Patil wrote:
>>
>> Hi,
>>
>> I am starting the cluster using bootstrap application where in I am
>> calling Job Manager and Task Manager main class to form the cluster. The HA
>> cluster is formed correctly and I am able to submit jobs to this cluster
>> using RemoteExecutionEnvironment but when I enable checkpointing in code I
>> do not see any checkpoints triggered on Flink UI.
>>
>> Am I missing any configurations to be set for the
>> RemoteExecutionEnvironment for checkpointing to work.
>>
>>
>> Regards,
>> Vinay Patil
>>
>>
>>


Re: Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-25 Thread vino yang
Hi Martin,


For a standalone cluster with multiple JM instances, if you do not
use the REST API but use the Flink-provided cluster client, the client can
determine which of the JM instances is the leader.

For example, you can use the CLI to submit a Flink job from a non-leader node.

But I did not verify this case for Flink on Mesos.

Thanks, vino.

2018-07-25 17:22 GMT+08:00 Martin Eden :

> Hi,
>
> This is actually very relevant to us as well.
>
> We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of
> Mesos/DCOS, Flink HA runs only one JobManager which gets restarted on
> another node by Marathon in case of failure and reloads its state from
> Zookeeper.
>
> Yuan I am guessing you are using Flink in standalone mode and there it is
> actually running 3 instances of the Job Manager, 1 active and 2 stand-bys.
>
> Either way, in both cases there is the need to "discover" the hostname and
> port of the Job Manager at runtime. This is needed when you want to use the
> cli to submit jobs for instance. Is there an elegant way to submit jobs
> other than say just trying out all the possible nodes in your cluster?
>
> Grateful if anyone could clarify any of the above, thanks,
> M
>
> On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun 
> wrote:
>
>> Hi all,
>>
>>
>>
>> I have a standalone cluster with 3 jobmanagers, and set *high-availability
>> to zookeeper*. Our client submits job by REST API(POST
>> /jars/:jarid/run), which means we need to know the host of the any of the
>> current alive jobmanagers. The problem is that, how can we know which job
>> manager is alive, or the host of current leader?  We don’t want to access a
>> dead JM.
>>
>>
>>
>> Thanks.
>>
>> Youjun Yuan
>>
>
>


Re: downgrade Flink

2018-07-25 Thread vino yang
Hi Cederic,

The README said the "*latest*" version is 1.3.2, it means as of that time.
I think for the latest Flink version, there is no guarantee.

I suggest replacing the version's value from LATEST with a specific version
number.

Thanks, vino.

2018-07-25 15:49 GMT+08:00 Cederic Bosmans :

> Dear Vino
>
> If I understand you correctly, you mean that the jpmml should work on the
> latest 1.7-snapshot version of Flink?
> I inserted this in my POM.xml file:
>
> <dependency>
>    <groupId>io.radicalbit</groupId>
>    <artifactId>flink-jpmml-scala</artifactId>
>    <version>LATEST</version>
> </dependency>
>
> but it keeps saying: 'failed to read artifact descriptor for
> io.radicalbit:flink-jpmml-scala:jar:LATEST'.
> So I can not import the functions in my project.
>
> Hopefully you can help me.
> Kind regards
> Cederic
>
> On Wed, Jul 25, 2018 at 4:16 AM vino yang  wrote:
>
>> Hi Cederic,
>>
>> I just read the project you gave, it includes the following statement in
>> its README file.
>>
>>
>> *“flink-jpmml is tested with the latest Flink (i.e. 1.3.2), but any
>> working Apache Flink version (repo) should work properly.”*
>>
>>
>> This project was born a year ago and should not rely on versions prior to
>> Flink 1.0.
>>
>> You can confirm it again.
>>
>> Thanks, vino.
>>
>>
>> 2018-07-25 6:44 GMT+08:00 Cederic Bosmans :
>>
>>> Dear
>>>
>>> I am working on a streaming prediction model for which I want to try to
>>> use the flink-jpmml extension. (https://github.com/FlinkML/flink-jpmml)
>>> Unfortunately, it only supports only the 0.7.0-SNAPSHOT and 0.6.1
>>> versions of Flink and I am using the 1.7-SNAPSHOT version of Flink.
>>> How can I downgrade my version?
>>> (the examples are written for sbt and I am using Maven)
>>> Thank you very much!
>>>
>>> Kind regards
>>> Cederic
>>>
>>>
>>


Re: Checkpointing not happening in Standalone HA mode

2018-07-25 Thread Chesnay Schepler

Please check the job- and taskmanager logs for anything suspicious.

On 25.07.2018 12:33, Vinay Patil wrote:

Hi,

I am starting the cluster using bootstrap application where in I am 
calling Job Manager and Task Manager main class to form the cluster. 
The HA cluster is formed correctly and I am able to submit jobs to 
this cluster using RemoteExecutionEnvironment but when I enable 
checkpointing in code I do not see any checkpoints triggered on Flink UI.


Am I missing any configurations to be set for the 
RemoteExecutionEnvironment for checkpointing to work.



Regards,
Vinay Patil





Checkpointing not happening in Standalone HA mode

2018-07-25 Thread Vinay Patil
Hi,

I am starting the cluster using bootstrap application where in I am calling
Job Manager and Task Manager main class to form the cluster. The HA cluster
is formed correctly and I am able to submit jobs to this cluster using
RemoteExecutionEnvironment but when I enable checkpointing in code I do not
see any checkpoints triggered on Flink UI.

Am I missing any configurations to be set for the
RemoteExecutionEnvironment for checkpointing to work.


Regards,
Vinay Patil


Re: Query regarding rest.port property

2018-07-25 Thread Vinay Patil
Thanks Chesnay for your inputs.

I am actually starting the cluster using bootstrap application where in I
am calling Job Manager and Task Manager main class to form the cluster.

So I have removed flink-runtime-web dependency and used only flink_runtime
dependency for forming the cluster, but am still not able to hit the REST
APIs. Is there anything else I can do here?

Yes, you are right about separating the API's into two parts.

Regards,
Vinay Patil


On Sat, Jul 21, 2018 at 1:46 AM Chesnay Schepler  wrote:

> Something that I was thinking about a while ago was to separate the REST
> API into 2 parts;
> one for monitoring (getting details for a job etc.),
> one for modifying state (submitting/canceling jobs, uploading jars, etc.)
>
> This may better fit your requirements.
>
> On 20.07.2018 22:13, Chesnay Schepler wrote:
>
> Effectively you can't disable them selectively; reason being that they are
> actually one and the same.
> The ultimate solution is to build flink-dist yourself, and exclude
> "flink-runtime-web" from it, which removes
> the required files.
>
> Note that being able to selectively disable them _for security reasons_
> wouldn't do you much good as far as I can tell; if you have access to the
> REST API you can do anything the UI does. Similarly, if you can restrict
> access to the REST API, you also do that for the UI.
>
> On 20.07.2018 18:14, Vino yang wrote:
>
> Hi Vinay,
>
> Is the JobManager running on node "myhost"? Did you check that the port you
> specified is open for remote access?
>
> Can you try to start the web UI, but just block its port?
>
> 
> Vino yang
> Thanks.
>
>
> On 2018-07-20 22:48 , Vinay Patil  Wrote:
>
> Hi,
>
> We have disabled Flink Web UI for security reasons however we want to use
> REST Api for monitoring purpose. For that I have set jobmanager.web.port =
> -1 , rest.port=, rest.address=myhost
>
> But I am not able to access any REST api using https://
> myhost:/
>
> Is it mandatory to have Flink Web UI running or am I missing any
> configuration ?
>
> Regards,
> Vinay Patil
>
>
>
>


Re: S3 file source - continuous monitoring - many files missed

2018-07-25 Thread Fabian Hueske
Hi,

First of all, the ticket reports a bug (or improvement or feature
suggestion) such that others are aware of the problem and understand its
cause.

At some point it might be picked up and implemented. In general, there is
no guarantee whether or when this happens, but the Flink community is of
course eager to fix bugs and it's a rather important problem.
So it might be addressed soon. However, I cannot promise for which release
it will be fixed.

You can of course help the community (and yourself) and contribute a fix
for this problem.
Scratching your own itch is a good way to get started in contributing to
open source projects ;-).

Best, Fabian


2018-07-25 10:23 GMT+02:00 Averell :

> Thank you Fabian for the guide to implement the fix.
>
> I'm not quite clear about the best practice of creating JIRA ticket. I
> modified its priority to Major as you said that it is important.
> What would happen next with that issue then? Someone (anyone) will pick it
> and create a fix, then include that in the following release?
>
> Thanks!
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


override jvm params

2018-07-25 Thread Cussac, Franck
Hi,

Following the documentation I want to use -yD option to override some params in 
my conf like this :

flink run -m $HOSTPORT -yD 
"env.java.opts.taskmanager=-Drules.consumer.topic=test" -yD 
"env.java.opts.jobmanager=-Drules.consumer.topic=test" myjar mymain

but it is just ignored. Nothing happend. But if I run with java on my IDE and :
-Drules.consumer.topic=test
in JVM's parameter it works eprfectly.

What do I have to do to override my params with yarn and flink ?


Best regards,
Franck Cussac.



Re: Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-25 Thread Martin Eden
Hi,

This is actually very relevant to us as well.

We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of
Mesos/DCOS, Flink HA runs only one JobManager which gets restarted on
another node by Marathon in case of failure and reloads its state from
Zookeeper.

Yuan I am guessing you are using Flink in standalone mode and there it is
actually running 3 instances of the Job Manager, 1 active and 2 stand-bys.

Either way, in both cases there is the need to "discover" the hostname and
port of the Job Manager at runtime. This is needed when you want to use the
cli to submit jobs for instance. Is there an elegant way to submit jobs
other than say just trying out all the possible nodes in your cluster?

Grateful if anyone could clarify any of the above, thanks,
M

On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun  wrote:

> Hi all,
>
>
>
> I have a standalone cluster with 3 jobmanagers, and set *high-availability
> to zookeeper*. Our client submits job by REST API(POST /jars/:jarid/run),
> which means we need to know the host of the any of the current alive
> jobmanagers. The problem is that, how can we know which job manager is
> alive, or the host of current leader?  We don’t want to access a dead JM.
>
>
>
> Thanks.
>
> Youjun Yuan
>


Re: job status monitor

2018-07-25 Thread Chesnay Schepler

There's no built-in mechanism to send notifications.

To monitor the job status you can poll the REST API (/jobs/:jobid).

Alternatively you could implement a MetricReporter that explicitly 
checks for the availability metrics, regularly polls them and sends out 
notifications for irregular values.

For example if the upTime metric has decreased this always means 
something went wrong.
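
A minimal sketch of that polling approach (host, port and job id are placeholders; in the job-details JSON the "state" field carries values such as RUNNING or FAILED):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class JobStatusPoller {

    /** Fetches the job-detail JSON for one job; inspect its "state" field. */
    static String fetchJobDetails(String host, int port, String jobId)
            throws Exception {
        URL url = new URL("http://" + host + ":" + port + "/jobs/" + jobId);
        StringBuilder sb = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                sb.append(line);
            }
        }
        return sb.toString(); // parse with any JSON library and read "state"
    }
}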


On 25.07.2018 09:52, ?? wrote:

dear all:
Is there a notification mechanism for Flink? When job's status 
changes, such as restart, failure, notify other systems?

Or
I have a system to monitor the job state of Flink. What should I do?

thanks





RocksDB state backend Checkpointing Failed

2018-07-25 Thread Marvin777
Hi,all:

Checkpoint always fails, like this:
https://jira.apache.org/jira/browse/FLINK-9945

[image: image.png]


thanks.


job status monitor

2018-07-25 Thread ??????????
dear all: Is there a notification mechanism for Flink? When job's status 
changes, such as restart, failure, notify other systems?
Or
I have a system to monitor the job state of Flink. What should I do?



thanks

Re: S3 file source - continuous monitoring - many files missed

2018-07-25 Thread Averell
Thank you Fabian for the guide to implement the fix.

I'm not quite clear about the best practice of creating JIRA ticket. I
modified its priority to Major as you said that it is important.
What would happen next with that issue then? Someone (anyone) will pick it
and create a fix, then include that in the following release?

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 file source - continuous monitoring - many files missed

2018-07-25 Thread Fabian Hueske
Hi,

Thanks for creating the Jira issue.
I'm not sure if I would consider this a blocker but it is certainly an
important problem to fix.

Anyway, in the original version Flink checkpoints the modification
timestamp up to which all files have been read (or at least up to which
point it *thinks* to have everything processed in case of S3).
In case of a recovery, the timestamp is reset to the checkpointed value and
all files with a larger mod timestamp are processed again. This reset of
the read position together with resetting the state of all operators
results in exactly-once state consistency.

In order to avoid that the data of files is added twice to the state of an
operator, the monitoring source must ensure that it does not read data
again that was processed before the checkpoint was committed.
So, if you add an offset to the mod timestamp and track processed files
with a ts larger than the checkpointed mod timestamp by file name, these
names must be included in the checkpoint as well.

Best, Fabian
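
A sketch of that bookkeeping (field and method names are illustrative, not the actual ContinuousFileMonitoringFunction members):

import java.util.HashSet;
import java.util.Set;

class ProcessedFileTracker {
    // max modification timestamp, minus a safety offset for S3's weaker listing
    long checkpointedTs;
    // names of already-read files whose mod timestamp is >= checkpointedTs
    Set<String> processedSinceTs = new HashSet<>();

    /** True exactly once per file that falls inside the uncertain window. */
    boolean shouldProcess(String path, long modTs) {
        if (modTs < checkpointedTs) {
            return false; // definitely processed before the checkpoint
        }
        return processedSinceTs.add(path); // add() is false if already seen
    }
    // Both fields must be snapshotted together in the checkpoint; restoring
    // one without the other re-ingests files and breaks exactly-once.
}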


2018-07-25 6:34 GMT+02:00 Averell :

> Hello Fabian,
>
> I created the JIRA bug https://issues.apache.org/jira/browse/FLINK-9940
> BTW, I have one more question: Is it worth to checkpoint that list of
> processed files? Does the current implementation of file-source guarantee
> exactly-once?
>
> Thanks for your support.
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Implement Joins with Lookup Data

2018-07-25 Thread Michael Gendelman
Hi Ashish,

We are planning a similar use case, and I was wondering if you could share
the amount of resources you have allocated for this flow?

Thanks,
Michael

On Tue, Jul 24, 2018, 18:57 ashish pok  wrote:

> BTW,
>
> We got around the bootstrap problem for a similar use case using a “nohup”
> topic as the input stream. Our CICD pipeline currently passes an initialize
> option to the app IF there is a need to bootstrap, waits for X minutes,
> takes a savepoint, and restarts the app normally, listening to the right
> topic(s). I believe there is work underway to handle this gracefully using
> Side Inputs as well. Other than determining the X minutes needed for
> initialization to complete, we haven't had any issues with this solution -
> we have over 40 million state refreshes daily and close to 200 Mbps of
> input streams being joined to states.
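>
> For reference, a minimal sketch of that topic switch (the topic names are
> hypothetical, and Flink's ParameterTool plus the 0.11 Kafka connector are
> assumptions for illustration, not our actual pipeline):
>
> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
>
> public class BootstrapSwitch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         ParameterTool params = ParameterTool.fromArgs(args);
>         // CICD passes --bootstrap true only when state must be rebuilt.
>         String topic = params.getBoolean("bootstrap", false)
>             ? "states-bootstrap"   // hypothetical topic holding initial state
>             : "events-live";       // hypothetical normal input topic
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers",
>             params.get("brokers", "localhost:9092"));
>         env.addSource(new FlinkKafkaConsumer011<>(
>                 topic, new SimpleStringSchema(), props))
>            .print(); // the real enrichment/join logic would go here
>         env.execute("bootstrap-switch-demo");
>     }
> }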
>
> Hope this helps!
>
>
>
> - Ashish
>
> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <
> fearsome.lucid...@gmail.com> wrote:
>
> Alas, this suffers from the bootstrap problem.  At the moment Flink does
> not allow you to pause a source (the positions), so you can't fully consume
> and preload the accounts or products to perform the join before the
> positions start flowing.  Additionally, Flink SQL does not support
> materializing an upsert table for the accounts or products to perform the
> join, so you have to develop your own KeyedProcessFunction, maintain the
> state, and perform the join on your own if you only want to join against
> the latest value for each key.
>
> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann 
> wrote:
>
> Yes, using Kafka could be a solution: you initialize the topic with the
> initial values and then feed changes into the same topic you consume from.
>
> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi Till,
>
> How would we do the initial hydration of the Product and Account data,
> since it's currently in a relational DB? Do we have to copy the data over
> to Kafka and then use it from there?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>
> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream, which updates the
> respective entries in Flink's state, and the other input stream is the one
> to be enriched. When receiving input from this stream, one would look up
> the latest information contained in the operator's state and join it with
> the incoming event (a sketch follows the references below).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
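>
> A minimal sketch of that pattern, assuming hypothetical Position, Product,
> and EnrichedPosition types keyed by productId (a RichCoFlatMapFunction
> with keyed ValueState; an illustration, not anyone's actual job code):
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
> import org.apache.flink.util.Collector;
>
> // Keyed by productId on both inputs; keeps the latest Product in state
> // and enriches each Position with it. Position, Product, and
> // EnrichedPosition are hypothetical application types.
> public class EnrichmentFunction
>         extends RichCoFlatMapFunction<Position, Product, EnrichedPosition> {
>
>     private transient ValueState<Product> latestProduct;
>
>     @Override
>     public void open(Configuration parameters) {
>         latestProduct = getRuntimeContext().getState(
>             new ValueStateDescriptor<>("latest-product", Product.class));
>     }
>
>     @Override
>     public void flatMap1(Position pos, Collector<EnrichedPosition> out)
>             throws Exception {
>         Product p = latestProduct.value();
>         if (p != null) {
>             out.collect(new EnrichedPosition(pos, p));
>         }
>         // else: no product seen yet for this key (the bootstrap problem)
>     }
>
>     @Override
>     public void flatMap2(Product product, Collector<EnrichedPosition> out)
>             throws Exception {
>         latestProduct.update(product); // keep only the latest version
>     }
> }
>
> // Wiring (both streams keyed by productId):
> // positions.keyBy(Position::getProductId)
> //     .connect(productUpdates.keyBy(Product::getProductId))
> //     .flatMap(new EnrichmentFunction());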
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
> Thanks for your responses.
>
> There is no fixed interval for the data being updated. It's more that
> onboarding a new product, or any mandates that change, will trigger the
> reference data to change.
>
> It’s not just the enrichment we are doing here. Once we have enriched the
> data we will be performing a bunch of aggregations using the enriched data.
>
> Which approach would you recommend?
>
> Regards,
> Harshvardhan
>
> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:
>
> How often is the product DB updated? Based on that, you can store product
> metadata as state in Flink, maybe set up the state on cluster startup and
> then update it daily, etc.
>
>
>
> Also, just based on this feature, Flink doesn't seem to add a lot of value
> on top of Kafka. As Jörn said below, you could store all the events in an
> external store and then periodically run a cron job to enrich them later,
> since your processing doesn't seem to require absolute real time.
>
>
>
> Thanks
>
> Ankit
>
>
>
> From: Jörn Franke 
> Date: Monday, July 23, 2018 at 10:10 PM
> To: Harshvardhan Agrawal 
> Cc: 
> Subject: Re: Implement Joins with Lookup Data
>
>
>
> For the first one (lookup of single entries) you could use a NoSQL db
> (e.g. a key-value store) - a relational database will not scale.
>
>
>
> Depending on when you need to do the enrichment you could also first store
> the data and enrich it later as part of a batch process.
>
>
> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
>
>
> We are using Flink for financial data enrichment and aggregations. We have
> Positions data that we are currently receiving from Kafka. We want to
> enrich that data with reference data like Product and Account information
> that is present in a relational database. From my understanding of Flink so
> far I think there are two ways to achieve this. Here are two ways 

Re: Flink 1.5 batch job fails to start

2018-07-25 Thread Till Rohrmann
Hi Alex,

could you share with us the full logs of the client and the cluster
entrypoint? That would be tremendously helpful.

Cheers,
Till

On Wed, Jul 25, 2018 at 4:08 AM vino yang  wrote:

> Hi Alex,
>
> Is it possible that the data has been corrupted?
>
> Or have you confirmed that the Avro version is consistent across the
> different Flink versions?
>
> Also, if you don't upgrade Flink and still use version 1.3.1, can it be
> recovered?
>
> Thanks, vino.
>
>
> 2018-07-25 8:32 GMT+08:00 Alex Vinnik :
>
>> Vino,
>>
>> Upgraded Flink to the Hadoop 2.8.1 build
>>
>> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
>> entrypoint | grep 'Hadoop version'
>> 2018-07-25T00:19:46.142+
>> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
>> version: 2.8.1
>>
>> but the job still fails to start
>>
>> Ideas?
>>
>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
>> d84cccd3bffcba1f243352a5e5ef99a9.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> ... 4 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:169)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>> ... 21 more
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
>> initialize task 'DataSink
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
>> Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:298)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:151)
>> ... 26 more
>> Caused by: java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
>> failed: unread block data
>> at
>> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>> ... 31 more
>> Caused by: java.lang.IllegalStateException: unread block data
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431