Re: StreamingFileSink rolling callback

2019-09-12 Thread Kostas Kloudas
Hi Anton,

First of all, there is this PR
https://github.com/apache/flink/pull/9581 that may be interesting to
you.

Second, I think you have to keep in mind that the hourly bucket
reporting will be per-subtask. So if you have parallelism of 4, each
of the 4 tasks will report individually that they are done with hour
e.g. 10, and it is up to the receiving end to know if it should wait
for more or not. This may be a problem for your stateful assigner
approach as the assigner cannot know by default which subtask it
belongs to. If, for example, you have a parallelism of 1, then your
stateful assigner approach could work, although it suffers from the
problems you also mentioned: it is not integrated with
checkpointing (so a part file may be "reverted"), and a file rolling
does not mean that the previous one has already been written to the
FS.

Third, a solution could be that instead of having the job itself
pushing notifications that a part file has rolled (which may suffer
from the problem that a part file may roll but the FS takes some time
until it writes everything to disk), you could simply monitor the FS
directory where you are writing your buckets, and parse the part file
names in order to know that all subtasks have finished with hour X.
This can be done by another job which will also put notifications to
the SQS. I think that this will also solve your concern: "I’m also
thinking on how I should couple this with checkpointing mechanism as
ideally I’d like to not invoke this callback before checkpoint is
written."

Cheers,
Kostas

On Mon, Sep 9, 2019 at 12:40 PM Anton Parkhomenko  wrote:
>
> Hello,
>
> I’m writing a Flink job that reads heterogeneous (one row contains several 
> types that need to be partitioned downstream) data from AWS Kinesis and 
> writes to an S3 directory structure like s3://bucket/year/month/day/hour/type. 
> This all works great with StreamingFileSink in Flink 1.9, but the problem is that 
> I need to immediately (or “as soon as possible” rather) let another 
> application know when the “hour” bucket has rolled (i.e. we’re 100% sure it 
> won’t write any more data for this hour). Another problem is that the data can be 
> very skewed in types, e.g. one hour can contain 90% of rows with typeA, 30% 
> of rows with typeB and 1% of rows with typeC.
>
> My current plan is to:
>
> 1. Split the stream in windows using TumblingProcessingTimeWindows (I don’t 
> care about event time at all)
> 2. Assign every row its bucket in a windowing function
> 3. Write a stateful BucketAssigner that:
> 3.1. Keeps its last window in a mutable variable
> 3.2. Once it receives a row with a newer window, sends a message to SQS and 
> increments the window
>
> My biggest concern now is about the 3rd point. To me, BucketAssigner looks like a 
> pure function of (Row, Time) -> Bucket and I’m not sure that introducing 
> state and side effects there would be reasonable. Is there any other way to 
> do it? I’m also thinking about how I should couple this with the checkpointing 
> mechanism, as ideally I’d like to not invoke this callback before the checkpoint 
> is written.
>
> StreamingFileSink does not provide many ways to extend it. I tried to 
> re-implement it for my purposes, but stumbled upon many private methods and 
> classes, so even though it looks possible, the end result would probably be 
> too ugly.
>
> To make things a little bit easier, I don’t care too much about delivery 
> semantics of those final SQS messages - if I get only ~99% of them - that’s 
> fine, and if some of them are duplicated - that’s also fine.
>
> Regards,
> Anton
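
For illustration only, a minimal sketch of the stateful assigner described in
point 3 of the plan above. The "Row" element type, its getType() accessor and
the notifySqs() helper are hypothetical, and the caveats from the reply apply:
the notification fires once per subtask, it is not integrated with
checkpointing, and a rolled part file may not yet be fully written to the FS.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// "Row" stands for the user's own element type and is not imported here.
public class NotifyingBucketAssigner implements BucketAssigner<Row, String> {

    private static final DateTimeFormatter HOUR_FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    private long lastHourMillis = -1L;

    @Override
    public String getBucketId(Row element, Context context) {
        long now = context.currentProcessingTime();
        long hourMillis = now - (now % 3_600_000L);
        if (lastHourMillis >= 0 && hourMillis > lastHourMillis) {
            // side effect: tell the downstream application the previous hour is "done"
            notifySqs(lastHourMillis);
        }
        lastHourMillis = hourMillis;
        return HOUR_FORMAT.format(Instant.ofEpochMilli(hourMillis)) + "/" + element.getType();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    private void notifySqs(long hourMillis) {
        // hypothetical: send an SQS message for the closed hour
    }
}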


SIGSEGV error

2019-09-12 Thread Marek Maj
Hi everyone,

Recently we decided to upgrade from Flink 1.7.2 to 1.8.1. After the upgrade,
our task managers started to fail with a SIGSEGV error from time to time.

In the process of adjusting the code to 1.8.1, we noticed that there were some
changes around the TypeSerializerSnapshot interface and its implementations. At
that time we had a few custom serializers which we decided to throw out
during migration and then leverage Flink's default serializers. We don't mind
clearing the state in the process of migration; the effort to migrate with
state does not seem worth it.
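
(Not related to the crash itself, but for the record: if one later decides to
keep custom serializers, a serializer without configuration can usually satisfy
the new snapshot interface with the SimpleTypeSerializerSnapshot base class.
The class names below are hypothetical.)

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;

// Snapshot for a hypothetical stateless custom serializer MyTypeSerializer;
// the serializer's snapshotConfiguration() would return new MyTypeSerializerSnapshot().
public class MyTypeSerializerSnapshot extends SimpleTypeSerializerSnapshot<MyType> {
    public MyTypeSerializerSnapshot() {
        super(MyTypeSerializer::new);
    }
}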

Unfortunately, after running the new version we see SIGSEGV errors from time to
time. It may be that serialization is not the real cause, but at the moment
it seems to be the most probable reason. We have not made any
significant code changes outside the serialization area.

We run the job on YARN, HDP version 2.7.3.2.6.2.0-205.
Checkpoint configuration: RocksDB backend, not incremental, 50s min
processing time

You can find parts of JobManager log and ErrorFile log of failed container
included below.

Any suggestions are welcome

Best regards
Marek Maj

jobmanager.log

2019-09-10 16:30:28.177 INFO  o.a.f.r.c.CheckpointCoordinator   - Completed
checkpoint 47 for job c8a9ae03785ade86348c3189cf7dd965 (18532488122 bytes
in 60871 ms).

2019-09-10 16:31:19.223 INFO  o.a.f.r.c.CheckpointCoordinator   -
Triggering checkpoint 48 @ 1568111478177 for job
c8a9ae03785ade86348c3189cf7dd965.

2019-09-10 16:32:19.280 INFO  o.a.f.r.c.CheckpointCoordinator   - Completed
checkpoint 48 for job c8a9ae03785ade86348c3189cf7dd965 (19049515705 bytes
in 61083 ms).

2019-09-10 16:33:10.480 INFO  o.a.f.r.c.CheckpointCoordinator   -
Triggering checkpoint 49 @ 1568111589279 for job
c8a9ae03785ade86348c3189cf7dd965.

2019-09-10 16:33:36.773 WARN  o.a.f.r.r.h.l.m.MetricFetcherImpl   -
Requesting TaskManager's path for query services failed.

java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
Ask timed out on [Actor[akka://flink/user/dispatcher#374570759]] after
[1 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)

at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)

at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)

at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)

at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:816)

at akka.dispatch.OnComplete.internal(Future.scala:258)

at akka.dispatch.OnComplete.internal(Future.scala:256)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)

at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)

at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)

at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)

at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)

at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)

at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)

at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)

at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)

at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)

at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)

at java.lang.Thread.run(Thread.java:745)

Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#374570759]] after [1 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)

... 9 common frames omitted

2019-09-10 16:33:48.782 WARN  o.a.f.r.r.h.l.m.MetricFetcherImpl   -
Requesting TaskManager's path for query services failed.

java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
Ask timed out on [Actor[akka://flink/user/dispatcher#374570759]] after
[1 ms]. Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)

at
java.util.concurrent.CompletableFuture.completeThrow

Re: Problem starting taskexecutor daemons in 3 node cluster

2019-09-12 Thread Komal Mariam
I managed to fix it; however, I ran into another problem that I would
appreciate help in resolving.

It turns out that the username for all three nodes was different. Having
the same username for them fixed the issue, i.e.
same_username@slave-node2-hostname
same_username@slave-node3-hostname
same_username@master-node1-hostname

In fact, because the usernames are the same, I can just save them in the
conf files as:
slave-node2-hostname
slave-node3-hostname
master-node1-hostname
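
For reference, a minimal sketch of how those hosts would typically be laid out
in the standalone-cluster config files (assuming the default ports):

# conf/masters
master-node1-hostname:8081

# conf/slaves
slave-node2-hostname
slave-node3-hostname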

However, for some reason my worker nodes don't show up as available task
managers in the web UI.

The taskexecutor log says the following:
... (clipped for brevity)
2019-09-12 15:56:36,625 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -

2019-09-12 15:56:36,631 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered
UNIX signal handlers for [TERM, HUP, INT]
2019-09-12 15:56:36,647 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum
number of open file descriptors is 1048576.
2019-09-12 15:56:36,710 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, 150.82.218.218
2019-09-12 15:56:36,711 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2019-09-12 15:56:36,712 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2019-09-12 15:56:36,713 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.size, 1024m
2019-09-12 15:56:36,714 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2019-09-12 15:56:36,715 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2019-09-12 15:56:36,717 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.execution.failover-strategy, region
2019-09-12 15:56:37,097 INFO  org.apache.flink.core.fs.FileSystem
- Hadoop is not in the classpath/dependencies. The extended
set of supported File Systems via Hadoop is not available.
2019-09-12 15:56:37,221 INFO
 org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
create Hadoop Security Module because Hadoop cannot be found in the
Classpath.
2019-09-12 15:56:37,305 INFO
 org.apache.flink.runtime.security.SecurityUtils   - Cannot
install HadoopSecurityContext because Hadoop cannot be found in the
Classpath.
2019-09-12 15:56:38,142 INFO  org.apache.flink.configuration.Configuration
 - Config uses fallback configuration key
'jobmanager.rpc.address' instead of key 'rest.address'
2019-09-12 15:56:38,169 INFO
 org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to
select the network interface and address to use by connecting to the
leading JobManager.
2019-09-12 15:56:38,170 INFO
 org.apache.flink.runtime.util.LeaderRetrievalUtils-
TaskManager will try to connect for 1 milliseconds before falling back
to heuristics
2019-09-12 15:56:38,185 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Retrieved new target address /150.82.218.218:6123.
2019-09-12 15:56:39,691 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Trying to connect to address /150.82.218.218:6123
2019-09-12 15:56:39,693 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address 'salman-hpc/127.0.1.1':
Invalid argument (connect failed)
2019-09-12 15:56:39,696 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address '/150.82.219.73': No
route to host (Host unreachable)
2019-09-12 15:56:39,698 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address
'/fe80:0:0:0:1e10:83f4:a33a:a208%enp5s0f1': Network is unreachable (connect
failed)
2019-09-12 15:56:39,748 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address '/150.82.219.73': connect
timed out
2019-09-12 15:56:39,750 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address '/0:0:0:0:0:0:0:1%lo':
Network is unreachable (connect failed)
2019-09-12 15:56:39,751 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address '/127.0.0.1': Invalid
argument (connect failed)
2019-09-12 15:56:39,753 INFO  org.apache.flink.runtime.net.ConnectionUtils
 - Failed to connect from address
'/fe80:0:0:0:1e10:83f4:a33a:a208%enp5s0f1': Network is unreachable (connect
failed)
"flink-komal-taskexecutor-0-salman-hpc.log" 157L, 29954C

I'd a

Re: suggestion of FLINK-10868

2019-09-12 Thread Anyang Hu
Thanks Till, I will continue to follow this issue and see what we can do.

Best regards,
Anyang

Till Rohrmann wrote on Wed, Sep 11, 2019 at 5:12 PM:

> Suggestion 1 makes sense. For the quick termination I think we need to
> think a bit more about it to find a good solution also to support strict
> SLA requirements.
>
> Cheers,
> Till
>
> On Wed, Sep 11, 2019 at 11:11 AM Anyang Hu  wrote:
>
>> Hi Till,
>>
>> Some of our online batch tasks have strict SLA requirements, and they are
>> not allowed to be stuck for a long time. Therefore, we take a crude approach and
>> make the job exit immediately. Waiting for connection recovery is a
>> better solution. Maybe we need to add a timeout for waiting for the JM to restore
>> the connection?
>>
>> For suggestion 1 (making the interval configurable), we have already done it,
>> and if we can, we would like to contribute it back to the community.
>>
>> Best regards,
>> Anyang
>>
Till Rohrmann wrote on Mon, Sep 9, 2019 at 3:09 PM:
>>
>>> Hi Anyang,
>>>
>>> I think we cannot take your proposal because this means that whenever we
>>> want to call notifyAllocationFailure when there is a connection problem
>>> between the RM and the JM, then we fail the whole cluster. This is
>>> something a robust and resilient system should not do because connection
>>> problems are expected and need to be handled gracefully. Instead if one
>>> deems the notifyAllocationFailure message to be very important, then one
>>> would need to keep it and tell the JM once it has connected back.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, Sep 8, 2019 at 11:26 AM Anyang Hu 
>>> wrote:
>>>
 Hi Peter,

 For our online batch tasks, there is a scenario where the failed containers
 reach MAXIMUM_WORKERS_FAILURE_RATE but the client does not immediately
 exit (the probability of JM loss increases greatly when thousands of
 containers are to be started). We found that a JM disconnection (the
 reason for the JM loss is unknown) causes notifyAllocationFailure not
 to take effect.

 After the introduction of FLINK-13184, which starts
 containers with multiple threads, the JM disconnection situation has been
 alleviated. In order to reliably make the client exit immediately, we use
 the following code to determine whether to call onFatalError when a
 MaximumFailedTaskManagerExceedingException occurs:

 @Override
 public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
    validateRunsInMainThread();

    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
    if (jobManagerRegistration != null) {
       jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
    } else {
       if (exitProcessOnJobManagerTimedout) {
          ResourceManagerException exception = new ResourceManagerException(
                "Job Manager is lost, can not notify allocation failure.");
          onFatalError(exception);
       }
    }
 }


 Best regards,

 Anyang




Re: suggestion of FLINK-10868

2019-09-12 Thread Peter Huang
Hi Anyang and Till,

I think we agreed on making the interval configurable in this case. Let me
revise the current PR. You can review it after that.



Best Regards
Peter Huang

On Thu, Sep 12, 2019 at 12:53 AM Anyang Hu  wrote:

> Thanks Till, I will continue to follow this issue and see what we can do.
>
> Best regards,
> Anyang
>
> Till Rohrmann wrote on Wed, Sep 11, 2019 at 5:12 PM:
>
>> Suggestion 1 makes sense. For the quick termination I think we need to
>> think a bit more about it to find a good solution also to support strict
>> SLA requirements.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 11, 2019 at 11:11 AM Anyang Hu 
>> wrote:
>>
>>> Hi Till,
>>>
>>> Some of our online batch tasks have strict SLA requirements, and they
>>> are not allowed to be stuck for a long time. Therefore, we take a crude approach
>>> and make the job exit immediately. Waiting for connection recovery
>>> is a better solution. Maybe we need to add a timeout for waiting for the JM to
>>> restore the connection?
>>>
>>> For suggestion 1 (making the interval configurable), we have already done
>>> it, and if we can, we would like to contribute it back to the community.
>>>
>>> Best regards,
>>> Anyang
>>>
>>> Till Rohrmann wrote on Mon, Sep 9, 2019 at 3:09 PM:
>>>
 Hi Anyang,

 I think we cannot take your proposal because this means that whenever
 we want to call notifyAllocationFailure when there is a connection problem
 between the RM and the JM, then we fail the whole cluster. This is
 something a robust and resilient system should not do because connection
 problems are expected and need to be handled gracefully. Instead if one
 deems the notifyAllocationFailure message to be very important, then one
 would need to keep it and tell the JM once it has connected back.

 Cheers,
 Till

 On Sun, Sep 8, 2019 at 11:26 AM Anyang Hu 
 wrote:

> Hi Peter,
>
> For our online batch tasks, there is a scenario where the failed containers
> reach MAXIMUM_WORKERS_FAILURE_RATE but the client does not immediately
> exit (the probability of JM loss increases greatly when thousands of
> containers are to be started). We found that a JM disconnection (the
> reason for the JM loss is unknown) causes notifyAllocationFailure not
> to take effect.
>
> After the introduction of FLINK-13184, which starts
> containers with multiple threads, the JM disconnection situation has been
> alleviated. In order to reliably make the client exit immediately, we use
> the following code to determine whether to call onFatalError when a
> MaximumFailedTaskManagerExceedingException occurs:
>
> @Override
> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
>    validateRunsInMainThread();
>
>    JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
>    if (jobManagerRegistration != null) {
>       jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
>    } else {
>       if (exitProcessOnJobManagerTimedout) {
>          ResourceManagerException exception = new ResourceManagerException(
>                "Job Manager is lost, can not notify allocation failure.");
>          onFatalError(exception);
>       }
>    }
> }
>
>
> Best regards,
>
> Anyang
>
>


RE: Filter events based on future events

2019-09-12 Thread Theo Diefenthal


Hi Fabian,

Thanks for sharing your thoughts. I’ll give it a try.

Best regards

Theo



From: Fabian Hueske  
Sent: Wednesday, September 11, 2019 09:55 
To: theo.diefent...@scoop-software.de 
Cc: user  
Subject: Re: Filter events based on future events 




Hi Theo,

I would implement this with a KeyedProcessFunction. These are the important
points to consider:

1) Partition the output of the Kafka source by Kafka partition (or the
attribute that determines the partition). This will ensure that the data stay
in order (per partition).

2) The KeyedProcessFunction needs state to buffer the data of one minute. It
depends on the amount of data that you expect to buffer which state is the most
efficient. If you expect that one minute can easily be held in memory, I'd use
a FS state backend which keeps all state on the JVM heap. You could use a
ValueState with an appropriate data structure (Queue, PrioQueue, ...). The data
structure would be held as a regular Java object on the heap and hence provide
efficient access. If you expect the one minute to be too much data to be held
in memory, you need to go for the RocksDB state backend. Since this causes
de/serialization with every read and write access, it's more difficult to
identify an efficient state primitive / access pattern. I won't go into the
details here, assuming that the buffered data fits into memory and you can go
for the FS state backend. If that's not the case, let me know and I can share
some tips on the RocksDB state backend approach. The KeyedProcessFunction would
add records to the buffer state when processElement() is called and emit all
buffered records that have a timestamp of less than the timestamp of the
currently added record - 1 minute.

Note, since the timestamps are monotonically increasing, we do not need
watermarks and event time but can rely on the timestamps of the records. Hence,
the application won't block if one partition stalls, providing the same benefits
that per-key watermarks would offer (if they were supported by Flink).

Best, Fabian
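
To make the buffering part concrete, here is a minimal sketch of such a
KeyedProcessFunction, assuming the FS state backend and a POJO "Event" with a
timestamp field, an isAntiA() flag and a matchesA() method (all hypothetical
names). Records are buffered for one minute and emitted in timestamp order; a
buffered A record is dropped when a matching Anti-A arrives within that minute.

import java.util.Comparator;
import java.util.PriorityQueue;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AntiAFilter extends KeyedProcessFunction<Integer, Event, Event> {

    private transient ValueState<PriorityQueue<Event>> bufferState;

    @Override
    public void open(Configuration parameters) {
        bufferState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "buffer", TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {})));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        PriorityQueue<Event> buffer = bufferState.value();
        if (buffer == null) {
            buffer = new PriorityQueue<>(Comparator.comparingLong(e -> e.timestamp));
        }
        if (event.isAntiA()) {
            // retract the matching A if it is still buffered; the Anti-A itself is kept
            buffer.removeIf(buffered -> buffered.matchesA(event));
        }
        buffer.add(event);
        // emit everything that is more than one minute older than the newest record
        long cutoff = event.timestamp - 60_000L;
        while (!buffer.isEmpty() && buffer.peek().timestamp < cutoff) {
            out.collect(buffer.poll());
        }
        bufferState.update(buffer);
    }
}

With the FS state backend the queue lives on the heap as a regular Java object,
so per-record access stays cheap; with RocksDB the whole queue would be
de/serialized on every access, as noted above.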





On Tue, Sep 10, 2019 at 11:06 PM, theo.diefent...@scoop-software.de wrote:





Hi there,

I have the following use case:

I get transaction logs from multiple servers. Each server puts its logs into
its own Kafka partition so that within each partition the elements are
monotonically ordered by time.

Within the stream of transactions, we have some special events. Let's call them
A. (Roughly 1-10% in distribution are of this type.)

An A event can have an Anti-A event later on in time. That is an event which
has all the same attributes (like username, faculty, ...) but differs in one
boolean attribute indicating that it is an anti event. Kind of a retraction.

Now I want to emit almost all events downstream (including those that are
neither A nor Anti-A, let's call them simply B), preserving the monotonic order
of events. There is just one special case in which I want to filter out an
element: if the stream has an A event followed by an Anti-A event within one
minute, only the Anti-A event shall go downstream, not A itself. But if there
is no Anti-A event, A shall be emitted and shall still be within timestamp
order of events.

I'm wrangling my head around it a lot and don't come up with a proper
(performant) solution. It seems obvious that in the end I need to buffer all
records over 1 minute so that order can be preserved. But I have no idea how to
implement this efficiently in Flink.

My thoughts thus far:

1. I could give CEP a try. But in that CEP I would need to write something like
"match all B events in any case, and match A also but only if there is no
Anti-A" => doesn't that produce a lot of state? And are all B events considered
in the breadth-first rule match approach, i.e. tons of unnecessary comparisons
against A? Any pseudo code on how I could do this with CEP?

2. If I key the data by partition and all other attributes except for the
retract boolean, so that A and Anti-A always fall into the same keyed stream
but no other event in that stream, I probably get much better comparison
capabilities. But how much overhead do I produce with it? Will Flink reshuffle
the data even if the first key stays the same? And can I partition back to my
"global" per-partition order? Note that some events have the exact same event
time timestamp but I still want to have them in their original order later on.

3. Could I work with session windows somehow? Putting A and Anti-A in the same
session, and on window emit I would just not collect the A event if there is an
Anti-A? Would it be more or less overhead compared to CEP?

4. Do you have any other idea on how to approach this? Sadly, I have no way to
manipulate the input stream, so that

Flink web ui authentication using nginx

2019-09-12 Thread Kumar Bolar, Harshith
Hi all,

I'm trying to add authentication to the web dashboard using `nginx`. Flink's 
`rest.port` is set to `8081`, and connections to this port are blocked by the firewall. 
I'm using `nginx` to listen to requests on port 8080 and redirect them to port 8081 
with username/password authentication (port 8080 is open).

This is what the server block looks like in `nginx.conf`.

server {
listen   8080;
server_name  localhost;
include /etc/nginx/default.d/*.conf;
location / {
proxy_pass https://localhost:8081;
auth_basic   "Administrator's Area";
auth_basic_user_file /etc/apache2/.htpasswd;
}
error_page 404 /404.html;
location = /40x.html {
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
}
}


The port redirection is working fine but there are a couple of issues. When I 
go to the inactive job manager's UI, redirection to the active job manager does 
not happen. And when I try submitting a job from the UI, the upload gets 
stuck at "Saving". I'm using Flink 1.7.2.

Has anyone successfully set up web UI authentication on Flink complete with HA 
mode? Any clues would be greatly appreciated.

Thanks,
Harshith


[SURVEY] How many people are using customized RestartStrategy(s)

2019-09-12 Thread Zhu Zhu
Hi everyone,

I wanted to reach out to you and ask how many of you are using a customized
RestartStrategy[1] in production jobs.

We are currently developing the new Flink scheduler[2] which interacts
with restart strategies in a different way. We have to re-design the
interfaces for the new restart strategies (so called
RestartBackoffTimeStrategy). Existing customized RestartStrategy will not
work any more with the new scheduler.

We want to know whether we should keep the way
to customized RestartBackoffTimeStrategy so that existing customized
RestartStrategy can be migrated.

I'd appreciate it if you can share your status if you are using a customized
RestartStrategy. That will be valuable for us to make decisions.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
[2] https://issues.apache.org/jira/browse/FLINK-10429

Thanks,
Zhu Zhu


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-12 Thread Oytun Tez
Hi Zhu,

We are using custom restart strategy like this:

environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
Time.minutes(10)));


---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:

> Hi everyone,
>
> I wanted to reach out to you and ask how many of you are using a
> customized RestartStrategy[1] in production jobs.
>
> We are currently developing the new Flink scheduler[2] which interacts
> with restart strategies in a different way. We have to re-design the
> interfaces for the new restart strategies (so called
> RestartBackoffTimeStrategy). Existing customized RestartStrategy will not
> work any more with the new scheduler.
>
> We want to know whether we should keep the way
> to customized RestartBackoffTimeStrategy so that existing customized
> RestartStrategy can be migrated.
>
> I'd appreciate it if you can share your status if you are using a customized
> RestartStrategy. That will be valuable for us to make decisions.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
> [2] https://issues.apache.org/jira/browse/FLINK-10429
>
> Thanks,
> Zhu Zhu
>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-12 Thread Zhu Zhu
Thanks Oytun for the reply!

Sorry for not having stated it clearly. By "customized
RestartStrategy", we mean that users implement an
*org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
themselves and use it by configuring something like "restart-strategy:
org.foobar.MyRestartStrategyFactoryFactory".

The usage of restart strategies you mentioned will keep working with the
new scheduler.

Thanks,
Zhu Zhu

Oytun Tez wrote on Thu, Sep 12, 2019 at 10:05 PM:

> Hi Zhu,
>
> We are using custom restart strategy like this:
>
> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
> Time.minutes(10)));
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>
>> Hi everyone,
>>
>> I wanted to reach out to you and ask how many of you are using a
>> customized RestartStrategy[1] in production jobs.
>>
>> We are currently developing the new Flink scheduler[2] which interacts
>> with restart strategies in a different way. We have to re-design the
>> interfaces for the new restart strategies (so called
>> RestartBackoffTimeStrategy). Existing customized RestartStrategy will not
>> work any more with the new scheduler.
>>
>> We want to know whether we should keep the way
>> to customized RestartBackoffTimeStrategy so that existing customized
>> RestartStrategy can be migrated.
>>
>> I'd appreciate it if you can share your status if you are using a customized
>> RestartStrategy. That will be valuable for us to make decisions.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
>> [2] https://issues.apache.org/jira/browse/FLINK-10429
>>
>> Thanks,
>> Zhu Zhu
>>
>


Re: Kafka Schema registry

2019-09-12 Thread Elias Levy
Just for a Kafka source:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema


   - There is also a version of this schema available that can look up the
   writer’s schema (the schema which was used to write the record) in Confluent
   Schema Registry. Using this deserialization schema, the record will be read
   with the schema that was retrieved from the Schema Registry and transformed
   to a statically provided one (either through
   ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or
   ConfluentRegistryAvroDeserializationSchema.forSpecific(...)).
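
For reference, a minimal sketch of wiring that schema into a Kafka source
(topic name, group id, bootstrap servers and registry URL are placeholders):

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public static FlinkKafkaConsumer<GenericRecord> registryConsumer(Schema readerSchema) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "my-group");
    return new FlinkKafkaConsumer<>(
            "my-topic",
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                    readerSchema, "http://schema-registry:8081"),
            props);
}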


On Wed, Sep 11, 2019 at 1:48 PM Lasse Nedergaard 
wrote:

> Hi.
> Does Flink have out-of-the-box support for the Kafka Schema Registry for both
> sources and sinks?
> If not, does anyone know about an implementation we can build on so we can
> help make it generally available in a future release?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Re: How to handle avro BYTES type in flink

2019-09-12 Thread Catlyn Kong
Turns out there was some other deserialization problem unrelated to this.

On Mon, Sep 9, 2019 at 11:15 AM Catlyn Kong  wrote:

> Hi fellow streamers,
>
> I'm trying to support avro BYTES type in my flink application. Since
> ByteBuffer isn't a supported type, I'm converting the field to an
> Array[Byte]:
>
> case Type.BYTES =>
>   (avroObj: AnyRef) => {
>     if (avroObj == null) {
>       null
>     } else {
>       val byteBuffer = avroObj.asInstanceOf[ByteBuffer]
>       val bytes = new Array[Byte](byteBuffer.remaining())
>       byteBuffer.get(bytes)
>       bytes
>     }
>   }
>
> And in the table, I'm creating PrimitiveArrayTypeInfo[Byte] for this field.
> I'm getting ArrayIndexOutOfBoundsException:
>
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
> at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>
> Does anyone have experience with deserializing the BYTES type from Avro and
> making it compatible with the Table API? Wondering if it's because I didn't use
> the correct type, or maybe I need to verify whether there's enough data left in
> the source?
>
> Any input is appreciated.
>
> Thanks!
> Catlyn
>
>


Re: Kafka Schema registry

2019-09-12 Thread Lasse Nedergaard
Hi Elias

Thanks for letting me know. I have found it, but we also need the option to 
register Avro schemas and use the registry when we write to Kafka. So we will 
create a serialisation version and, when it works, implement it into Flink and 
create a pull request for the community. 

Med venlig hilsen / Best regards
Lasse Nedergaard


> Den 12. sep. 2019 kl. 17.45 skrev Elias Levy :
> 
> Just for a Kafka source:
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
> 
> There is also a version of this schema available that can look up the writer’s 
> schema (the schema which was used to write the record) in Confluent Schema 
> Registry. Using this deserialization schema, the record will be read with the 
> schema that was retrieved from the Schema Registry and transformed to a 
> statically provided one (either through 
> ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or 
> ConfluentRegistryAvroDeserializationSchema.forSpecific(...)).
> 
>> On Wed, Sep 11, 2019 at 1:48 PM Lasse Nedergaard  
>> wrote:
>> Hi. 
>> Does Flink have out-of-the-box support for the Kafka Schema Registry for both 
>> sources and sinks?
>> If not, does anyone know about an implementation we can build on so we can 
>> help make it generally available in a future release? 
>> 
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>> 


externalizing config files for flink class loader

2019-09-12 Thread Vishwas Siravara
I have a standalone cluster. I have added my own library (jar file) to the
lib/ folder in Flink. I submit my job from the CLI after I start the cluster.
Now I want to externalize a property file which has to be read by this
library. Since this library is loaded by Flink's classloader and not the
application classloader, I cannot supply this using flink run -C ..., since
that works only for the user classloader.


Thanks,
Vishwas


Re: How do I start a Flink application on my Flink+Mesos cluster?

2019-09-12 Thread Felipe Gutierrez
Thanks Gary,

I am compiling a new version of Mesos, and when I test it again I will reply
here if I find an error.
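
For the record, the mesos.resourcemanager.tasks.cpus setting discussed below
would look roughly like this in flink-conf.yaml (example values for an agent
with 8 cores; adjust to what a single Mesos agent actually offers):

taskmanager.numberOfTaskSlots: 4
mesos.resourcemanager.tasks.cpus: 4
mesos.resourcemanager.tasks.mem: 4096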


On Wed, 11 Sep 2019, 09:22 Gary Yao,  wrote:

> Hi Felipe,
>
> I am glad that you were able to fix the problem yourself.
>
> > But I suppose that Mesos will allocate Slots and Task Managers
> dynamically.
> > Is that right?
>
> Yes, that is the case since Flink 1.5 [1].
>
> > Then I put the number of "mesos.resourcemanager.tasks.cpus" to be equal
> or
> > less the available cores on a single node of the cluster. I am not sure
> about
> > this parameter, but only after this configuration it worked.
>
> I would need to see JobManager and Mesos logs to understand why this
> resolved
> your issue. If you do not set mesos.resourcemanager.tasks.cpus explicitly,
> Flink will request CPU resources equal to the number of TaskManager slots
> (taskmanager.numberOfTaskSlots) [2]. Maybe this value was too high in your
> configuration?
>
> Best,
> Gary
>
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> [2]
> https://github.com/apache/flink/blob/0a405251b297109fde1f9a155eff14be4d943887/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java#L344
>
> On Tue, Sep 10, 2019 at 10:41 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> I managed to find what was going wrong. I will write here just for the
>> record.
>>
>> First, the master machine was not logging in automatically to itself, so I had
>> to set the permissions for it.
>>
>> chmod og-wx ~/.ssh/authorized_keys
>> chmod 750 $HOME
>>
>> Then I set "mesos.resourcemanager.tasks.cpus" to be equal to
>> or less than the available cores on a single node of the cluster. I am not sure
>> about this parameter, but only after this configuration did it work.
>>
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Fri, Sep 6, 2019 at 10:36 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am running Mesos without DC/OS [1] and Flink on it. When I start my
>>> cluster I receive some messages suggesting that everything was started.
>>> However, I see 0 slots available on the Flink web dashboard. But I suppose
>>> that Mesos will allocate slots and task managers dynamically. Is that right?
>>>
>>> $ ./bin/mesos-appmaster.sh &
>>> [1] 16723
>>> flink@r03:~/flink-1.9.0$ I0906 10:22:45.080328 16943 sched.cpp:239]
>>> Version: 1.9.0
>>> I0906 10:22:45.082672 16996 sched.cpp:343] New master detected at
>>> mas...@xxx.xxx.xxx.xxx:5050
>>> I0906 10:22:45.083276 16996 sched.cpp:363] No credentials provided.
>>> Attempting to register without authentication
>>> I0906 10:22:45.086840 16997 sched.cpp:751] Framework registered with
>>> 22f6a553-e8ac-42d4-9a90-96a8d5f002f0-0003
>>>
>>> Then I deploy my Flink application. When I use the first command to
>>> deploy, the application starts. However, the tasks remain CREATED until
>>> Flink throws a timeout exception. In other words, it never turns to RUNNING.
>>> When I use the second command to deploy, the application does not start
>>> and I receive the exception "Could not allocate all requires slots
>>> within timeout of 30 ms. Slots required: 2". The full stacktrace is
>>> below.
>>>
>>> $ /home/flink/flink-1.9.0/bin/flink run
>>> /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
>>> $ ./bin/mesos-appmaster-job.sh run
>>> /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/mesos.html#mesos-without-dcos
>>> ps.: my application runs normally on a standalone Flink cluster.
>>>
>>> 
>>>  The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: Job failed.
>>> (JobID: 7ad8d71faaceb1ac469353452c43dc2a)
>>> at
>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>> at org.hello_flink_mesos.App.(App.java:35)
>>> at org.hello_flink_mesos.App.main(App.java:285)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>> at
>>> org.

Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-12 Thread Zili Chen
Thanks a lot everyone for the warm welcome. Happy Mid-autumn Festival!

Best,
tison.


Leonard Xu wrote on Thu, Sep 12, 2019 at 11:05 AM:

> Congratulations Zili Chen ! !
>
> Best,
> Leonard Xu
> > On Sep 12, 2019, at 11:02 AM, Yun Tang  wrote:
> >
> > Congratulations Zili
> >
> > Best
> > Yun Tang
> > 
> > From: Yun Gao <yungao...@aliyun.com.invalid>
> > Sent: Thursday, September 12, 2019 10:12
> > To: dev <d...@flink.apache.org>
> > Subject: Re: [ANNOUNCE] Zili Chen becomes a Flink committer
> >
> > Congratulations Zili!
> >
> >   Best,
> >   Yun
> >
> >
> > --
> > From:Yangze Guo 
> > Send Time:2019 Sep. 12 (Thu.) 09:38
> > To:dev 
> > Subject:Re: [ANNOUNCE] Zili Chen becomes a Flink committer
> >
> > Congratulations!
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, Sep 12, 2019 at 9:35 AM Rong Rong  wrote:
> >>
> >> Congratulations Zili!
> >>
> >> --
> >> Rong
> >>
> >> On Wed, Sep 11, 2019 at 6:26 PM Hequn Cheng 
> wrote:
> >>
> >>> Congratulations!
> >>>
> >>> Best, Hequn
> >>>
> >>> On Thu, Sep 12, 2019 at 9:24 AM Jark Wu  wrote:
> >>>
>  Congratulations Zili!
> 
>  Best,
>  Jark
> 
>  On Wed, 11 Sep 2019 at 23:06,  wrote:
> 
> > Congratulations, Zili.
> >
> >
> >
> > Best,
> >
> > Xingcan
> >
> >
> >
> > *From:* SHI Xiaogang 
> > *Sent:* Wednesday, September 11, 2019 7:43 AM
> > *To:* Guowei Ma 
> > *Cc:* Fabian Hueske ; Biao Liu <
> mmyy1...@gmail.com>;
> > Oytun Tez ; bupt_ljy ; dev <
> > d...@flink.apache.org>; user ; Till Rohrmann <
> > trohrm...@apache.org>
> > *Subject:* Re: [ANNOUNCE] Zili Chen becomes a Flink committer
> >
> >
> >
> > Congratulations!
> >
> >
> >
> > Regards,
> >
> > Xiaogang
> >
> >
> >
> > Guowei Ma wrote on Wed, Sep 11, 2019 at 7:07 PM:
> >
> > Congratulations Zili !
> >
> >
> > Best,
> >
> > Guowei
> >
> >
> >
> >
> >
> > Fabian Hueske wrote on Wed, Sep 11, 2019 at 7:02 PM:
> >
> > Congrats Zili Chen :-)
> >
> >
> >
> > Cheers, Fabian
> >
> >
> >
> > On Wed, Sep 11, 2019 at 12:48 PM, Biao Liu <
>  mmyy1...@gmail.com>:
> >
> > Congrats Zili!
> >
> >
> >
> > Thanks,
> >
> > Biao /'bɪ.aʊ/
> >
> >
> >
> >
> >
> >
> >
> > On Wed, 11 Sep 2019 at 18:43, Oytun Tez  wrote:
> >
> > Congratulations!
> >
> >
> >
> > ---
> >
> > Oytun Tez
> >
> >
> >
> > *M O T A W O R D*
> >
> > *The World's Fastest Human Translation Platform.*
> >
> > oy...@motaword.com — www.motaword.com
> >
> >
> >
> >
> >
> > On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy  wrote:
> >
> > Congratulations!
> >
> >
> >
> > Best,
> >
> > Jiayi Liao
> >
> >
> >
> > Original Message
> >
> > *Sender:* Till Rohrmann
> >
> > *Recipient:* dev; user
> >
> > *Date:* Wednesday, Sep 11, 2019 17:22
> >
> > *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer
> >
> >
> >
> > Hi everyone,
> >
> >
> >
> > I'm very happy to announce that Zili Chen (some of you might also
> know
> > him as Tison Kun) accepted the offer of the Flink PMC to become a
>  committer
> > of the Flink project.
> >
> >
> >
> > Zili Chen has been an active community member for almost 16 months
> now.
> > He helped pushing the Flip-6 effort over the finish line, ported a
> lot
>  of
> > legacy code tests, removed a good part of the legacy code,
> contributed
> > numerous fixes, is involved in the Flink's client API refactoring,
>  drives
> > the refactoring of Flink's HighAvailabilityServices and much more.
> Zili
> > Chen also helped the community by PR reviews, reporting Flink issues,
> > answering user mails and being very active on the dev mailing list.
> >
> >
> >
> > Congratulations Zili Chen!
> >
> >
> >
> > Best, Till
> >
> > (on behalf of the Flink PMC)
>
>


Jobsubmission fails in Flink 1.7.1 High Availability mode

2019-09-12 Thread Bajaj, Abhinav
Hi,

I came across an issue during job submission via the Flink CLI client with Flink 
1.7.1 in high availability mode.

Setup:
Flink version:: 1.7.1
Cluster:: K8s
Mode:: High availability with 2 jobmanagers

CLI Command
./bin/flink run -d -c MyExample /myexample.jar
The CLI runs inside a K8s job and submits the Flink job to the Flink cluster. 
The K8s job spec allows it to try 3 times to submit the job.

Result:
2019-09-11 22:32:12.908 [Flink-RestClusterClient-IO-thread-4] level=DEBUG 
org.apache.flink.runtime.rest.RestClient  - Sending request of class class 
org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody to 
job-jm-1.job-jm-svc.job-namespace.svc.cluster.local:8081/v1/jobs
2019-09-11 22:32:14.186 [flink-rest-client-netty-thread-1] level=ERROR 
org.apache.flink.runtime.rest.RestClient  - Response was not valid JSON.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
 No content to map due to end-of-input
at [Source: 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@2b88f8bb; 
line: 1, column: 0]
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:256)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3851)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272)
  at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:504)
  at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:452)
 ………
2019-09-11 22:32:14.186 [flink-rest-client-netty-thread-1] level=ERROR 
org.apache.flink.runtime.rest.RestClient  - Unexpected plain-text response:
……..

The job submission fails after exhausting the number of retries.

Observations:
I looked into the debug logs and Flink code to come to the conclusions below:

  *   CLI rest client received an empty response body from the jobmanager 
(job-jm-1). I think the response was a redirect and the RestClient class does 
not handle redirects. This explains the above exception from Jackson and 
missing response body logged in “org.apache.flink.runtime.rest.RestClient  - 
Unexpected plain-text response:” logs above.
  *   The ZooKeeperLeaderRetrievalService in the rest client logs that job-jm-1 
became leader followed by a log that job-jm-0 became leader. The address of 
job-jm-1 is http and address of job-jm-0 is akka url. CLI logs at end of email.
  *   The RestClusterClient class does not update the leader during the job 
submission if the leader changes.
  *   All 3 times the CLI K8s job tried to submit the Flink job, 
ZooKeeperLeaderRetrievalService saw both events: job-jm-1 becoming the 
leader followed by job-jm-0. So all 3 retries fail to submit the job for the 
same reason of an empty response.
  *   The jobmanager logs from both job-jm-0 and job-jm-1 shows that job-jm-0 
is the leader and job-jm-1 was never a leader. This contradicts the CLI logs.

Open questions:

  *   I am not sure why the CLI’s ZooKeeperLeaderRetrievalService thinks 
job-jm-1 was the leader whereas both jobmanagers’ 
ZooKeeperLeaderRetrievalService considers job-jm-0 the leader throughout the 
cluster lifetime.
  *   Even if the CLI’s ZooKeeperLeaderRetrievalService thinks the leader has changed 
from job-jm-1 to job-jm-0, it still uses job-jm-1. Is it a known issue with the 
Flink 1.7.1 rest client that it doesn’t update the leader if it changed?
  *   Why is one leader address an http URL while the other is an akka URL?

Can someone help check and confirm my observations above and help answer the 
questions?

Highly appreciate your time and help.

~ Abhinav Bajaj


CLI Logs -

2019-09-11 22:30:31.077 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Leader node has 
changed.
2019-09-11 22:30:31.171 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=http://job-jm-1.job-jm-svc.job-namespace.svc.cluster.local:8081, session 
ID=c1422a1b-a6b8-43b0-85d7-87b95af16932.

……
2019-09-11 22:30:31.270 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Leader node has 
changed
2019-09-11 22:30:31.270 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=akka.tcp://fl...@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/dispatcher,
 session ID=4e4d03d5-2abe-449c-af2e-df2e0cd80e26



job-jm-0 Logs -
2019-09-11 22:29:59.781 [main-EventThread] level=DEBUG 
o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader 
information: 
Leader=akka.tcp://fl...@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6

Re: externalizing config files for flink class loader

2019-09-12 Thread Vijay Bhaskar
Hi
You can use this way:
Use typesafe configuration, which provides excellent configuration
methodologies.
You supply default configuration, which is read by your application through
reference.conf file of typesafe. If you want to override any of the
defaults you can supply to command line arguments
>From the command line arguments, you can read them into Paramter tool map,
which is supplied by flink. You can use this map to override the default
configuration of the typesafe.
Following is the code snippet:

val pmtool = ParameterTool.fromArgs(args)
val defaultConfig = ConfigFactory.load() // default config from
// reference.conf/application.conf/system properties/env of Typesafe
val overrideConfigFromArgs = ConfigFactory.parseMap(pmtool.toMap)
val finalConfig = overrideConfigFromArgs.withFallback(defaultConfig)
// This is going to override your command line params
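
Another option worth noting for a library that sits in lib/ (outside the user
classloader): Typesafe Config also honours the config.file system property, so
you could point it at an external file cluster-wide via env.java.opts in
flink-conf.yaml (the path is just an example):

env.java.opts: -Dconfig.file=/etc/myapp/application.conf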

Regards
Bhaskar



On Fri, Sep 13, 2019 at 12:51 AM Vishwas Siravara 
wrote:

> I have a standalone cluster. I have added my own library (jar file) to the
> lib/ folder in Flink. I submit my job from the CLI after I start the cluster.
> Now I want to externalize a property file which has to be read by this
> library. Since this library is loaded by Flink's classloader and not the
> application classloader, I cannot supply this using flink run -C ..., since
> that works only for the user classloader.
>
>
> Thanks,
> Vishwas
>


Re: externalizing config files for flink class loader

2019-09-12 Thread Vijay Bhaskar
Sorry there is a typo, corrected it:
val pmtool = ParameterTool.fromArgs(args)
val defaultConfig = ConfigFactory.load() // default config from
// reference.conf/application.conf/system properties/env of Typesafe
val overrideConfigFromArgs = ConfigFactory.parseMap(pmtool.toMap)
val finalConfig = overrideConfigFromArgs.withFallback(defaultConfig)
// This is going to override your default configuration params

On Fri, Sep 13, 2019 at 11:38 AM Vijay Bhaskar 
wrote:

> Hi
> You can do it this way:
> Use Typesafe Config, which provides excellent configuration
> methodologies.
> You supply a default configuration, which is read by your application
> through Typesafe's reference.conf file. If you want to override any of the
> defaults, you can supply them as command line arguments.
> From the command line arguments, you can read them into a ParameterTool map,
> which is supplied by Flink. You can use this map to override the default
> Typesafe configuration.
> Following is the code snippet:
>
> val pmtool = ParameterTool.fromArgs(args)
> val defaultConfig = ConfigFactory.load() // default config from
> // reference.conf/application.conf/system properties/env of Typesafe
> val overrideConfigFromArgs = ConfigFactory.parseMap(pmtool.toMap)
> val finalConfig = overrideConfigFromArgs.withFallback(defaultConfig)
> // This is going to override your command line params
>
> Regards
> Bhaskar
>
>
>
> On Fri, Sep 13, 2019 at 12:51 AM Vishwas Siravara 
> wrote:
>
>> I have a standalone cluster. I have added my own library (jar file) to the
>> lib/ folder in Flink. I submit my job from the CLI after I start the cluster.
>> Now I want to externalize a property file which has to be read by this
>> library. Since this library is loaded by Flink's classloader and not the
>> application classloader, I cannot supply this using flink run -C ..., since
>> that works only for the user classloader.
>>
>>
>> Thanks,
>> Vishwas
>>
>