Re: Which mode to choose flink on Yarn.

2018-09-19 Thread
That mainly depends on the parallelism of your job.

The main bottleneck of the job manager is usually that it is busy handling
rpc requests and gc. Most of the time you can address this by setting a
larger jm memory, e.g. passing `-jm 4096` to `yarn-session.sh`.
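For instance, a session with a larger JobManager heap might be started like
this (a sketch using the 1.5/1.6-era `yarn-session.sh` flags; the `-tm` and
`-s` values are illustrative, not a recommendation):

```shell
# Start a detached YARN session with a 4 GB JobManager heap.
# -jm: JobManager memory (MB), -tm: TaskManager memory (MB),
# -s: slots per TaskManager, -d: detached mode.
./bin/yarn-session.sh -jm 4096 -tm 4096 -s 4 -d
```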

Best,
tison.


weilongxing wrote on Thu, Sep 20, 2018 at 2:29 PM:

> Thanks.
>
> I am wondering whether the job manager will be the bottleneck and how many
> jobs a job manager can support in session mode. I tried to find the
> bottleneck in a test environment but failed.
>
>
> On Sep 20, 2018, at 2:16 PM, vino yang wrote:
>
> Hi weilong,
>
> As you said, there are advantages and disadvantages to each of the two
> approaches.
> However, I hope you know that the "single job" mode has a huge advantage
> over the "YARN flink session" mode in that it provides job-level isolation
> (whether JM or TM).
> This allows jobs to be managed at a finer granularity, and the refactoring
> of Flink's deployment model based on FLIP-6 tends toward the "single job"
> mode.
> However, it will start more JMs (appmasters) and take up more resources.
> In the end, how to choose still requires you to evaluate and weigh the
> trade-offs.
>
> Thanks, vino.
>
> weilongxing wrote on Thu, Sep 20, 2018 at 10:27 AM:
>
>> There are two methods to deploy flink applications on yarn. The first is
>> to use a yarn session, where all flink applications are deployed in that
>> session. The second is to deploy each flink application on yarn as a
>> separate yarn application.
>>
>> My question is: what is the difference between these two methods? Which
>> one should I choose in a production environment?
>>
>> I can't find any material about this.
>>
>> I think the first method will save resources, since it needs only one
>> jobmanager (yarn application master). But that is also its disadvantage,
>> since the single jobmanager can become the bottleneck as flink
>> applications get more and more numerous.
>>
>
>


Re: Which mode to choose flink on Yarn.

2018-09-19 Thread
Hi weilong,

As vino said, the main advantage of per-job mode is that it provides
job-level isolation, while that of session mode is that it sets up a
persistent session that accepts jobs, which means the overhead of resource
request/setup is amortized. In addition, per-job mode calculates the
resources that the job requires, while session mode requires you to provide
a static configuration for the persistent session.

As advice from experience: prefer per-job mode for large jobs, and session
mode for a series of small jobs.
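As a command-line sketch of the two modes (1.5/1.6-era flags; the jar names
and memory values are illustrative):

```shell
# Session mode: one long-running cluster; many small jobs share it.
./bin/yarn-session.sh -jm 2048 -tm 4096 -d
./bin/flink run smallJob.jar

# Per-job mode: a dedicated cluster is brought up for one large job.
./bin/flink run -m yarn-cluster -yjm 2048 -ytm 4096 largeJob.jar
```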

Best,
tison.


vino yang wrote on Thu, Sep 20, 2018 at 2:17 PM:

> Hi weilong,
>
> As you said, there are advantages and disadvantages to each of the two
> approaches.
> However, I hope you know that the "single job" mode has a huge advantage
> over the "YARN flink session" mode in that it provides job-level isolation
> (whether JM or TM).
> This allows jobs to be managed at a finer granularity, and the refactoring
> of Flink's deployment model based on FLIP-6 tends toward the "single job"
> mode.
> However, it will start more JMs (appmasters) and take up more resources.
> In the end, how to choose still requires you to evaluate and weigh the
> trade-offs.
>
> Thanks, vino.
>
> weilongxing wrote on Thu, Sep 20, 2018 at 10:27 AM:
>
>> There are two methods to deploy flink applications on yarn. The first is
>> to use a yarn session, where all flink applications are deployed in that
>> session. The second is to deploy each flink application on yarn as a
>> separate yarn application.
>>
>> My question is: what is the difference between these two methods? Which
>> one should I choose in a production environment?
>>
>> I can't find any material about this.
>>
>> I think the first method will save resources, since it needs only one
>> jobmanager (yarn application master). But that is also its disadvantage,
>> since the single jobmanager can become the bottleneck as flink
>> applications get more and more numerous.
>>
>


Re: Question about akka configuration for FLIP-6

2018-09-09 Thread
Hi Gary,

Thanks for your useful information! I wonder whether the following configs
are still valid in FLIP-6 mode:

1. akka.transport.heartbeat.interval
2. akka.transport.heartbeat.pause

It seems they are separate from HeartbeatServices and possibly still valid.
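For reference, the FLIP-6 heartbeat options Gary points to can be set in
`flink-conf.yaml`; the values shown are the Flink 1.5 defaults:

```yaml
# flink-conf.yaml
heartbeat.interval: 10000   # ms between heartbeat requests
heartbeat.timeout: 50000    # ms before a missing heartbeat marks the peer as lost
```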

Best,
tison.


Gary Yao wrote on Mon, Sep 10, 2018 at 1:50 PM:

> I should add that in FLIP-6 mode we are not relying on Akka's DeathWatch,
> but because Flink's RPC framework uses Akka, you are still able to
> configure the other Akka config options [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#distributed-coordination-via-akka
>
> On Mon, Sep 10, 2018 at 7:38 AM, Gary Yao  wrote:
>
>> Hi Tony,
>>
>> You are right that with FLIP-6 Akka is abstracted away. If you want custom
>> heartbeat settings, you can configure the options below [1]:
>>
>> heartbeat.interval
>> heartbeat.timeout
>>
>> The config option taskmanager.exit-on-fatal-akka-error is also no longer
>> relevant. The closest I can think of is
>> taskmanager.registration.timeout [2].
>>
>> Best,
>> Gary
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#heartbeat-manager
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#taskmanager-registration-timeout
>>
>> On Mon, Sep 10, 2018 at 4:24 AM, Tony Wei  wrote:
>>
>>> Hi,
>>>
>>> I'm going to migrate my flink cluster from 1.4.0 to 1.5.3, and I have
>>> been trying to map my config file to the latest version. I used to use
>>> the three configurations below. Are they still needed in FLIP-6 mode?
>>> Moreover, is any akka config still needed in FLIP-6 mode? I had the
>>> impression that FLIP-6 tried to get rid of akka and use its own rpc
>>> interface. Please correct me if I misunderstood. Thanks.
>>>
>>> akka.watch.heartbeat.interval
>>> akka.watch.heartbeat.pause
>>> taskmanager.exit-on-fatal-akka-error
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>
>>
>


Re: How to customize schedule mode and result partition type?

2018-09-07 Thread
Sorry to attach this message here, but if someone sees this email, please
reply with an ack. Without any reply, I wonder whether this email has been
received by anyone on the mailing list.

Best,
tison.


How to customize schedule mode and result partition type?

2018-09-06 Thread
Hi all,

Here I want to force a job to run in the LAZY_FROM_SOURCES schedule mode
with all ResultPartitionTypes set to BLOCKING.

But I cannot find options to configure that on StreamExecutionEnvironment,
so I am using the code below as a workaround, which is quite tricky.

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new InfiniteSourceFunction())
   .setParallelism(2)
   .shuffle()
   .addSink(new DiscardingSink<>())
   .setParallelism(2);

// Pull out the private "transformations" field by reflection.
Field field =
        env.getClass().getSuperclass().getDeclaredField("transformations");
field.setAccessible(true);
List<StreamTransformation<?>> transformations =
        (List<StreamTransformation<?>>) field.get(env);

StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
streamGraph.getCustomConfiguration().setString(ScheduleMode.class.getName(),
        LAZY_FROM_SOURCES.toString());
streamGraph.setJobName(testname);
streamGraph.getStreamEdges(1, 3)
   .get(0).setResultPartitionType(ResultPartitionType.BLOCKING);


Best,
tison.


Re: Setting Flink Monitoring API Port on YARN Cluster

2018-09-06 Thread
Hi Austin,

`rest.port` is the latest config option for "The port that the server
listens on / the client connects to." Its deprecated key is `web.port`,
which in turn supersedes the deprecated key `jobmanager.web.port`, so it is
enough to configure `rest.port` alone (at least as of 1.6). However, in
your case the configuration should have worked.

Since Flink reads configuration from both flink-conf.yaml and the command
line, it would be helpful if you showed us how you do the setting.
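For reference, the relationship between the three keys in `flink-conf.yaml`
(the port value is the one from the log below):

```yaml
# flink-conf.yaml
rest.port: 44477              # current key (Flink 1.5+)
# web.port: 44477             # deprecated alias of rest.port
# jobmanager.web.port: 44477  # older deprecated alias
```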

Best,
tison.


Austin Cawley-Edwards wrote on Fri, Sep 7, 2018 at 6:33 AM:

> Hi everyone,
>
> I'm running a YARN session on a cluster with one master and one core and
> would like to use the Monitoring API programmatically to submit jobs. I
> have found that the configuration variables are read but ignored when
> starting the session - it seems to choose a random port each run.
>
> Here's a snippet from the startup logs:
>
> 2018-09-06 21:44:38,763 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> 2018-09-06 21:44:38,764 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> 2018-09-06 21:44:38,765 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: rest.port, 44477
> 2018-09-06 21:44:38,765 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.web.port, 44477
> 2018-09-06 21:44:38,765 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability.jobmanager.port, 44477
> 2018-09-06 21:44:38,775 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn
> properties file under /tmp/.yarn-properties-hadoop.
> 2018-09-06 21:44:39,615 WARN  org.apache.hadoop.util.NativeCodeLoader
>  - Unable to load native-hadoop library for your
> platform... using builtin-java classes where applicable
> 2018-09-06 21:44:39,799 INFO
> org.apache.flink.runtime.security.modules.HadoopModule- Hadoop user
> set to hadoop (auth:SIMPLE)
> 2018-09-06 21:44:40,045 INFO  org.apache.hadoop.yarn.client.RMProxy
>  - Connecting to ResourceManager at
> ip-10-2-3-71.ec2.internal/10.2.3.71:8032
> 2018-09-06 21:44:40,312 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster
> specification: ClusterSpecification{masterMemoryMB=1024,
> taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-09-06 21:44:43,564 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting
> application master application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1536250520330_0007
> 2018-09-06 21:44:43,802 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for
> the cluster to be allocated
> 2018-09-06 21:44:43,804 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying
> cluster, current state ACCEPTED
> 2018-09-06 21:44:48,326 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - YARN
> application has been deployed successfully.
> 2018-09-06 21:44:48,326 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The Flink
> YARN client has been started in detached mode. In order to stop Flink on
> YARN, use the following command or a YARN web interface to stop it:
> yarn application -kill application_1536250520330_0007
> Please also note that the temporary files of the YARN session in the home
> directory will not be removed.
> 2018-09-06 21:44:48,821 INFO  org.apache.flink.runtime.rest.RestClient
>   - Rest client endpoint started.
> Flink JobManager is now running on ip-10-2-3-25.ec2.internal:38683 with
> leader id ----.
> JobManager Web Interface: http://ip-10-2-3-25.ec2.internal:38683
>
>
> I'm setting both the rest.port and jobmanager.web.port, but both are
> ignored. Has anyone seen this before?
>
> Thanks!
>


How to simulate a blocking (sink) node?

2018-09-05 Thread
Hi all,

Recently I wanted to write a test for a batch case in which some of the
tasks are FINISHED.

I tried to write a finite SourceFunction and a (supposedly) blocking
SinkFunction. But the whole job went to FINISHED. This surprises me: why
could the Sink FINISH in such a case?
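The idea of a blocking sink can be sketched without any Flink dependency.
The class below is purely illustrative; its `invoke` method only mirrors the
shape of `SinkFunction#invoke` and is not the real interface:

```java
import java.util.concurrent.CountDownLatch;

// Sketch of a "blocking sink": invoke() parks on a latch that is never
// counted down during normal operation, so the task should never FINISH.
public class BlockingSinkSketch {
    private final CountDownLatch blocker = new CountDownLatch(1);

    // Stand-in for SinkFunction#invoke(T value).
    public void invoke(String value) throws InterruptedException {
        blocker.await(); // blocks until release() is called
    }

    public void release() {
        blocker.countDown();
    }

    public static void main(String[] args) throws Exception {
        BlockingSinkSketch sink = new BlockingSinkSketch();
        Thread task = new Thread(() -> {
            try {
                sink.invoke("record");
            } catch (InterruptedException ignored) {
            }
        });
        task.start();
        Thread.sleep(200);
        System.out.println(task.isAlive()); // sink is still blocked
        sink.release();
        task.join(1000);
        System.out.println(task.isAlive()); // sink has returned
    }
}
```

In a real test, the same latch idea inside an actual `SinkFunction` should
keep the task in RUNNING until the latch is released.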

The job and log are attached.

Best,
tison.


WeirdJob.java
Description: Binary data


jobmanager.log
Description: Binary data


Does Flink plan to support JDK 9 recently?

2018-08-31 Thread
Hi,

Recently I saw a PR that mentions "for jdk9 compatibility", and I wonder
whether Flink is considering supporting JDK 9 soon. If so, what is the
plan?

Best,
tison.


Re: Why is flink master bump version to 1.7?

2018-07-17 Thread
Yes I can see it now. Thank you all!

Till Rohrmann wrote on Tue, Jul 17, 2018 at 7:53 PM:

> Yes, pulling from https://git-wip-us.apache.org/repos/asf/flink.git
> should show you the release-1.6 branch.
>
> Cheers,
> Till
>
> On Tue, Jul 17, 2018 at 10:37 AM Chesnay Schepler 
> wrote:
>
>> The release-1.6 branch exists (
>> https://git-wip-us.apache.org/repos/asf?p=flink.git;a=shortlog;h=refs/heads/release-1.6
>> ),
>> but wasn't synced to GitHub yet.
>>
>> On 17.07.2018 09:33, Timo Walther wrote:
>>
>> Hi Tison,
>>
>> I guess this was a mistake that will be fixed soon. Till (in CC) forked
>> off the release-1.6 branch yesterday.
>>
>> Regards,
>> Timo
>>
>> On 17.07.18 at 04:00, 陈梓立 wrote:
>>
>> Hi,
>>
>> I see no 1.6 branch or tag. What's the reason we skipped 1.6 and are now
>> on 1.7-SNAPSHOT? Or is there a 1.6 that I missed?
>>
>> Best,
>> tison
>>
>>
>>
>>


Why is flink master bump version to 1.7?

2018-07-16 Thread
Hi,

I see no 1.6 branch or tag. What's the reason we skipped 1.6 and are now on
1.7-SNAPSHOT? Or is there a 1.6 that I missed?

Best,
tison


Why FoldFunction deprecated?

2018-05-04 Thread
I just wrote a code snippet like

```
.fold(new Tuple2<>("", 0L),
      new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
                                     WikipediaEditEvent event) {
        acc.f0 = event.getUser();
        acc.f1 += event.getByteDiff();
        return acc;
    }
});
```

and replace it using `aggregate()`

```
.aggregate(new AggregateFunction<WikipediaEditEvent,
        Tuple2<String, Long>, Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> createAccumulator() {
        return new Tuple2<>("", 0L);
    }

    @Override
    public Tuple2<String, Long> add(WikipediaEditEvent event,
                                    Tuple2<String, Long> acc) {
        return new Tuple2<>(event.getUser(), acc.f1 + event.getByteDiff());
    }

    @Override
    public Tuple2<String, Long> getResult(Tuple2<String, Long> acc) {
        return acc;
    }

    @Override
    public Tuple2<String, Long> merge(Tuple2<String, Long> a,
                                      Tuple2<String, Long> b) {
        return new Tuple2<>(a.f0, a.f1 + b.f1);
    }
});
```

It seems I have to write much more code using `aggregate()`.

Is there something I missed that forces me to write so much code? Or,
granted that `aggregate()` is more expressive, why was `fold()` deprecated?
After all, `fold` is a general concept that people understand.
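The extra surface area comes from the accumulator contract. The
self-contained plain-Java sketch below (no Flink dependency; the two local
interfaces only mirror the shape of Flink's `FoldFunction` and
`AggregateFunction`) contrasts the two, and hints at one commonly cited
reason for the deprecation: an `AggregateFunction` accumulator is mergeable,
which incremental aggregation over merging windows needs, while a fold is
not.

```java
import java.util.List;

// Self-contained sketch contrasting the two contracts. The local
// interfaces Fold and Aggregate are illustrative stand-ins, not the
// real Flink API.
public class FoldVsAggregate {

    interface Fold<IN, ACC> {
        ACC fold(ACC acc, IN value);
    }

    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    public static void main(String[] args) {
        List<Integer> byteDiffs = List.of(3, -1, 5);

        // fold: a single callback, but the initial value is supplied from
        // outside and partial accumulators cannot be merged afterwards.
        Fold<Integer, Long> fold = (acc, v) -> acc + v;
        long folded = 0L;
        for (int d : byteDiffs) {
            folded = fold.fold(folded, d);
        }

        // aggregate: four callbacks, but the accumulator is created and
        // merged by the function itself, so partial results can combine.
        Aggregate<Integer, Long, Long> agg = new Aggregate<Integer, Long, Long>() {
            public Long createAccumulator() { return 0L; }
            public Long add(Integer v, Long acc) { return acc + v; }
            public Long getResult(Long acc) { return acc; }
            public Long merge(Long a, Long b) { return a + b; }
        };
        Long acc = agg.createAccumulator();
        for (int d : byteDiffs) {
            acc = agg.add(d, acc);
        }

        System.out.println(folded + " " + agg.getResult(acc));
    }
}
```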


A trivial update on README

2018-04-27 Thread
Hi guys,

Recently I pushed a PR to the apache/flink repo
(https://github.com/apache/flink/pull/5924); it's a trivial update to
README.md, raised after I surprisingly failed to build using Java 9.

It would be fine if someone just told me it is meaningless so that I could
close it, but there have been no replies, which makes me feel sad.

This is my first PR on Flink, and I think having a PR accepted would be a
good start for me and would give me more passion for Flink.
Look forward to your reply.

Alex.