Re: How to migrate Kafka Producer?

2018-12-19 Thread Dawid Wysakowicz
Hi Edward,

AFAIK we do not support migrating state from one connector to another,
which is in fact the case for Kafka 0.11 and the "universal" one.

You might try to use the bravo project [1] to migrate the state manually,
but unfortunately you would have to understand the internals of both
connectors. I am also pulling Piotr into the thread; maybe he can provide
a more straightforward workaround.

Best,

Dawid

[1] https://github.com/king/bravo

On 18/12/2018 14:33, Edward Rojas wrote:
> Hi,
>
> I'm planning to migrate from the kafka connector 0.11 to the new universal kafka
> connector 1.0.0+, but I'm having some trouble.
>
> The kafka consumer seems to be compatible but when trying to migrate the
> kafka producer I get an incompatibility error for the state migration. 
> It looks like the producer uses a list state of type
> "NextTransactionalIdHint", but this class is specific for each Producer
> (FlinkKafkaProducer011.NextTransactionalIdHint  vs
> FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are not
> compatible.
>
>
> I would like to know what is the recommended way to perform this kind of
> migration without losing the state ?
>
> Thanks in advance,
> Edward
>
>
>





Re: Kafka consumer, is there a way to filter out messages using key only?

2018-12-19 Thread Dawid Wysakowicz
Hi,

I'm afraid there is no out-of-the-box solution for this, but what
you could do is have your KeyedDeserializationSchema emit some marker
(an Optional, a null value, ...) based on the message key, which would
allow you to filter the record out later. Assuming the Optional approach,
the result of KeyedDeserializationSchema#deserialize would be
Optional.empty() for unwanted keys and Optional.of(deserializedValue)
for wanted keys.
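
For illustration, a minimal sketch of that approach (the key prefix, the
element type, and the class name are hypothetical; only the key is inspected,
so the expensive value deserialization is skipped for unwanted records):

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class KeyFilteringSchema implements KeyedDeserializationSchema<Optional<String>> {

    @Override
    public Optional<String> deserialize(byte[] messageKey, byte[] message,
                                        String topic, int partition, long offset) {
        String key = messageKey == null ? "" : new String(messageKey, StandardCharsets.UTF_8);
        if (!key.startsWith("interesting-")) {   // hypothetical key predicate
            return Optional.empty();             // the value bytes are never parsed
        }
        return Optional.of(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(Optional<String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Optional<String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Optional<String>>() {});
    }
}

Downstream you would then drop the empty markers, e.g.:

env.addSource(new FlinkKafkaConsumer<>("topic", new KeyFilteringSchema(), properties))
   .filter(Optional::isPresent)
   .map(Optional::get);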

Best,

Dawid

On 18/12/2018 20:22, Hao Sun wrote:
> Hi, I am using 1.7 on K8S.
>
> I have a huge amount of data in kafka, but I only need a tiny portion
> of it.
> It is a keyed stream, the value in JSON encoded. I want to avoid
> deserialization of the value, since it is very expensive. Can I only
> filter based on the key?
> I know there is a KeyedDeserializationSchema, but can I use it to
> filter data?
>
> Hao Sun
> Team Lead
> 1019 Market St. 7F
> San Francisco, CA 94103




Re: Custom Metrics in Windowed AggregateFunction

2018-12-19 Thread Dawid Wysakowicz
Hi Chirag,

I am afraid you are right: you cannot access metrics from within an
AggregateFunction on a WindowedStream. You can, however, use the rich variant
of WindowFunction, which is invoked for every window with the results of the
AggregateFunction. Would it be enough for your use case to use
.aggregate(aggregateFunction, windowFunction) and register the metrics in
the windowFunction?
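
For illustration, a minimal sketch of that combination, given a hypothetical
DataStream<String> named stream (a trivial counting aggregate; the class names
and the metric name are made up for the example):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Plain (non-rich) aggregate: counts the elements of each window.
public class CountAggregate implements AggregateFunction<String, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(String value, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
}

// Rich window function: receives the pre-aggregated result and can
// register metrics through the RuntimeContext.
public class MetricsWindowFunction extends RichWindowFunction<Long, Long, String, TimeWindow> {

    private transient Counter windowsFired;  // hypothetical metric

    @Override
    public void open(Configuration parameters) {
        windowsFired = getRuntimeContext().getMetricGroup().counter("windowsFired");
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable<Long> counts, Collector<Long> out) {
        windowsFired.inc();                    // updated once per fired window
        out.collect(counts.iterator().next()); // forward the aggregated count
    }
}

// Wiring:
stream.keyBy(s -> s)
      .timeWindow(Time.minutes(1))
      .aggregate(new CountAggregate(), new MetricsWindowFunction());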

Best,

Dawid

On 19/12/2018 04:30, Chirag Dewan wrote:
> Hi,
>
> I am writing a Flink job for aggregating events in a window.
>
> I am trying to use the AggregateFunction implementation for this.
>
> Now, since WindowedStream does not allow a RichAggregateFunction for
> aggregation, I can't use the RuntimeContext to get the Metric group.
>
> I don't even see any other way of accessing the Metric Group in a
> non-rich function implementation?
>
> Is there any way around here?
>
> Any help appreciated.
>
> Thanks,
>
> Chirag




Re: problem submitting job, it hangs there

2018-12-19 Thread Chang Liu
Many Thanks :)

Best regards/祝好,

Chang Liu 刘畅


> On 14 Dec 2018, at 11:09, Tzu-Li Chen  wrote:
> 
> Hi Chang,
> 
> I think there is a JIRA [1] aimed at hardening this case.
> 
> In fact Flink creates this directory on startup and, without other warnings,
> we can assume that it has been created. So it might have been deleted by
> some cleanup process (by Flink or by the fs).
> 
> Best,
> tison.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-11151 
> 
> 
> Chang Liu mailto:fluency...@gmail.com>> 于2018年12月14日周五 
> 下午5:28写道:
> My question is: whatever the Flink user is doing, as long as he/she performs 
> all actions through the Flink-provided ways (the Flink CLI or the Flink APIs in 
> code), he/she should not be able to touch this directory, right?
> 
> Because this directory is for the JobManager and managed by Flink.
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 14 Dec 2018, at 10:23, Chang Liu > > wrote:
>> 
>> Hi Chesnay,
>> 
>> What do you mean by "...we can make a small adjustment to the code…"? Do you 
>> mean that I, as a Flink application developer, can do this in my code, OR that 
>> it has to be a code change in Flink itself?
>> 
>> And more importantly, I would like to pinpoint the root cause of this 
>> because I cannot just manually create such a directory in Production.
>> 
>> Many thanks :)
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
>>> On 13 Dec 2018, at 14:51, Chesnay Schepler >> > wrote:
>>> 
>>> The directory is automatically created when Flink is started; maybe it was 
>>> deleted by some cleanup process?
>>> 
>>> In any case we can make a small adjustment to the code to create all 
>>> required directories when they don't exist.
>>> 
>>> On 13.12.2018 14:46, Chang Liu wrote:
 Dear All,
 
 I applied a workaround and job submission is now working: I manually created 
 the directory flink-web-upload under the directory 
 /tmp/flink-web-ec768ff6-1db1-4afa-885f-b2828bc31127 .
 
 But I don’t think this is the proper solution. Flink should be able to 
 create such a directory automatically. Any ideas? Many thanks.
 
 Best regards/祝好,
 
 Chang Liu 刘畅
 
 
> On 13 Dec 2018, at 12:01, Chang Liu  > wrote:
> 
> Dear all,
> 
> I am trying to submit a job but it got stuck here:
> 
> ...
> 2018-12-13 10:43:11,476 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.heap.size, 1024m
> 2018-12-13 10:43:11,476 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.heap.size, 1024m
> 2018-12-13 10:43:11,476 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-12-13 10:43:11,476 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: parallelism.default, 1
> 2018-12-13 10:43:11,480 INFO  
> org.apache.flink.client.program.rest.RestClusterClient- 
> Submitting job 7d3078bfc22b225351a3178e3e6be992 (detached: true).
> 
> 
> And I got this:
> 
> ...
> 2018-12-13 10:43:12,292 WARN  
> org.apache.flink.runtime.rest.FileUploadHandler   - File 
> upload failed.
> java.nio.file.NoSuchFileException: 
> c/flink-web-upload/9cdc0d1e-7610-4d50-a665-a614fc8d75e9
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at 
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
> at java.nio.file.Files.createDirectory(Files.java:674)
> at 
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:105)
> at 
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:67)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.Co

question on Flink memory management

2018-12-19 Thread Chang Liu
Dear All,

I am trying to figure out how Flink manages its memory. I found this link:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 





Question 1 - If, in my code, I define two objects, one a native Java HashMap and 
one a Flink MapState, in which part of the JVM heap, as shown in the above 
figure, does each of them end up?

Are both the Java HashMap and the Flink MapState put in "Free" or in "Memory Manager"?


Question 2 - As stated in the doc, the TaskManager allocates its memory at 
start-up time (let's say 1024MB). Does that mean that, if I check the memory 
utilization on the machine (e.g., via top), this allocated memory shows up as in 
use, regardless of whether I have jobs running on the TaskManager?

Let's say I start or stop a simple job: will the memory usage stay the same 
(assuming no off-heap memory is used and there is no OOM)?


Many Thanks.

Best regards/祝好,

Chang Liu 刘畅




Re: question on Flink memory management

2018-12-19 Thread Dawid Wysakowicz
Hi Chang,

The link you've posted describes only batch execution. It does not
describe streaming (which I think is what you use, as only there do you
have access to Flink's state). It is also quite old, and therefore I am
not 100% sure how up-to-date it is.

Best,

Dawid

On 19/12/2018 12:11, Chang Liu wrote:
> Dear All,
>
> I am trying to figure how Flink is managing the memory. I found this link:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
>
>
>
> /Question 1/ - If, in my code, I defined two objects, one is native
> Java HashMap, one is the Flink MapState, which part of the JVM heap,
> as mentioned in above figure, each of them is put?
>
> Are both Java HashMap and Flink MapState are put in Free or Memory
> Manager?
>
>
> /Question 2/ - As it is stated in the doc, TaskManager allocates the
> memory on start-up time (lets say 1024MB). Does that mean, if I check
> the utilization of the memory on the machine (i.e., via top), these
> allocated memory is in used? It does not matter whether I have jobs
> running on the TaskManager?
>
> Lets say, if I start or stop a simple job, the memory usage will the
> same (if there is not off-heap memory used or there is no OOM)?
>
>
> Many Thanks.
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>




Flink on Kubernetes (Minikube)

2018-12-19 Thread Alexandru Gutan
Dear all,

I followed the instructions found here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html
Minikube version 0.31-01
Kubernetes version 1.10
Flink Docker image: flink:latest (1.7.0-scala_2.12)

I ran the following commands:

minikube start
minikube ssh 'sudo ip link set docker0 promisc on'
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
kubectl create -f jobmanager-service.yaml

The 2 taskmanagers fail.
Output:

Starting Task Manager
config file:
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
rest.port: 8081
blob.server.port: 6124
query.server.port: 6125
Starting taskexecutor as a console application on host
flink-taskmanager-7679c9d55d-n2trk.
2018-12-19 11:42:45,216 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -

2018-12-19 11:42:45,218 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Starting
TaskManager (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
2018-12-19 11:42:45,218 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS current
user: flink
2018-12-19 11:42:45,219 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  *Current
Hadoop/Kerberos user: *
2018-12-19 11:42:45,219 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM:
OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
2018-12-19 11:42:45,219 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Maximum
heap size: 922 MiBytes
2018-12-19 11:42:45,220 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JAVA_HOME:
/docker-java-home/jre
2018-12-19 11:42:45,220 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  No Hadoop
Dependency available
2018-12-19 11:42:45,221 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM
Options:
2018-12-19 11:42:45,221 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
-XX:+UseG1GC
2018-12-19 11:42:45,221 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xms922M
2018-12-19 11:42:45,221 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xmx922M
2018-12-19 11:42:45,221 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
-XX:MaxDirectMemorySize=8388607T
2018-12-19 11:42:45,223 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-12-19 11:42:45,223 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-12-19 11:42:45,223 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Program
Arguments:
2018-12-19 11:42:45,223 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
--configDir
2018-12-19 11:42:45,224 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
/opt/flink/conf
2018-12-19 11:42:45,224 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath:
/opt/flink/lib/flink-python_2.12-1.7.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.0.jar:::
2018-12-19 11:42:45,224 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -

2018-12-19 11:42:45,228 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered
UNIX signal handlers for [TERM, HUP, INT]
2018-12-19 11:42:45,233 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum
number of open file descriptors is 1048576.
2018-12-19 11:42:45,249 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-12-19 11:42:45,250 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2018-12-19 11:42:45,251 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2018-12-19 11:42:45,251 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.size, 1024m
2018-12-19 11:42:45,251 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 2
2018-12-19 11:42:45,252 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2018-12-19 11:42:45,252 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: re

Re: Connection time out - DataStream API Tutorial

2018-12-19 Thread Andrey Zagrebin
Hi Brett,

I have just tried it locally and it works for me.
It looks like a networking issue with the JVM or with the environment setup 
where Flink is started.

Best,
Andrey

> On 18 Dec 2018, at 05:28, Brett Marcott  wrote:
> 
> Hi Vino,
> 
> I am running everything (and generating the exception) locally I believe, as 
> the tutorial instructs.
> 
> Thanks,
> Brett
> 
> On Mon, Dec 17, 2018 at 6:12 PM vino yang  > wrote:
> Hi Brett,
> 
> Is your exception generated on your local machine or generated on a remote 
> node?
> 
> Best,
> Vino
> 
> Brett Marcott mailto:brett.marc...@gmail.com>> 
> 于2018年12月15日周六 下午5:38写道:
> Hi Flink users,
> 
> I am attempting to follow the tutorial here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/datastream_api.html
>  
> 
> 
> However I am hitting the below error:
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>   at wikiedits.WikipediaAnalysis.main(WikipediaAnalysis.java:42)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:564)
>   at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
>   at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: java.net.ConnectException: Operation timed out (Connection timed 
> out)
>   at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
>   at 
> java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:400)
>   at 
> java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:243)
>   at 
> java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:225)
>   at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:402)
>   at java.base/java.net.Socket.connect(Socket.java:591)
>   at java.base/java.net.Socket.connect(Socket.java:540)
>   at java.base/java.net.Socket.(Socket.java:436)
>   at java.base/java.net.Socket.(Socket.java:213)
>   at org.schwering.irc.lib.IRCConnection.connect(IRCConnection.java:295)
>   at 
> org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEventIrcStream.connect(WikipediaEditEventIrcStream.java:62)
>   at 
> org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource.run(WikipediaEditsSource.java:80)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   ... 1 more
> 
> 
> Below connection works fine on command line:
> $ nc -z irc.wikimedia.org  6667
> Connection to irc.wikimedia.org  port 6667 [tcp/*] 
> succeeded!
> 
> Any tips?
> 
> Thanks,
> 
> Brett



Re: Flink on Kubernetes (Minikube)

2018-12-19 Thread Dawid Wysakowicz
Hi,

You used a Hadoop-less Docker image, therefore it cannot find the Hadoop
dependencies. That is fine if you don't need any; the bolded messages
are just INFO messages, not errors.

Best,

Dawid

On 19/12/2018 12:58, Alexandru Gutan wrote:
> Dear all,
>
> I followed the instructions found here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html
> Minikube version 0.31-01
> Kubernetes version 1.10
> Flink Docker image: flink:latest (1.7.0-scala_2.12)
>
> I ran the following commands:
>
> minikube start
> minikube ssh 'sudo ip link set docker0 promisc on'
> kubectl create -f jobmanager-deployment.yaml
> kubectl create -f taskmanager-deployment.yaml
> kubectl create -f jobmanager-service.yaml
>
> The 2 taskmanagers fail.
> Output:
>
> Starting Task Manager
> config file:
> jobmanager.rpc.address: flink-jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 2
> parallelism.default: 1
> rest.port: 8081
> blob.server.port: 6124
> query.server.port: 6125
> Starting taskexecutor as a console application on host
> flink-taskmanager-7679c9d55d-n2trk.
> 2018-12-19 11:42:45,216 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> 
> 2018-12-19 11:42:45,218 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Starting TaskManager (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @
> 17:59:06 UTC)
> 2018-12-19 11:42:45,218 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS
> current user: flink
> 2018-12-19 11:42:45,219 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> *Current Hadoop/Kerberos user: *
> 2018-12-19 11:42:45,219 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM:
> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
> 2018-12-19 11:42:45,219 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Maximum heap size: 922 MiBytes
> 2018-12-19 11:42:45,220 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> JAVA_HOME: /docker-java-home/jre
> 2018-12-19 11:42:45,220 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  No
> Hadoop Dependency available
> 2018-12-19 11:42:45,221 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM
> Options:
> 2018-12-19 11:42:45,221 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -XX:+UseG1GC
> 2018-12-19 11:42:45,221 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Xms922M
> 2018-12-19 11:42:45,221 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Xmx922M
> 2018-12-19 11:42:45,221 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -XX:MaxDirectMemorySize=8388607T
> 2018-12-19 11:42:45,223 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> 2018-12-19 11:42:45,223 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> 2018-12-19 11:42:45,223 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Program Arguments:
> 2018-12-19 11:42:45,223 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> --configDir
> 2018-12-19 11:42:45,224 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> /opt/flink/conf
> 2018-12-19 11:42:45,224 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Classpath:
> /opt/flink/lib/flink-python_2.12-1.7.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.0.jar:::
> 2018-12-19 11:42:45,224 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> 
> 2018-12-19 11:42:45,228 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> Registered UNIX signal handlers for [TERM, HUP, INT]
> 2018-12-19 11:42:45,233 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> Maximum number of open file descriptors is 1048576.
> 2018-12-19 11:42:45,249 INFO 
> org.apache.flink.configuration.GlobalConfiguration    -
> Loading configuration property: jobmanager.rpc.address, flink-jobmanager
> 2018-12-19 11:42:45,250 INFO 
> org.apache.flink.configuration.GlobalConfiguration    -
> Loading configuration property: jobmanager.rpc.port, 6123
> 2018-12-19 11:42:45,251 INFO 
> org.apache.flink.configuration.GlobalConfiguration    -
> Loading configuration property: jobmanager.heap.size, 1024m
> 2018-12-19 11:42:45,251 INFO 
> org.apache.flink.configuration.GlobalConfiguratio

Re: Custom Metrics in Windowed AggregateFunction

2018-12-19 Thread Chirag Dewan
That was my first alternative actually :)
That works well for per-window metrics. And though it fits perfectly for 
smaller windows, it might not update frequently enough for larger window sizes.

Thanks,
Chirag

On Wednesday, 19 December, 2018, 4:15:41 PM IST, Dawid Wysakowicz wrote:
 
  
Hi Chirag,

I am afraid you are right: you cannot access metrics from within an
AggregateFunction on a WindowedStream. You can, however, use the rich variant
of WindowFunction, which is invoked for every window with the results of the
AggregateFunction. Would it be enough for your use case to use
.aggregate(aggregateFunction, windowFunction) and register the metrics in
the windowFunction?

Best,

Dawid

On 19/12/2018 04:30, Chirag Dewan wrote:

Hi,

I am writing a Flink job for aggregating events in a window.

I am trying to use the AggregateFunction implementation for this.

Now, since WindowedStream does not allow a RichAggregateFunction for
aggregation, I can't use the RuntimeContext to get the Metric group.

I don't even see any other way of accessing the Metric Group in a non-rich
function implementation?

Is there any way around here?

Any help appreciated.

Thanks,

Chirag

Re: question on Flink memory management

2018-12-19 Thread Chang Liu
Hi Dawid,

Thanks for your reply.

Then we can just ignore the link and focus on the questions.

Best regards/祝好,

Chang Liu 刘畅


> On 19 Dec 2018, at 12:18, Dawid Wysakowicz  wrote:
> 
> Hi Chang,
> 
> The link you've posted describes only the Batch execution. It does not 
> describe the streaming (which I think is what you use, as only there you have 
> access to flink's state). It is also quite old and therefore I am not 100% 
> sure how up-to-date it is.
> 
> Best,
> 
> Dawid
> 
> On 19/12/2018 12:11, Chang Liu wrote:
>> Dear All,
>> 
>> I am trying to figure how Flink is managing the memory. I found this link:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 
>> 
>> 
>> 
>> 
>> 
>> Question 1 - If, in my code, I defined two objects, one is native Java 
>> HashMap, one is the Flink MapState, which part of the JVM heap, as mentioned 
>> in above figure, each of them is put?
>> 
>> Are both Java HashMap and Flink MapState are put in Free or Memory Manager?
>> 
>> 
>> Question 2 - As it is stated in the doc, TaskManager allocates the memory on 
>> start-up time (lets say 1024MB). Does that mean, if I check the utilization 
>> of the memory on the machine (i.e., via top), these allocated memory is in 
>> used? It does not matter whether I have jobs running on the TaskManager?
>> 
>> Lets say, if I start or stop a simple job, the memory usage will the same 
>> (if there is not off-heap memory used or there is no OOM)?
>> 
>> 
>> Many Thanks.
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 



Re: question on Flink memory management

2018-12-19 Thread Chang Liu
I found this answer from Till: https://stackoverflow.com/a/50816868/4802797

He mentions there that "...streaming jobs currently don't use managed
memory…".

So that means the memory used by Flink streaming really depends on the jobs
running there. If I cancel the job, the memory utilization should drop,
right?

Best regards/祝好,

Chang Liu 刘畅


> On 19 Dec 2018, at 13:50, Chang Liu  wrote:
> 
> Hi Dawid,
> 
> Thanks for your reply.
> 
> Then, we can just ignore the link and just focus on the questions.
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 19 Dec 2018, at 12:18, Dawid Wysakowicz > > wrote:
>> 
>> Hi Chang,
>> 
>> The link you've posted describes only the Batch execution. It does not 
>> describe the streaming (which I think is what you use, as only there you 
>> have access to flink's state). It is also quite old and therefore I am not 
>> 100% sure how up-to-date it is.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 19/12/2018 12:11, Chang Liu wrote:
>>> Dear All,
>>> 
>>> I am trying to figure how Flink is managing the memory. I found this link:
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Question 1 - If, in my code, I defined two objects, one is native Java 
>>> HashMap, one is the Flink MapState, which part of the JVM heap, as 
>>> mentioned in above figure, each of them is put?
>>> 
>>> Are both Java HashMap and Flink MapState are put in Free or Memory Manager?
>>> 
>>> 
>>> Question 2 - As it is stated in the doc, TaskManager allocates the memory 
>>> on start-up time (lets say 1024MB). Does that mean, if I check the 
>>> utilization of the memory on the machine (i.e., via top), these allocated 
>>> memory is in used? It does not matter whether I have jobs running on the 
>>> TaskManager?
>>> 
>>> Lets say, if I start or stop a simple job, the memory usage will the same 
>>> (if there is not off-heap memory used or there is no OOM)?
>>> 
>>> 
>>> Many Thanks.
>>> 
>>> Best regards/祝好,
>>> 
>>> Chang Liu 刘畅
>>> 
>>> 
> 



Re: Flink on Kubernetes (Minikube)

2018-12-19 Thread Alexandru Gutan
Thanks!
I'm now using the flink:1.7.0-hadoop24-scala_2.12 image.
The Hadoop-related error is gone, but I have a new error:

Starting Task Manager
config file:
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
rest.port: 8081
blob.server.port: 6124
query.server.port: 6125
Starting taskexecutor as a console application on host
flink-taskmanager-54b679f8bb-22b4r.
2018-12-19 13:09:38,469 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -

2018-12-19 13:09:38,470 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Starting
TaskManager (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
2018-12-19 13:09:38,470 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS current
user: flink
2018-12-19 13:09:38,921 WARN
org.apache.hadoop.util.NativeCodeLoader   - Unable to
load native-hadoop library for your platform... using builtin-java classes
where applicable
2018-12-19 13:09:39,307 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Current
Hadoop/Kerberos user: flink
2018-12-19 13:09:39,307 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM:
OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
2018-12-19 13:09:39,307 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Maximum
heap size: 922 MiBytes
2018-12-19 13:09:39,307 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JAVA_HOME:
/docker-java-home/jre
2018-12-19 13:09:39,318 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Hadoop
version: 2.4.1
2018-12-19 13:09:39,318 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM
Options:
2018-12-19 13:09:39,319 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
-XX:+UseG1GC
2018-12-19 13:09:39,319 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xms922M
2018-12-19 13:09:39,320 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xmx922M
2018-12-19 13:09:39,320 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
-XX:MaxDirectMemorySize=8388607T
2018-12-19 13:09:39,320 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-12-19 13:09:39,320 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-12-19 13:09:39,320 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Program
Arguments:
2018-12-19 13:09:39,321 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
--configDir
2018-12-19 13:09:39,321 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
/opt/flink/conf
2018-12-19 13:09:39,321 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath:
/opt/flink/lib/flink-python_2.12-1.7.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.0.jar:::
2018-12-19 13:09:39,321 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -

2018-12-19 13:09:39,323 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered
UNIX signal handlers for [TERM, HUP, INT]
2018-12-19 13:09:39,329 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum
number of open file descriptors is 1048576.
2018-12-19 13:09:39,366 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, flink-jobmanager
2018-12-19 13:09:39,367 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2018-12-19 13:09:39,367 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2018-12-19 13:09:39,367 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.size, 1024m
2018-12-19 13:09:39,368 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 2
2018-12-19 13:09:39,369 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2018-12-19 13:09:39,370 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: rest.port, 8081
2018-12-19 13:09:39,372 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: blob

Re: Flink on Kubernetes (Minikube)

2018-12-19 Thread Dawid Wysakowicz
Hi Alexandru,

It sounds reasonable that this might be caused by that minikube command
failing, but I am not a Kubernetes expert. I am cc'ing Till, who knows more
about this.

Best,

Dawid

On 19/12/2018 14:16, Alexandru Gutan wrote:
> Thanks!
> I'm using now the *flink:1.7.0-hadoop24-scala_2.12* image.
> The Hadoop related error is gone, but I have a new error:
>
> Starting Task Manager
> config file:
> jobmanager.rpc.address: flink-jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 2
> parallelism.default: 1
> rest.port: 8081
> blob.server.port: 6124
> query.server.port: 6125
> Starting taskexecutor as a console application on host
> flink-taskmanager-54b679f8bb-22b4r.
> 2018-12-19 13:09:38,469 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> 
> 2018-12-19 13:09:38,470 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Starting TaskManager (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @
> 17:59:06 UTC)
> 2018-12-19 13:09:38,470 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS
> current user: flink
> 2018-12-19 13:09:38,921 WARN 
> org.apache.hadoop.util.NativeCodeLoader   - Unable
> to load native-hadoop library for your platform... using builtin-java
> classes where applicable
> 2018-12-19 13:09:39,307 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Current Hadoop/Kerberos user: flink
> 2018-12-19 13:09:39,307 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM:
> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
> 2018-12-19 13:09:39,307 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Maximum heap size: 922 MiBytes
> 2018-12-19 13:09:39,307 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> JAVA_HOME: /docker-java-home/jre
> 2018-12-19 13:09:39,318 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Hadoop version: 2.4.1
> 2018-12-19 13:09:39,318 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM
> Options:
> 2018-12-19 13:09:39,319 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -XX:+UseG1GC
> 2018-12-19 13:09:39,319 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Xms922M
> 2018-12-19 13:09:39,320 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Xmx922M
> 2018-12-19 13:09:39,320 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -XX:MaxDirectMemorySize=8388607T
> 2018-12-19 13:09:39,320 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> 2018-12-19 13:09:39,320 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> 2018-12-19 13:09:39,320 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Program Arguments:
> 2018-12-19 13:09:39,321 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> --configDir
> 2018-12-19 13:09:39,321 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> /opt/flink/conf
> 2018-12-19 13:09:39,321 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - 
> Classpath:
> /opt/flink/lib/flink-python_2.12-1.7.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.0.jar:::
> 2018-12-19 13:09:39,321 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> 
> 2018-12-19 13:09:39,323 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> Registered UNIX signal handlers for [TERM, HUP, INT]
> 2018-12-19 13:09:39,329 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> Maximum number of open file descriptors is 1048576.
> 2018-12-19 13:09:39,366 INFO 
> org.apache.flink.configuration.GlobalConfiguration    -
> Loading configuration property: jobmanager.rpc.address, flink-jobmanager
> 2018-12-19 13:09:39,367 INFO 
> org.apache.flink.configuration.GlobalConfiguration    -
> Loading configuration property: jobmanager.rpc.port, 6123
> 2018-12-19 13:09:39,367 INFO 
> org.apache.flink.configuration.GlobalConfiguration    -
> Loading configuration property: jobmanager.heap.size, 1024m
> 2018-12-19 13:09:39,367 INFO 
> org.apache.flink.configuration.GlobalConfiguration    -
> Loading configuration property: taskmanager.heap.size, 1024m
> 2018-12-19 13:09:39,368 INFO 
> org.apache.flink.configuration.GlobalConfiguration    -
> L

Re: Flink on Kubernetes (Minikube)

2018-12-19 Thread Alexandru Gutan
I've found this in the archives:
http://mail-archives.apache.org/mod_mbox/flink-dev/201804.mbox/%3CCALbFKXr=rp9TYpD_JA8vmuWbcjY0+Lp2mbr4Y=0fnh316hz...@mail.gmail.com%3E

As suggested, I tried a different startup order, but without success:

kubectl create -f jobmanager-deployment.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f taskmanager-deployment.yaml

I get the same error *java.net.UnknownHostException: flink-jobmanager:
Temporary failure in name resolution*


On Wed, 19 Dec 2018 at 13:27, Dawid Wysakowicz 
wrote:

> Hi Alexandru,
>
> This sounds reasonable that it might be because of this minikube command
> failed, but I am not a kubernetes expert. I cc Till who knows more on this.
>
> Best,
>
> Dawid
> On 19/12/2018 14:16, Alexandru Gutan wrote:
>
> Thanks!
> I'm using now the *flink:1.7.0-hadoop24-scala_2.12* image.
> The Hadoop related error is gone, but I have a new error:
>
> Starting Task Manager
> config file:
> jobmanager.rpc.address: flink-jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 1024m
> taskmanager.heap.size: 1024m
> taskmanager.numberOfTaskSlots: 2
> parallelism.default: 1
> rest.port: 8081
> blob.server.port: 6124
> query.server.port: 6125
> Starting taskexecutor as a console application on host
> flink-taskmanager-54b679f8bb-22b4r.
> 2018-12-19 13:09:38,469 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> 
> 2018-12-19 13:09:38,470 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Starting
> TaskManager (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
> 2018-12-19 13:09:38,470 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS current
> user: flink
> 2018-12-19 13:09:38,921 WARN
> org.apache.hadoop.util.NativeCodeLoader   - Unable to
> load native-hadoop library for your platform... using builtin-java classes
> where applicable
> 2018-12-19 13:09:39,307 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Current
> Hadoop/Kerberos user: flink
> 2018-12-19 13:09:39,307 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM:
> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
> 2018-12-19 13:09:39,307 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Maximum
> heap size: 922 MiBytes
> 2018-12-19 13:09:39,307 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JAVA_HOME:
> /docker-java-home/jre
> 2018-12-19 13:09:39,318 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Hadoop
> version: 2.4.1
> 2018-12-19 13:09:39,318 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM
> Options:
> 2018-12-19 13:09:39,319 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -XX:+UseG1GC
> 2018-12-19 13:09:39,319 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xms922M
> 2018-12-19 13:09:39,320 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xmx922M
> 2018-12-19 13:09:39,320 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -XX:MaxDirectMemorySize=8388607T
> 2018-12-19 13:09:39,320 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> 2018-12-19 13:09:39,320 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> 2018-12-19 13:09:39,320 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Program
> Arguments:
> 2018-12-19 13:09:39,321 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> --configDir
> 2018-12-19 13:09:39,321 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> /opt/flink/conf
> 2018-12-19 13:09:39,321 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath:
> /opt/flink/lib/flink-python_2.12-1.7.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.0.jar:::
> 2018-12-19 13:09:39,321 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> 
> 2018-12-19 13:09:39,323 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered
> UNIX signal handlers for [TERM, HUP, INT]
> 2018-12-19 13:09:39,329 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum
> number of open file descriptors is 1048576.
> 2018-12-19 13:09:39,366 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, flink-jobmanager
> 2018-12-19 13:09:39,367 INFO
> org.apache.flink.configuration.GlobalConfigura

getting an error when configuring state backend to hdfs

2018-12-19 Thread Avi Levi
Hi,
I am trying to set the state backend to HDFS:

val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)

env.setStateBackend(backend)

I am running Flink 1.7.0 with the following dependencies (tried them
in different combinations):

"org.apache.flink"  %% "flink-connector-filesystem" % flinkV
"org.apache.flink"  %  "flink-hadoop-fs"            % flinkV
"org.apache.hadoop" %  "hadoop-hdfs"                % hadoopVersion
"org.apache.hadoop" %  "hadoop-common"              % hadoopVersion

However, when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'hdfs'. The scheme
is not directly supported by Flink and no Hadoop file system to support
this scheme could be loaded.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
	... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
	... 23 more

Any help will be greatly appreciated.


Question about Flink optimizer on Stream API

2018-12-19 Thread Felipe Gutierrez
Hi,

I was reading some FLIP documents related to the new design of the Flink
scheduler [1] and the unification of batch and stream [2]. Then I created two
different programs to learn how Flink optimizes the query plan in batch and
in stream mode (and how far it goes). One uses batch [3] and one
uses streaming [4]. During code debugging, and as depicted in
document [2], the batch program uses the
org.apache.flink.optimizer.Optimizer class, which generates an
"org.apache.flink.optimizer.plan.OptimizedPlan", while the stream program uses
the "org.apache.flink.streaming.api.graph.StreamGraph" and the
transformations inside the package
"org.apache.flink.streaming.api.transformations".

When I show the execution plan with "env.getExecutionPlan()" I see
exactly what I have written in the Flink program (which is expected).
However, I was looking for where I can see the optimized plan, i.e.,
decisions about operator reordering based on cost or statistics. For batch I
could find "org.apache.flink.optimizer.costs.CostEstimator" and
"org.apache.flink.optimizer.DataStatistics", but for streaming I only found
the creation of the plan. How can I debug that, or get a better
understanding of what Flink is doing? Can you advise some other
reference to read about this?

Kind Regards,
Felipe

[1] Group-aware scheduling for Flink -
https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk
[2] Unified Core API for Streaming and Batch -
https://docs.google.com/document/d/1G0NUIaaNJvT6CMrNCP6dRXGv88xNhDQqZFrQEuJ0rVU/edit#
[3]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/batch/MatrixMultiplication.java
[4]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/SensorsReadingMqttJoinQEP.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: getting an error when configuring state backend to hdfs

2018-12-19 Thread Chesnay Schepler
Are you including the filesystems in your jar? Filesystem jars must be 
placed in the /lib directory of the flink distribution.


On 19.12.2018 15:03, Avi Levi wrote:

Hi,
I am trying to set the backend state to hdfs

val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)

env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried
them with different combinations):

"org.apache.flink"  %% "flink-connector-filesystem" % flinkV
"org.apache.flink"  %  "flink-hadoop-fs"            % flinkV
"org.apache.hadoop" %  "hadoop-hdfs"                % hadoopVersion
"org.apache.hadoop" %  "hadoop-common"              % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'hdfs'. The scheme
is not directly supported by Flink and no Hadoop file system to support
this scheme could be loaded.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
	... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.
	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
	... 23 more

any help will be greatly appreciated





Re: Flink on Kubernetes (Minikube)

2018-12-19 Thread Alexandru Gutan
Got it working on the Google Cloud Platform Kubernetes service...
More support for Minikube is needed.

On Wed, 19 Dec 2018 at 13:44, Alexandru Gutan 
wrote:

> I've found this in the archives:
> http://mail-archives.apache.org/mod_mbox/flink-dev/201804.mbox/%3CCALbFKXr=rp9TYpD_JA8vmuWbcjY0+Lp2mbr4Y=0fnh316hz...@mail.gmail.com%3E
>
> And as suggested I tried a different startup order but unsuccessful:
>
> kubectl create -f jobmanager-deployment.yaml
> kubectl create -f jobmanager-service.yaml
> kubectl create -f taskmanager-deployment.yaml
>
> I get the same error *java.net.UnknownHostException: flink-jobmanager: 
> Temporary failure in name resolution*
>
>
> On Wed, 19 Dec 2018 at 13:27, Dawid Wysakowicz 
> wrote:
>
>> Hi Alexandru,
>>
>> This sounds reasonable that it might be because of this minikube command
>> failed, but I am not a kubernetes expert. I cc Till who knows more on this.
>>
>> Best,
>>
>> Dawid
>> On 19/12/2018 14:16, Alexandru Gutan wrote:
>>
>> Thanks!
>> I'm using now the *flink:1.7.0-hadoop24-scala_2.12* image.
>> The Hadoop related error is gone, but I have a new error:
>>
>> Starting Task Manager
>> config file:
>> jobmanager.rpc.address: flink-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.heap.size: 1024m
>> taskmanager.heap.size: 1024m
>> taskmanager.numberOfTaskSlots: 2
>> parallelism.default: 1
>> rest.port: 8081
>> blob.server.port: 6124
>> query.server.port: 6125
>> Starting taskexecutor as a console application on host
>> flink-taskmanager-54b679f8bb-22b4r.
>> 2018-12-19 13:09:38,469 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>> 
>> 2018-12-19 13:09:38,470 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Starting
>> TaskManager (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>> 2018-12-19 13:09:38,470 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS current
>> user: flink
>> 2018-12-19 13:09:38,921 WARN
>> org.apache.hadoop.util.NativeCodeLoader   - Unable to
>> load native-hadoop library for your platform... using builtin-java classes
>> where applicable
>> 2018-12-19 13:09:39,307 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Current
>> Hadoop/Kerberos user: flink
>> 2018-12-19 13:09:39,307 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM:
>> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
>> 2018-12-19 13:09:39,307 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Maximum
>> heap size: 922 MiBytes
>> 2018-12-19 13:09:39,307 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JAVA_HOME:
>> /docker-java-home/jre
>> 2018-12-19 13:09:39,318 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Hadoop
>> version: 2.4.1
>> 2018-12-19 13:09:39,318 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM
>> Options:
>> 2018-12-19 13:09:39,319 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>> -XX:+UseG1GC
>> 2018-12-19 13:09:39,319 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xms922M
>> 2018-12-19 13:09:39,320 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xmx922M
>> 2018-12-19 13:09:39,320 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>> -XX:MaxDirectMemorySize=8388607T
>> 2018-12-19 13:09:39,320 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>> 2018-12-19 13:09:39,320 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>> 2018-12-19 13:09:39,320 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Program
>> Arguments:
>> 2018-12-19 13:09:39,321 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>> --configDir
>> 2018-12-19 13:09:39,321 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>> /opt/flink/conf
>> 2018-12-19 13:09:39,321 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath:
>> /opt/flink/lib/flink-python_2.12-1.7.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.12-1.7.0.jar:::
>> 2018-12-19 13:09:39,321 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>> 
>> 2018-12-19 13:09:39,323 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered
>> UNIX signal handlers for [TERM, HUP, INT]
>> 2018-12-19 13:09:39,329 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum
>> number of open

Can't list logs or stdout through web console on Flink 1.7 Kubernetes

2018-12-19 Thread William Saar


I'm running Flink 1.7 in ECS. Is this a known issue, or should I create
a JIRA?

The web console doesn't show anything when trying to list the logs or
stdout of the task managers, and the job manager log has stack traces for
the errors:

2018-12-19 15:35:53,498 ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler 
- Failed to transfer file from TaskExecutor
d7fd266047d5acfaddeb1156bdb23ff3.
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkException: The file STDOUT is not available
on the TaskExecutor.

2018-12-19 15:36:02,538 ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler 
- Failed to transfer file from TaskExecutor
d7fd266047d5acfaddeb1156bdb23ff3.
java.util.concurrent.CompletionException:
org.apache.flink.util.FlinkException: The file LOG is not available on
the TaskExecutor.




Re: getting an error when configuring state backend to hdfs

2018-12-19 Thread Avi Levi
Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt
assembly). Which jar should I place in the /lib directory?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler  wrote:

> Are you including the filesystems in your jar? Filesystem jars must be
> placed in the /lib directory of the flink distribution.
>
> On 19.12.2018 15:03, Avi Levi wrote:
>
> Hi,
> I am trying to set the backend state to hdfs
>
> val stateUri = "hdfs/path_to_dir"
> val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
>
> env.setStateBackend(backend)
>
> I am running with flink 1.7.0 with the following dependencies (tried them
> with different combinations):
>
> "org.apache.flink"  %% "flink-connector-filesystem" % flinkV
> "org.apache.flink"  %  "flink-hadoop-fs"            % flinkV
> "org.apache.hadoop" %  "hadoop-hdfs"                % hadoopVersion
> "org.apache.hadoop" %  "hadoop-common"              % hadoopVersion
>
> however when running the jar I am getting this error:
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded.
> 	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
> 	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
> 	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
> 	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
> 	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
> 	... 17 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
> 	at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
> 	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
> 	... 23 more
>
> any help will be greatly appreciated
>
>
>


Re: getting an error when configuring state backend to hdfs

2018-12-19 Thread Chesnay Schepler

flink-hadoop-fs should be in /lib
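
For example, roughly (version and paths assumed; adjust to your
installation and restart the cluster afterwards):

  # put the filesystem jar into the distribution, not only into the fat jar
  cp flink-hadoop-fs-1.7.0.jar /opt/flink/lib/

A Hadoop-bundled distribution that already ships
flink-shaded-hadoop2-uber-1.7.0.jar in /lib should also make the 'hdfs'
scheme available. Independently of that, the state URI should be fully
qualified, e.g. (namenode host/port assumed):

  val stateUri = "hdfs://namenode:8020/flink/checkpoints"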

On 19.12.2018 16:44, Avi Levi wrote:

Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using
sbt assembly). Which jar should I place in the /lib directory?


On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler wrote:

Are you including the filesystems in your jar? Filesystem jars
must be placed in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:

Hi,
I am trying to set the backend state to hdfs:

  val stateUri = "hdfs/path_to_dir"
  val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
  env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies
(tried them with different combinations):

  "org.apache.flink"  %% "flink-connector-filesystem" % flinkV
  "org.apache.flink"  %  "flink-hadoop-fs"            % flinkV
  "org.apache.hadoop" %  "hadoop-hdfs"                % hadoopVersion
  "org.apache.hadoop" %  "hadoop-common"              % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'hdfs'.
The scheme is not directly supported by Flink and no Hadoop file
system to support this scheme could be loaded.
  at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
  at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
  at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
  at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
  ... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.
  at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
  at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
  ... 23 more

any help will be greatly appreciated







Re: Flink on Kubernetes (Minikube)

2018-12-19 Thread Till Rohrmann
Hi Alexandru,

minikube ssh 'sudo ip link set docker0 promisc on' is not supposed to solve
the problem you are seeing. It only resolves the problem if the JobMaster
wants to reach itself through the jobmanager-service name. Your problem
seems to be something else. Could you check if jobmanager-service resolves
on a pod by sshing into it and pinging this address?
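
For example, something along these lines (using the taskmanager pod name
from your log; whether nslookup or ping is available depends on the image):

  kubectl exec -it flink-taskmanager-54b679f8bb-22b4r -- nslookup flink-jobmanager
  kubectl exec -it flink-taskmanager-54b679f8bb-22b4r -- ping -c 1 flink-jobmanager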

Cheers,
Till

On Wed, Dec 19, 2018 at 4:08 PM Alexandru Gutan 
wrote:

> Got it working on the Google Cloud Platform Kubernetes service...
> More support for Minikube is needed.
>
> On Wed, 19 Dec 2018 at 13:44, Alexandru Gutan 
> wrote:
>
>> I've found this in the archives:
>> http://mail-archives.apache.org/mod_mbox/flink-dev/201804.mbox/%3CCALbFKXr=rp9TYpD_JA8vmuWbcjY0+Lp2mbr4Y=0fnh316hz...@mail.gmail.com%3E
>>
>> And as suggested I tried a different startup order but unsuccessful:
>>
>> kubectl create -f jobmanager-deployment.yaml
>> kubectl create -f jobmanager-service.yaml
>> kubectl create -f taskmanager-deployment.yaml
>>
>> I get the same error: java.net.UnknownHostException: flink-jobmanager:
>> Temporary failure in name resolution
>>
>>
>> On Wed, 19 Dec 2018 at 13:27, Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Alexandru,
>>>
>>> This sounds reasonable that it might be because of this minikube command
>>> failed, but I am not a kubernetes expert. I cc Till who knows more on this.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 19/12/2018 14:16, Alexandru Gutan wrote:
>>>
>>> Thanks!
>>> I'm now using the flink:1.7.0-hadoop24-scala_2.12 image.
>>> The Hadoop related error is gone, but I have a new error:
>>>
>>> Starting Task Manager
>>> config file:
>>> jobmanager.rpc.address: flink-jobmanager
>>> jobmanager.rpc.port: 6123
>>> jobmanager.heap.size: 1024m
>>> taskmanager.heap.size: 1024m
>>> taskmanager.numberOfTaskSlots: 2
>>> parallelism.default: 1
>>> rest.port: 8081
>>> blob.server.port: 6124
>>> query.server.port: 6125
>>> Starting taskexecutor as a console application on host
>>> flink-taskmanager-54b679f8bb-22b4r.
>>> 2018-12-19 13:09:38,469 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> 
>>> 2018-12-19 13:09:38,470 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Starting
>>> TaskManager (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>>> 2018-12-19 13:09:38,470 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  OS current
>>> user: flink
>>> 2018-12-19 13:09:38,921 WARN
>>> org.apache.hadoop.util.NativeCodeLoader   - Unable to
>>> load native-hadoop library for your platform... using builtin-java classes
>>> where applicable
>>> 2018-12-19 13:09:39,307 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Current
>>> Hadoop/Kerberos user: flink
>>> 2018-12-19 13:09:39,307 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM:
>>> OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
>>> 2018-12-19 13:09:39,307 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Maximum
>>> heap size: 922 MiBytes
>>> 2018-12-19 13:09:39,307 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JAVA_HOME:
>>> /docker-java-home/jre
>>> 2018-12-19 13:09:39,318 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Hadoop
>>> version: 2.4.1
>>> 2018-12-19 13:09:39,318 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  JVM
>>> Options:
>>> 2018-12-19 13:09:39,319 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> -XX:+UseG1GC
>>> 2018-12-19 13:09:39,319 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xms922M
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - -Xmx922M
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> -XX:MaxDirectMemorySize=8388607T
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>> 2018-12-19 13:09:39,320 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Program
>>> Arguments:
>>> 2018-12-19 13:09:39,321 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> --configDir
>>> 2018-12-19 13:09:39,321 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
>>> /opt/flink/conf
>>> 2018-12-19 13:09:39,321 INFO
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -  Classpath:
>>> /opt/flink/lib/flink-python_2.12-1.7.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.7.0.jar:/opt/flink/lib/log4j-1.2.17.j

Re: How to migrate Kafka Producer ?

2018-12-19 Thread Edward Alexander Rojas Clavijo
Hi Dawid, Piotr,

I see that for the kafka consumer base there are some migration tests here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java

As the kafka consumer state is managed in the FlinkKafkaConsumerBase class,
I assumed this would also cover migration across connector versions, but maybe
I'm missing something(?)

I performed some tests on my own and the migration of the kafka consumer
connector worked.


Regarding the kafka producer, I am just replacing the previous connector
with the new one, and upgrading the job by restoring from a savepoint with
--allowNonRestoredState.
So far my tests with this option are successful.
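
For reference, the upgrade command then looks roughly like this (savepoint
path, entry class and jar are placeholders):

  flink run -s hdfs:///savepoints/savepoint-abc123 --allowNonRestoredState \
      -c com.example.MyJob my-job.jar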

I appreciate any help here to clarify my understanding.

Regards,
Edward

On Wed, Dec 19, 2018 at 10:28 AM Dawid Wysakowicz wrote:

> Hi Edward,
>
> AFAIK we do not support migrating state from one connector to another
> one, which is in fact the case for kafka 0.11 and the "universal" one.
>
> You might try to use the project bravo[1] to migrate the state manually,
> but unfortunately you have to understand the internals of both of the
> connectors. I pull also Piotr to the thread, maybe he can provide more
> straightforward workaround.
>
> Best,
>
> Dawid
>
> [1] https://github.com/king/bravo
>
> On 18/12/2018 14:33, Edward Rojas wrote:
> > Hi,
> >
> > I'm planning to migrate from kafka connector 0.11 to the new universal kafka
> > connector 1.0.0+ but I'm having some troubles.
> >
> > The kafka consumer seems to be compatible but when trying to migrate the
> > kafka producer I get an incompatibility error for the state migration.
> > It looks like the producer uses a list state of type
> > "NextTransactionalIdHint", but this class is specific for each Producer
> > (FlinkKafkaProducer011.NextTransactionalIdHint  vs
> > FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are
> not
> > compatible.
> >
> >
> > I would like to know what is the recommended way to perform this kind of
> > migration without losing the state ?
> >
> > Thanks in advance,
> > Edward
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Unbalanced Kafka consumer consumption

2018-12-19 Thread Gerard Garcia
We finally figured it out. We had a large value in the Kafka consumer option
'max.partition.fetch.bytes'; this made the KafkaConsumer not consume at
a balanced rate from all partitions.
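
For anyone hitting the same thing: the property is simply part of the
consumer config. A minimal sketch with the universal connector (topic,
servers and group id are placeholders; 1048576 bytes is the Kafka default):

  import java.util.Properties
  import org.apache.flink.api.common.serialization.SimpleStringSchema
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

  val props = new Properties()
  props.setProperty("bootstrap.servers", "broker:9092") // placeholder
  props.setProperty("group.id", "my-group")             // placeholder
  // keep per-partition fetches modest so the client keeps rotating
  // over all partitions instead of filling up on a few of them
  props.setProperty("max.partition.fetch.bytes", "1048576")
  val consumer =
    new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props)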

Gerard


Re: Can't list logs or stdout through web console on Flink 1.7 Kubernetes

2018-12-19 Thread Steven Nelson
There is a known issue for this I believe. The problem is that the 
containerized versions of Flink output logs to STDOUT instead of files inside 
the node. If you use docker logs on the container you can see what you’re 
looking for. I use the Kube dashboard to view the logs centrally.
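
For example (container id and pod name are whatever your environment
assigned):

  docker logs <taskmanager-container-id>
  # or, on Kubernetes:
  kubectl logs <taskmanager-pod-name>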

Sent from my iPhone

> On Dec 19, 2018, at 9:40 AM, William Saar  wrote:
> 
> 
> I'm running Flink 1.7 in ECS. Is this a known issue, or should I create a jira?
> 
> The web console doesn't show anything when trying to list logs or stdout for 
> task managers, and the job manager log has stack traces for the errors
> 2018-12-19 15:35:53,498 ERROR 
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
>   - Failed to transfer file from TaskExecutor 
> d7fd266047d5acfaddeb1156bdb23ff3.
> java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkException: The file STDOUT is not available on the 
> TaskExecutor.
> 
> 2018-12-19 15:36:02,538 ERROR 
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler  
> - Failed to transfer file from TaskExecutor d7fd266047d5acfaddeb1156bdb23ff3.
> java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkException: The file LOG is not available on the 
> TaskExecutor.
> 


Re: flink operator's latency metrics continues to increase.

2018-12-19 Thread Till Rohrmann
Hi Suxing Lee,

thanks for reaching out to me. I am also forwarding this mail to the user
mailing list because it could be interesting for others as well.

Your observation could indeed be an indicator for a problem with the
latency metric. I quickly checked the code and at first glance it looks
right to me that we increase the nextTimestamp field by period in
RepeatedTriggerTask, because we schedule this task at a fixed rate in
SystemProcessingTimeService#scheduleAtFixedRate. Internally this method
calls ScheduledThreadPoolExecutor#scheduleAtFixedRate, which uses
System.nanoTime to schedule tasks repeatedly. In fact, the same logic will
be used by the ScheduledThreadPoolExecutor#ScheduledFutureTask. If a GC
pause or another stop-the-world event happens, this should only affect one
latency measurement and not all of them (given that System.nanoTime
continues to increase), because the next one will be scheduled sooner since
System.nanoTime might have progressed more in the meantime.

What could be a problem is that we compute the latency as
System.currentTimeMillis - marker.getMarkedTime. I think there is no
guarantee that System.currentTimeMillis and System.nanoTime don't drift
apart, especially if they are read on different machines. This is
something we could check.
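
A quick way to observe such drift on a single JVM is to sample both clocks
and compare the elapsed times (a minimal sketch; NTP corrections to the
wall clock are what make the difference visible):

  // measure how far the wall clock and the monotonic clock drift apart
  val wall0 = System.currentTimeMillis()
  val mono0 = System.nanoTime()
  Thread.sleep(60000) // wait a minute (or much longer)
  val wallElapsedMs = System.currentTimeMillis() - wall0
  val monoElapsedMs = (System.nanoTime() - mono0) / 1000000
  println(s"drift: ${wallElapsedMs - monoElapsedMs} ms")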

This link [1] explains the drift problem a bit more in detail.

In any case, I would suggest to open a JIRA issue to report this problem.

[1] https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6519418

Cheers,
Till

On Mon, Dec 17, 2018 at 2:37 PM Suxing Lee <913910...@qq.com> wrote:

> Hi Till Rohrmann,
>
> I was running flink 1.5.5, and I use prometheus to collect metrics to
> check the latency of my jobs.
> But sometimes I observed that the operator's latency metrics continue
> to increase in my job.
> The operator's latency increases by approximately 2.7 minutes per
> day (please see the attached screenshots).
>
> My job's logic is simple: just distribute data from kafkaSource to
> bucketingSink.
> So I checked the consumer offsets in kafka for the consumer group, and I
> also checked the latest data in hdfs. In fact, there is no serious latency
> in my job.
>
> I noticed that latency is computed as currentTimeMillis minus the
> LatencyMarker's markedTime.
> But the LatencyMarker's timestamp comes from RepeatedTriggerTask's
> nextTimestamp, which is computed by adding a fixed period (the default
> value is 2s before v1.5.5). The nextTimestamp will be delayed when a JVM GC
> or Linux preemptive scheduling happens, so as time goes on the
> nextTimestamp is much later than the current time (I verified this result
> via the JVM heap dump).
>
> We could avoid the above situation by directly using the system clock
> (kept accurate by Linux's NTP) instead of computing the timestamp in the
> process.
> I'm not very familiar with SystemProcessingTimeService. Is there some
> detail I have not thought about?
>
>
> Best regards and thanks for your help.
> Suxing Lee
>
>
>
>
>


Re: Unbalanced Kafka consumer consumption

2018-12-19 Thread Till Rohrmann
Great to hear and thanks for letting us know.

Cheers,
Till

On Wed, Dec 19, 2018 at 5:39 PM Gerard Garcia  wrote:

> We finally figured it out. We had a large value in the Kafka consumer
> option 'max.partition.fetch.bytes'; this made the KafkaConsumer not
> consume at a balanced rate from all partitions.
>
> Gerard
>


Re: getting an error when configuring state backend to hdfs

2018-12-19 Thread Steven Nelson
What image are you using?

Sent from my iPhone

> On Dec 19, 2018, at 9:44 AM, Avi Levi  wrote:
> 
> Hi Chesnay,
> What do you mean? I am creating a fat jar with all dependencies (using sbt
> assembly). Which jar should I place in the /lib directory?
> 
>> On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler  wrote:
>> Are you including the filesystems in your jar? Filesystem jars must be 
>> placed in the /lib directory of the flink distribution.
>> 
>>> On 19.12.2018 15:03, Avi Levi wrote:
>>> Hi,
>>> I am trying to set the backend state to hdfs 
>>> val stateUri = "hdfs/path_to_dir"
>>> val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
>>> env.setStateBackend(backend)
>>> 
>>> I am running with flink 1.7.0 with the following dependencies (tried them 
>>> with different combinations)  :
>>> "org.apache.flink"%% "flink-connector-filesystem" % flinkV
>>> "org.apache.flink"% "flink-hadoop-fs" % flinkV
>>> "org.apache.hadoop"   % "hadoop-hdfs" % 
>>> hadoopVersion
>>> "org.apache.hadoop"   % "hadoop-common"   % 
>>> hadoopVersion
>>> 
>>> however when running the jar I am getting this error:
>>> 
>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
>>> Could not find a file system implementation for scheme 'hdfs'. The scheme 
>>> is not directly supported by Flink and no Hadoop file system to support 
>>> this scheme could be loaded.
>>> at 
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
>>> at 
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
>>> ... 17 more
>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
>>> Hadoop is not in the classpath/dependencies.
>>> at 
>>> org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>>> at 
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
>>> ... 23 more
>>> 
>>> any help will be greatly appreciated 
>>> 
>> 


Re: [SURVEY] Usage of flink-python and flink-streaming-python

2018-12-19 Thread Till Rohrmann
Thanks a lot for the feedback for this survey. I will close it now since 6
days have passed without new activity.

To me it seems that we currently don't have many users who use flink-python
or flink-streaming-python because of their limitations (mentioned in the
survey by Xianda). This information might be useful when discussing Flink's
future Python strategy and whether to continue supporting flink-python and
flink-streaming-python in the future.

Cheers,
Till

On Thu, Dec 13, 2018 at 10:50 AM Stephan Ewen  wrote:

> You are right. Let's refocus this on the python user survey and spin out
> another thread.
>
> On Thu, Dec 13, 2018 at 9:56 AM Xianda Ke  wrote:
>
> > Hi Folks,
> > To avoid polluting the survey thread with discussions, we started
> separate
> > thread and maybe we can continue the discussion over there.
> >
> > Regards,
> > Xianda
> >
> > On Wed, Dec 12, 2018 at 3:34 AM Stephan Ewen  wrote:
> >
> > > > I like that we are having a general discussion about how to use Python and
> > > > Flink together in the future.
> > > The current python support has some shortcomings that were mentioned
> > > before, so we clearly need something better.
> > >
> > > > Parts of the community have worked together with the Apache Beam project,
> > > > which is pretty far in adding a portability layer to support Python.
> > > > Before we dive deep into a design proposal for a new Python API in Flink, I
> > > > think we should figure out in which general direction Python support should
> > > > go.
> > >
> > > *Option (1): Language portability via Apache Beam*
> > >
> > > Pro:
> > >   - already exists to a large extend and already has users
> > >   - portability layer offers other languages in addition to python. Go is
> > > in the making, NodeJS has been speculated, etc.
> > >   - collaboration with another project / community which means more
> > > manpower and exposure. Beam currently has a strong focus on Flink as a
> > > runner for Python.
> > >   - Python API is used for existing ML libraries from the TensorFlow
> > > ecosystem
> > >
> > > Con:
> > >   - Not Flink's API. Python users need to learn the syntax of another API
> > > (Python API is inherently different, but even more different here).
> > >
> > > *Option (2): Implement own Python API*
> > >
> > > Pro:
> > >   - Python API will be closer to Flink Java / Scala APIs
> > >
> > > Con:
> > >   - We will only have Python.
> > >   - Need to rebuild the Python language bridge (significant work to get
> > > stable)
> > >   - might lose tight collaboration with Beam and the other parties in Beam
> > >   - not benefiting from Beam's ecosystem
> > >
> > > *Option (3): **Implement own portability layer*
> > >
> > > Pro
> > >   - Flexibility to align APIs across languages within Flink ecosystem
> > >
> > > Con
> > >   - A lot of work (for context, to get this feature complete, Beam has
> > > worked on that for a year now)
> > >   - Replicating work that already exists
> > >   - good chance to lose tight collaboration with Beam and parties in
> that
> > > project
> > >   - not benefiting from Beam's ecosystem
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Tue, Dec 11, 2018 at 3:38 PM Thomas Weise  wrote:
> > >
> > > > Did you take a look at Apache Beam? It already provides a comprehensive
> > > > Python SDK and can be used with Flink:
> > > > https://beam.apache.org/roadmap/portability/#python-on-flink
> > > >
> > > > We are using it at Lyft for Python streaming pipelines.
> > > >
> > > > Thomas
> > > >
> > > > On Tue, Dec 11, 2018 at 5:54 AM Xianda Ke wrote:
> > > >
> > > > > Hi Till,
> > > > >
> > > > > 1. So far as I know, most of the users at Alibaba are using SQL. Some of
> > > > > the users at Alibaba want to integrate Python libraries with Flink for
> > > > > streaming processing, and Jython is unusable.
> > > > >
> > > > > 2. Python UDFs for SQL:
> > > > > * declaring python UDF based on Alibaba's internal DDL syntax.
> > > > > * start a Python process in open()
> > > > > * communicate with JVM process via Socket.
> > > > > * Yes, it supports python libraries; users can upload a virtualenv/conda
> > > > > Python runtime
> > > > >
> > > > > 3. We've drafted a design doc for the Python API:
> > > > > [DISCUSS] Flink Python API
> > > > > <https://docs.google.com/document/d/1JNGWdLwbo_btq9RVrc1PjWJV3lYUgPvK0uEWDIfVNJI/edit?usp=drive_web>
> > > > >
> > > > > Python UDF for SQL is not discussed in this documentation; we'll create a
> > > > > new proposal when the SQL DDL is ready.
> > > > >
> > > > > On Mon, Dec 10, 2018 at 9:52 PM Till Rohrmann <trohrm...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi Xianda,
> > > > > >
> > > > > > thanks for sharing this detailed feedback. Do I understand you correctly
> > > > > > that flink-python and flink-streaming-python are not usable for the use
> > > > > > cases at Alibaba atm?
> > > > > >
> > > > > > Could you sha

Re: getting an error when configuring state backend to hdfs

2018-12-19 Thread Avi Levi
When I try running from my IDE (IntelliJ) I am getting this exception:
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Could not retrieve
JobResult.
at
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
at
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at
com.bluevoyant.StreamingJob$.delayedEndpoint$com$bluevoyant$StreamingJob$1(StreamingJob.scala:41)
at com.bluevoyant.StreamingJob$delayedInit$body.apply(StreamingJob.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.bluevoyant.StreamingJob$.main(StreamingJob.scala:15)
at com.bluevoyant.StreamingJob.main(StreamingJob.scala)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit job.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set up
JobManager
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID
counter: Could not find a file system implementation for scheme 'hdfs'. The
scheme is not directly supported by Flink and no Hadoop file system to
support this scheme could be loaded.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1166)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:

Issue with Flink not able to properly read the ResourceManager address for a HA setup

2018-12-19 Thread Sai Inampudi
Hi, I am trying to create a flink cluster on YARN by running the following 
command, but the logs [1] show that it is unable to properly connect to 
the ResourceManager:

~/flink-1.5.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm flink_yarn


I found a stackoverflow[2] post where someone mentioned that this could be a 
result of Flink's packaged Hadoop version being different from the Hadoop 
version on the node, and therefore Flink is not able to properly read the 
ResourceManager address for a HA setup. However, I confirmed the versions are 
the same in my case. I downloaded flink-1.5.4-bin-hadoop26-scala_2.11 and when 
I do a hadoop version on the node, I get Hadoop 2.6.0-cdh5.14.0. Would anyone 
have any ideas on what else the issue could be?

Additional info: The cluster I am running these on is kerberized so I am not 
sure if that plays into the issue that is being caused. I set up flink-conf to 
use the kerberos ticket cache and did a kinit before trying to stand up the 
cluster. I verified the ticket cache was generated by doing a klist (output in 
the gist [1]).


[1] https://gist.github.com/sai-inampudi/9e1e823096d2685ed2282827432ef311
[2] 
https://stackoverflow.com/questions/32085990/error-with-kerberos-authentication-when-executing-flink-example-code-on-yarn-clu



Flink 1.5 + resource elasticity resulting in overloaded workers

2018-12-19 Thread jelmer
Hi, We recently upgraded to flink 1.6 and seem to be suffering from the
issue described in this email

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-tt23364.html

Our workers have 8 slots, and some workers are fully loaded and as a
consequence have to cope with heavy load during peak times, while other
workers sit completely idle and have 0 jobs assigned to their slots.

Is there any workaround for this, short of reducing the number of slots on
a worker?

We need to have double the slots we need available in order to cope with
availability zone maintenance. So if we were to reduce the number of
slots, we'd have to add new nodes that would then mostly sit idle.
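
For reference, the setting we would be reducing is per TaskManager in
flink-conf.yaml (4 here is just an example value):

  taskmanager.numberOfTaskSlots: 4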


Re: Flink 1.5 + resource elasticity resulting in overloaded workers

2018-12-19 Thread Vishal Santoshi
+1.


On Thu, Dec 20, 2018, 12:56 AM jelmer wrote:
> Hi, We recently upgraded to flink 1.6 and seem to be suffering from the
> issue described in this email
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-5-job-distribution-over-cluster-nodes-tt23364.html
>
> Our workers have 8 slots, and some workers are fully loaded and as a
> consequence have to cope with heavy load during peak times, while other
> workers sit completely idle and have 0 jobs assigned to their slots.
>
> Is there any workaround for this, short of reducing the number of slots
> on a worker?
>
> We need to have double the slots we need available in order to cope with
> availability zone maintenance. So if we were to reduce the number of
> slots, we'd have to add new nodes that would then mostly sit idle.
>
>
>
>


Re: Issue with Flink not able to properly read the ResourceManager address for a HA setup

2018-12-19 Thread Paul Lam
Hi Sai,

It looks like the Hadoop config path is not correctly set. You could set the 
logging level in log4j-cli.properties to debug to get more information.
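
For example (a sketch; keep whatever appender name is already in the file):

  # conf/log4j-cli.properties
  log4j.rootLogger=DEBUG, file

It is also worth making sure the client can see the Hadoop configs, e.g.
(path assumed):

  export HADOOP_CONF_DIR=/etc/hadoop/conf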

Best,
Paul Lam

> On Dec 20, 2018, at 03:18, Sai Inampudi wrote:
> 
> Hi, I am trying to create a flink cluster on YARN by running the following 
> command, but the logs [1] show that it is unable to properly connect to 
> the ResourceManager:
> 
> ~/flink-1.5.4/bin/yarn-session.sh -n 5 -tm 2048 -s 4 -d -nm flink_yarn
> 
> 
> I found a stackoverflow[2] post where someone mentioned that this could be a 
> result of Flink's packaged Hadoop version being different from the Hadoop 
> version on the node, and therefore Flink is not able to properly read the 
> ResourceManager address for a HA setup. However, I confirmed the versions are 
> the same in my case. I downloaded flink-1.5.4-bin-hadoop26-scala_2.11 and 
> when I do a hadoop version on the node, I get Hadoop 2.6.0-cdh5.14.0. Would 
> anyone have any ideas on what else the issue could be?
> 
> Additional info: The cluster I am running these on is kerberized so I am not 
> sure if that plays into the issue that is being caused. I set up flink-conf to 
> use the kerberos ticket cache and did a kinit before trying to stand up the 
> cluster. I verified the ticket cache was generated by doing a klist (output in 
> the gist [1]).
> 
> 
> [1] https://gist.github.com/sai-inampudi/9e1e823096d2685ed2282827432ef311
> [2] 
> https://stackoverflow.com/questions/32085990/error-with-kerberos-authentication-when-executing-flink-example-code-on-yarn-clu
>