Re: [BULK]Re: [SURVEY] Remove Mesos support

2020-10-27 Thread Oleksandr Nitavskyi
Hello Xintong,

Thanks for the insights and support.

I browsed the Mesos backlog and didn't identify anything critical left there.

I see that there were quite a lot of contributions to Flink on Mesos in recent 
versions: https://github.com/apache/flink/commits/master/flink-mesos.
We plan to validate the current Flink master (or the release 1.12 branch) against 
our Mesos setup. In case of any issues, we will try to propose changes.
My feeling is that our test results shouldn't affect the Flink 1.12 release 
cycle, and if any resulting commits land in 1.12.1, that should be totally fine.

In the future, we would be glad to help you guys with any maintenance-related 
questions. One of the highest priorities around this component seems to be the 
development of the full e2e test.

Kind Regards
Oleksandr Nitavskyi

From: Xintong Song 
Sent: Tuesday, October 27, 2020 7:14 AM
To: dev ; user 
Cc: Piyush Narang 
Subject: [BULK]Re: [SURVEY] Remove Mesos support

Hi Piyush,

Thanks a lot for sharing the information. It is a great relief that you are good 
with keeping Flink on Mesos as is.

As for the jira issues, I believe the most essential ones have already been 
resolved. You may find the remaining open issues here [1], but not all of them 
are necessary if we decide to keep Flink on Mesos as is.

At the moment and in the short term, I think help is mostly needed with testing 
the upcoming 1.12 release against Mesos use cases. The community is currently 
actively preparing the new release, and hopefully we can come up with a release 
candidate early next month. It would be greatly appreciated if you folks, as 
experienced Flink on Mesos users, could help with verifying the release 
candidates.


Thank you~

Xintong Song

[1] 
https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open

On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang <p.nar...@criteo.com> wrote:

Hi Xintong,



Do you have any jiras that cover any of the items on 1 or 2? I can reach out to 
folks internally and see if I can get some folks to commit to helping out.



To cover the other qs:

  *   Yes, we’ve not got a plan at the moment to get off Mesos. We use Yarn for 
some of our Flink workloads when we can. Mesos is only used when we need streaming 
capabilities in our WW dcs (as our Yarn is centralized in one DC)
  *   We’re currently on Flink 1.9 (old planner). We have a plan to bump to 
1.11 / 1.12 this quarter.
  *   We typically upgrade once every 6 months to a year (not every release). 
We’d like to speed up the cadence but we’re not there yet.
  *   We’d largely be good with keeping Flink on Mesos as-is and functional 
while missing out on some of the newer features. We understand the pain on the 
communities side and we can take on the work if we see some fancy improvement 
in Flink on Yarn / K8s that we want in Mesos to put in the request to port it 
over.



Thanks,



-- Piyush





From: Xintong Song <tonysong...@gmail.com>
Date: Sunday, October 25, 2020 at 10:57 PM
To: dev <d...@flink.apache.org>, user <user@flink.apache.org>
Cc: Lasse Nedergaard <lassenedergaardfl...@gmail.com>, <p.nar...@criteo.com>
Subject: Re: [SURVEY] Remove Mesos support



Thanks for sharing the information with us, Piyush and Lasse.



@Piyush



Thanks for offering the help. IMO, there are currently several problems that 
make supporting Flink on Mesos challenging for us.

  1.  Lack of Mesos experts. AFAIK, there are very few people (if not none) 
among the active contributors in this community that are familiar with Mesos 
and can help with development on this component.
  2.  Absence of tests. Mesos does not provide a testing cluster, like 
`MiniYARNCluster`, making it hard to test interactions between Flink and Mesos. 
We have only a few very simple e2e tests running on Mesos deployed in Docker, 
covering the most fundamental workflows. We are not sure how well those tests 
work, especially against potential corner cases.
  3.  Divergence from other deployments. Because of 1 and 2, new efforts 
(features, maintenance, refactors) tend to exclude Mesos if possible. When the 
new efforts have to touch the Mesos related components (e.g., changes to the 
common resource manag

Re: [BULK]Re: DisableGenericTypes is not compatible with Kafka

2020-02-05 Thread Oleksandr Nitavskyi
Thanks, guys for the answers.

Aljoscha, I have a question to ensure I get it right.
Do I understand correctly that this newly created TypeSerializer should use 
Kryo under the hood, so we keep backward compatibility of the state and do 
not get an exception when generic types are disabled?

Thanks
Kind Regards
Oleksandr

From: Aljoscha Krettek 
Sent: Tuesday, February 4, 2020 2:29 PM
To: user@flink.apache.org 
Subject: [BULK]Re: DisableGenericTypes is not compatible with Kafka

Unfortunately, the fact that the Kafka Sources use Kryo for state
serialization is a very early design misstep that we cannot get rid of
for now. We will get rid of that when the new source interface lands
([1]) and when we have a new Kafka Source based on that.

As a workaround, we should change the Kafka Consumer to go through a
different constructor of ListStateDescriptor which directly takes a
TypeSerializer instead of a TypeInformation here: [2]. This should
sidestep the "no generic types" check.

I created a Jira issue for this:
https://issues.apache.org/jira/browse/FLINK-15904
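A rough sketch of that workaround, under the assumption that constructing the Kryo-backed TypeSerializer directly (rather than going through TypeInformation) is what sidesteps the check. Class and state names are illustrative, with `Tuple2<Object, Long>` standing in for the real `Tuple2<KafkaTopicPartition, Long>`:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;

public class OffsetStateSketch {

    @SuppressWarnings("unchecked")
    static ListStateDescriptor<Tuple2<Object, Long>> descriptor(ExecutionConfig config) {
        // Building the KryoSerializer directly keeps the state format
        // identical (still Kryo, so old checkpoints stay readable) but never
        // consults the disableGenericTypes flag, which is only checked on
        // the TypeInformation -> createSerializer path.
        KryoSerializer<Tuple2<Object, Long>> serializer =
                new KryoSerializer<>(
                        (Class<Tuple2<Object, Long>>) (Class<?>) Tuple2.class, config);
        return new ListStateDescriptor<>("topic-partition-offset-states", serializer);
    }
}
```

Because the serializer is still Kryo under the hood, state written by the old descriptor should remain readable.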

Best,
Aljoscha

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2]
https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860

On 01.02.20 09:44, Guowei Ma wrote:
> Hi,
> I think there could be two ways to work around 'disableGenericType' in case
> of KafkaSource:
> 1. adding the TypeInfo annotation [1] to the KafkaTopicPartition.
> 2. using reflection to call the private method. :)
>
> Maybe we could add this TypeInfo annotation to the KafkaConnector.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#defining-type-information-using-a-factory
>
> Best,
> Guowei
>
>
> Oleksandr Nitavskyi wrote on Friday, January 31, 2020 at 12:40 AM:
>
>> Hi guys,
>>
>>
>>
>> We have encountered an issue related to the possibility to
>> ‘disableGenericTypes’ (disabling Kryo for the job). It seems a very nice
>> idea to ensure that nobody introduces some random change which penalizes the
>> performance of the job.
>>
>>
>>
>> The issue we have encountered is that Flink’s KafkaSource is storing
>> KafkaTopicPartition in the state for offset recovery, which is serialized
>> with Kryo.
>>
>> For sure this feature itself does not penalize performance, but it looks like
>> it reduces the usefulness of the possibility to ‘disableGenericTypes’. Also,
>> on the Flink user’s side there is no good tool to add
>> KafkaTopicPartition’s non-Kryo type information.
>>
>>
>>
>> One of the related tickets I have found:
>> https://issues.apache.org/jira/browse/FLINK-12031
>>
>>
>>
>> Do you know any workaround to ‘disableGenericType’ in case of KafkaSources,
>> or what do you think about making some development to address this issue?
>>
>>
>>
>> Kind Regards
>>
>> Oleksandr
>>
>>
>>
>


DisableGenericTypes is not compatible with Kafka

2020-01-30 Thread Oleksandr Nitavskyi
Hi guys,

We have encountered an issue related to the possibility to 
‘disableGenericTypes’ (disabling Kryo for the job). It seems a very nice idea 
to ensure that nobody introduces some random change which penalizes the 
performance of the job.

The issue we have encountered is that Flink’s KafkaSource is storing 
KafkaTopicPartition in the state for offset recovery, which is serialized with 
Kryo.
For sure this feature itself does not penalize performance, but it looks like it 
reduces the usefulness of the possibility to ‘disableGenericTypes’. Also, on the 
Flink user’s side there is no good tool to add KafkaTopicPartition’s non-Kryo 
type information.

One of the related tickets I have found: 
https://issues.apache.org/jira/browse/FLINK-12031

Do you know any workaround to ‘disableGenericType’ in case of KafkaSources, or 
what do you think about making some development to address this issue?

Kind Regards
Oleksandr



Re: Flink on Mesos

2019-07-23 Thread Oleksandr Nitavskyi
Hey guys.

We have also implemented network bandwidth configuration support in the Flink 
on Mesos component.

Will somebody be able to have a look at our PR: 
https://github.com/apache/flink/pull/8652
There are for sure some details to clarify.

Cheers
Oleksandr

From: Till Rohrmann 
Date: Friday 5 April 2019 at 16:46
To: Juan Gentile 
Cc: "user@flink.apache.org" , Oleksandr Nitavskyi 

Subject: Re: Flink on Mesos

Hi Juan,

thanks for reporting this issue. If you could open an issue and also provide a 
fix for it, then this would be awesome. Please let me know the ticket number so 
that I can monitor it and give your PR a review.

Cheers,
Till

On Fri, Apr 5, 2019 at 5:34 AM Juan Gentile <j.gent...@criteo.com> wrote:
Hello!

We are having a small problem while trying to deploy Flink on Mesos using 
marathon. In our set up of Mesos we are required to specify the amount of disk 
space we want to have for the applications we deploy there.
The current default value in Flink is 0 and it is currently not 
parameterizable. This means that we ask for 0 disk space for our instances, so 
Flink can’t work.
I’d appreciate suggestions if you have any. Otherwise, since this is causing 
some problems on our side, I’d like to know if I can create a ticket on Flink 
and work on it; the fix looks quite easy to implement.

Thank you,
Juan.


Re: 1.6 UI issues

2019-01-02 Thread Oleksandr Nitavskyi
Hello guys. Happy new year!

Context: we started to have some troubles with the UI after bumping our Flink 
version from 1.4 to 1.6.3. The UI couldn’t render the job details page, so 
inspecting jobs has become impossible for us with the new version.

And it looks like we have a workaround for our UI issue.
After some investigation we realized that, starting from Flink 1.5, we began 
to hit a timeout on the actor call restfulGateway.requestJob(jobId, timeout) in 
ExecutionGraphCache. So we increased the web.timeout parameter and stopped 
getting the timeout exception on the JobManager side.

Also, on the AngularJS side we needed to tweak web.refresh-interval in 
SingleJobController to ensure the front-end waits for the back-end request to 
finish. Otherwise the AngularJS side can issue another request from 
SingleJobController and, for a reason we don’t understand yet, the UI is not 
updated when the older request finishes. We will have a closer look at this 
behavior.
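For reference, the two knobs we touched, as a flink-conf.yaml sketch (the values below are illustrative, not the ones we finally settled on):

```yaml
# flink-conf.yaml -- illustrative values
# Timeout (ms) for server-side REST/actor calls such as requestJob():
web.timeout: 60000
# UI polling period (ms); a larger value gives slow requests time to finish
# before the next one is issued:
web.refresh-interval: 10000
```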

Does it ring a bell for you probably?

Thank you

Kind Regards
Oleksandr

From: Till Rohrmann 
Date: Wednesday 19 December 2018 at 16:52
To: Juan Gentile 
Cc: "dwysakow...@apache.org" , Jeff Bean 
, Oleksandr Nitavskyi 
Subject: Re: 1.6 UI issues

Hi Juan,

thanks for the log. The log file does not contain anything suspicious. Are you 
sure that you sent me the right file? The timestamps don't seem to match. In 
the attached log, the job seems to run without problems.

Cheers,
Till

On Wed, Dec 19, 2018 at 10:26 AM Juan Gentile <j.gent...@criteo.com> wrote:

Hello Till, Dawid
Sorry for the late response on this issue and thank you Jeff for helping us 
with this.
Yes we are using 1.6.2
I attach the logs from the Job Master.
Also we noticed this exception:
2018-12-19 08:50:10,497 ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Implementation 
error: Unhandled exception.
java.util.concurrent.CancellationException
at 
java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263)
at 
org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache.getExecutionGraph(ExecutionGraphCache.java:124)
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:76)
at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:78)
at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:154)
at 
org.apache.flink.runtime.rest.handler.RedirectHandler.lambda$null$0(RedirectHandler.java:142)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
2018-12-19 08:50:17,977 ERROR 
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler  - Implementation 
error: Unhandled exception.
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher#-760166654]] after [1 ms]. Sender[null] 
sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)

For which we tested with this parameter: -Dakka.ask.timeout=60s
But the issue remains.

Thank you
Juan

From: Till Rohrmann mailto:trohrm...@apache.org>>
Date: Thursday, 8 November 2018 at 1

Flink 1.6, User Interface response time

2018-10-31 Thread Oleksandr Nitavskyi
Hello!

We are migrating to the latest 1.6.2 version and all the jobs seem to work fine, 
but when we check individual jobs through the web interface we encounter the 
issue that, after clicking on a job, either it takes too long to load the job’s 
information or it never loads at all.

Has anyone had this issue? I know that UI responsiveness is quite subjective, 
but has anybody else noticed some degradation between Flink 1.4 and 1.6?

Thank you,
Juan



Re: Weird behaviour after change sources in a job.

2018-09-13 Thread Oleksandr Nitavskyi
Great !

So I have created a ticket: https://issues.apache.org/jira/browse/FLINK-10342 
with a test which reproduces the issue: 
https://github.com/apache/flink/pull/6691/files
If it seems reasonable I can create a fix for this.

Regards
Oleksandr

From: Dawid Wysakowicz 
Date: Thursday, 13 September 2018 at 11:15
To: Oleksandr Nitavskyi 
Cc: R/Product Engineering/PRIME/Delight , 
"gor...@data-artisans.com" , Juan Gentile 
, "user@flink.apache.org" 
Subject: Re: Weird behaviour after change sources in a job.


Hi Oleksandr,

The mapping of state to operator is done based on operator id, not on its name. 
That's why changing source's name might not be enough.

A check that the restored partitions still match the provided topic/topic 
pattern might actually be a valuable addition. Would you like to open a jira 
ticket for it?

Best,

Dawid

On 13/09/18 11:06, Oleksandr Nitavskyi wrote:
Hello Dawid,

Thank you for the answer. In our case we did change the name of the Kafka 
source so we expected it shouldn’t restore state for a given Kafka source 
operator.

Anyway, shouldn’t FlinkKafkaConsumerBase have a safeguard which does not allow 
restoring KafkaTopicPartitions from topics that are different from the 
currently consumed one?

Thank you
Oleksandr

From: Dawid Wysakowicz <dwysakow...@apache.org>
Date: Thursday, 13 September 2018 at 09:59
To: Juan Gentile <j.gent...@criteo.com>, 
"user@flink.apache.org" <user@flink.apache.org>
Cc: R/Product Engineering/PRIME/Delight <deli...@criteo.com>, 
<gor...@data-artisans.com>
Subject: Re: Weird behaviour after change sources in a job.


Hi Juan,

I think this is somehow expected behaviour. Flink, in order to provide proper 
processing semantics, keeps track of partition offsets internally and 
checkpoints those offsets. FlinkKafkaConsumer also supports discovery of new 
partitions. With both of those features in mind, if you restart your job from a 
savepoint/checkpoint but with a changed topic, it will restore the old 
partitions with offsets from the checkpoint and will discover partitions from 
the new topic. This is why it consumes from both the old and the new topic. If 
you defined your source manually (i.e. you were not using Kafka010TableSource), 
what you can do is set a new uid for the source and enable 
allowNonRestoredState. This way you will keep state for all other operators, 
but you will lose the information about offsets in Kafka.
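A minimal sketch of that suggestion for a manually defined source (topic, broker, and uid values are illustrative; the consumer and schema classes are in the packages used in the Flink 1.4 era):

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class NewTopicJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // illustrative
        props.setProperty("group.id", "my-group");             // illustrative

        env.addSource(new FlinkKafkaConsumer010<>("new-topic", new SimpleStringSchema(), props))
           // A fresh uid means the old source state is no longer matched
           // against this operator on restore.
           .uid("kafka-source-new-topic")
           .print();

        env.execute("job-with-new-source-uid");
    }
}
```

Restoring then needs the orphaned state to be explicitly allowed, e.g. `flink run -s <savepointPath> --allowNonRestoredState ...`, so the old uid’s offset state is dropped instead of failing the restore.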



I also cc @Gordon, who might want to add something to this.

On 12/09/18 18:03, Juan Gentile wrote:
Hello!

We have found a weird issue while replacing the source in one of our Flink SQL 
Jobs.

We have a job which was reading from a Kafka topic (with externalized 
checkpoints) and we needed to change the topic while keeping the same logic for 
the job/SQL.
After we restarted the job, instead of consuming from the new Kafka topic, it 
consumed from both! Duplicating the input of our job.
We were able to reproduce the issue but we don’t understand if this is a bug or 
expected behavior and in this case we should have restarted from a clean state.
We are using Flink 1.4 at the moment and Kafka 0.10.2.1

Thank you,
Juan







Re: Weird behaviour after change sources in a job.

2018-09-13 Thread Oleksandr Nitavskyi
Hello Dawid,

Thank you for the answer. In our case we did change the name of the Kafka 
source so we expected it shouldn’t restore state for a given Kafka source 
operator.

Anyway, shouldn’t FlinkKafkaConsumerBase have a safeguard which does not allow 
restoring KafkaTopicPartitions from topics that are different from the 
currently consumed one?

Thank you
Oleksandr

From: Dawid Wysakowicz 
Date: Thursday, 13 September 2018 at 09:59
To: Juan Gentile , "user@flink.apache.org" 

Cc: R/Product Engineering/PRIME/Delight , 

Subject: Re: Weird behaviour after change sources in a job.


Hi Juan,

I think this is somehow expected behaviour. Flink, in order to provide proper 
processing semantics, keeps track of partition offsets internally and 
checkpoints those offsets. FlinkKafkaConsumer also supports discovery of new 
partitions. With both of those features in mind, if you restart your job from a 
savepoint/checkpoint but with a changed topic, it will restore the old 
partitions with offsets from the checkpoint and will discover partitions from 
the new topic. This is why it consumes from both the old and the new topic. If 
you defined your source manually (i.e. you were not using Kafka010TableSource), 
what you can do is set a new uid for the source and enable 
allowNonRestoredState. This way you will keep state for all other operators, 
but you will lose the information about offsets in Kafka.



I also cc @Gordon, who might want to add something to this.

On 12/09/18 18:03, Juan Gentile wrote:
Hello!

We have found a weird issue while replacing the source in one of our Flink SQL 
Jobs.

We have a job which was reading from a Kafka topic (with externalized 
checkpoints) and we needed to change the topic while keeping the same logic for 
the job/SQL.
After we restarted the job, instead of consuming from the new Kafka topic, it 
consumed from both! Duplicating the input of our job.
We were able to reproduce the issue but we don’t understand if this is a bug or 
expected behavior and in this case we should have restarted from a clean state.
We are using Flink 1.4 at the moment and Kafka 0.10.2.1

Thank you,
Juan




Table API, custom window

2018-08-09 Thread Oleksandr Nitavskyi
Hello guys,

I am curious, is there a way to define custom windows 
(assigners/triggers/evictors) for the Flink Table/SQL API? The documentation 
seems to keep silent about this; are there plans for it? Or should we go with 
the DataStream API in case we need this kind of functionality?

Thanks
Oleksandr Nitavskyi


CoreOptions.TMP_DIRS bug

2018-07-04 Thread Oleksandr Nitavskyi
Hello guys,
We have discovered a minor issue with Flink 1.5, particularly on YARN, related 
to the way Flink manages temp paths (io.tmp.dirs) in configuration: 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#io-tmp-dirs

1.   From what we can see in the code, the default value from the documentation 
doesn’t correspond to reality on YARN or Mesos deployments. It looks like it 
equals the env variable ‘_FLINK_TMP_DIR’ on Mesos and ‘LOCAL_DIRS’ on YARN.
2.   The issue on YARN is that it is impossible to have different LOCAL_DIRS on 
the JobManager and TaskManager, even though the LOCAL_DIRS value depends on the 
container.
CoreOptions.TMP_DIRS is set to its default value during JobManager 
initialization and added to the configuration object. When a TaskManager is 
launched, that configuration object is cloned, including a LOCAL_DIRS value 
which only makes sense for the JobManager container. From the TaskManager’s 
point of view, CoreOptions.TMP_DIRS is therefore always equal either to the 
path in flink.yml or to the JobManager’s LOCAL_DIRS (the default behaviour). 
Since the TaskManager’s container does not have access to the folders YARN 
allocated for another container, the TaskManager cannot be started.
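One possible mitigation until this is fixed (an assumption on our side, not something verified above) is to pin the temp directories explicitly in flink-conf.yaml, so the value cloned into the TaskManager configuration is valid in every container:

```yaml
# flink-conf.yaml -- the path is illustrative; it must exist and be
# writable in both the JobManager and TaskManager containers
io.tmp.dirs: /tmp
```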
Could you please confirm that it is a bug and I will create a Jira ticket to 
track it?
Thanks
Kind Regards
Oleksandr Nitavskyi



Flink long-running streaming job, Keytab authentication

2017-12-14 Thread Oleksandr Nitavskyi
Hello all,

I have a question about Kerberos authentication in a YARN environment for a 
long-running streaming job. According to the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/security-kerberos.html#yarnmesos-mode), 
Flink’s solution is to use a keytab in order to perform authentication in the 
YARN perimeter.

If a keytab is configured, Flink uses UserGroupInformation#loginUserFromKeytab 
to perform authentication. The YARN security documentation 
(https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#keytabs-for-am-and-containers-distributed-via-yarn) 
mentions that this should be enough:

Launched containers must themselves log in via 
UserGroupInformation.loginUserFromKeytab(). UGI handles the login, and 
schedules a background thread to relogin the user periodically.

But in reality, if we check the source code of UGI, we can see that no 
background thread is created: 
https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java#L1153. 
A javax.security.auth.login.LoginContext is simply created and authentication 
is performed. This looks true for the different Hadoop branches - 2.7, 2.8, 
3.0, trunk. Flink also doesn’t create any background threads: 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L69. 
So the job loses credentials for the ResourceManager and HDFS after some time 
(12 hours in my case).

It looks like UGI’s code is not aligned with the documentation and it doesn’t 
relogin periodically.
Do you think patching this with a background thread which performs 
UGI#reloginUserFromKeytab could be a solution?

P.S. We are running Flink as a single job on Yarn.
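If patching goes that way, the scheduling part could look roughly like this stdlib-only sketch; the actual Hadoop call is passed in as a Runnable because everything UGI-specific here is an assumption (in a real patch it would wrap something like UserGroupInformation.getLoginUser().reloginFromKeytab()):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch of a periodic keytab relogin driver (hypothetical patch idea). */
public final class KeytabReloginDriver {

    /** Runs {@code relogin} repeatedly on a daemon thread until shut down. */
    public static ScheduledExecutorService start(Runnable relogin, long period, TimeUnit unit) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "keytab-relogin");
            t.setDaemon(true); // must never block JVM shutdown
            return t;
        });
        exec.scheduleAtFixedRate(() -> {
            try {
                relogin.run();
            } catch (RuntimeException e) {
                // A single failed relogin attempt should not cancel the
                // schedule; just retry on the next tick.
            }
        }, period, period, unit);
        return exec;
    }

    private KeytabReloginDriver() {}
}
```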