Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Yangze Guo
Thanks for the reply, Andrey.

Regarding building from local dist:
- Yes, I bring this up mostly for development purposes. Since k8s is
popular, I believe more and more developers would like to test their
work on a k8s cluster. I'm not sure whether all developers should have to
write a custom Dockerfile themselves in this scenario. Thus, I still prefer
to provide a script for devs (a rough sketch follows below).
- I agree to keep the scope of this FLIP focused on normal
users. But as far as I can see, supporting a build from the local dist
would not take much extra effort.
- The maven docker plugin sounds good. I'll take a look at it.
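
As an illustration of the build-from-local-dist idea, here is a rough sketch
of what such a development Dockerfile could look like; the dist path, base
image and entry-point wiring are assumptions, not something the FLIP defines:

    # Hypothetical development Dockerfile: build an image from a locally
    # built distribution instead of downloading a released tarball.
    FROM openjdk:8-jre
    ENV FLINK_HOME=/opt/flink
    # Assumes `mvn clean package -DskipTests` has been run and the resulting
    # dist directory was copied into the Docker build context as ./flink-dist/
    COPY flink-dist/ $FLINK_HOME/
    COPY docker-entrypoint.sh /docker-entrypoint.sh
    WORKDIR $FLINK_HOME
    ENTRYPOINT ["/docker-entrypoint.sh"]

Building it is then just `docker build -t flink-dev .` from that directory.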

Regarding supporting Java 11:
- I'm not sure it is necessary to ship Java. Maybe we could just change
the base image from openjdk:8-jre to openjdk:11-jre in the template
Dockerfile [1]. Correct me if I'm wrong. Also, I agree to move this out
of the scope of this FLIP if it indeed takes much extra effort.

Regarding the custom configuration, the mechanism that Thomas mentioned LGTM.

[1] 
https://github.com/apache/flink-docker/blob/master/Dockerfile-debian.template

Best,
Yangze Guo

On Wed, Mar 11, 2020 at 5:52 AM Thomas Weise  wrote:
>
> Thanks for working on improvements to the Flink Docker container images. This 
> will be important as more and more users are looking to adopt Kubernetes and 
> other deployment tooling that relies on Docker images.
>
> A generic, dynamic configuration mechanism based on environment variables is 
> essential and it is already supported via envsubst and an environment 
> variable that can supply a configuration fragment:
>
> https://github.com/apache/flink-docker/blob/09adf2dcd99abfb6180e1e2b5b917b288e0c01f6/docker-entrypoint.sh#L88
> https://github.com/apache/flink-docker/blob/09adf2dcd99abfb6180e1e2b5b917b288e0c01f6/docker-entrypoint.sh#L85
>
> This gives the necessary control for infrastructure use cases that aim to 
> supply deployment tooling to other users. An example in this category is 
> the FlinkK8sOperator:
>
> https://github.com/lyft/flinkk8soperator/tree/master/examples/wordcount
>
> On the flip side, attempting to support a fixed subset of configuration 
> options is brittle and will probably lead to compatibility issues down the 
> road:
>
> https://github.com/apache/flink-docker/blob/09adf2dcd99abfb6180e1e2b5b917b288e0c01f6/docker-entrypoint.sh#L97
>
> Besides the configuration, it may be worthwhile to see in which other ways 
> the base Docker images can provide more flexibility to incentivize wider 
> adoption.
>
> I would second that it is desirable to support Java 11 and in general use a 
> base image that allows the (straightforward) use of more recent versions of 
> other software (Python etc.)
>
> https://github.com/apache/flink-docker/blob/d3416e720377e9b4c07a2d0f4591965264ac74c5/Dockerfile-debian.template#L19
>
> Thanks,
> Thomas
>
> On Tue, Mar 10, 2020 at 12:26 PM Andrey Zagrebin  wrote:
>>
>> Hi All,
>>
>> Thanks a lot for the feedback!
>>
>> *@Yangze Guo*
>>
>> - Regarding the flink_docker_utils#install_flink function, I think it
>> > should also support build from local dist and build from a
>> > user-defined archive.
>>
>> I suppose you bring this up mostly for development purposes or for power
>> users.
>> Most normal users are interested in mainstream released versions
>> of Flink.
>> Although you bring up a valid concern, my idea was to keep the scope of this
>> FLIP focused on those normal users.
>> Power users are usually capable of designing a completely
>> custom Dockerfile themselves.
>> At the moment, we already have custom Dockerfiles e.g. for tests in
>> flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile.
>> We can add something similar for development purposes and maybe introduce a
>> special maven goal. There is a maven docker plugin, afaik.
>> I will add this to FLIP as next step.
>>
>> - It seems that the install_shaded_hadoop could be an option of
>> > install_flink
>>
>> I would rather think of this as a separate, independent, optional step.
>>
>> - Should we support JAVA 11? Currently, most of the docker file based on
>> > JAVA 8.
>>
>> Indeed, it is a valid concern. Java version is a fundamental property of
>> the docker image.
>> Customising this in the current mainstream image is difficult; it would
>> require shipping it without Java at all.
>> Or this is a separate discussion whether we want to distribute docker hub
>> images with different Java versions or just bump it to Java 11.
>> This should be easy in a custom Dockerfile for development purposes though
>> as mentioned before.
>>
>> - I do not understand how to set config options through
>>
>> "flink_docker_utils configure"? Does this step happen during the image
>> > build or the container start? If it happens during the image build,
>> there would be a new image every time we change the config. If it is just
>> a part of the container entrypoint, I think there is no need to add a
>> > configure command, we could just add all 

Re: Re: Can a flink sql join keep state and recover data from state?

2020-03-10 Thread Kurt Young
I have created an issue in the community: https://issues.apache.org/jira/browse/FLINK-16534
You can follow it for further updates.

Best,
Kurt


On Wed, Mar 11, 2020 at 12:54 PM Kurt Young  wrote:

> The SQL client does not support this feature yet.
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Hi Kurt,
>> Indeed, it works to save the state directly with flink cancel -s.
>> But I submitted the job by writing SQL directly in flink-sql-client. When I
>> resubmit it, how can I specify the state directory so that the job recovers
>> from that state?
>>
>> Thanks,
>> Wang Lei
>>
>>
>> *Sender:* Kurt Young 
>> *Send Time:* 2020-03-11 10:38
>> *Receiver:* user-zh 
>> *Subject:* Re: Can a flink sql join keep state and recover data from state?
>> In theory, once a Flink SQL job is compiled into a JobGraph and submitted to
>> the cluster, it is not fundamentally different from a DataStream job.
>> It should also support flink cancel -s. You can give it a try first, and we
>> can look into any problems you run into.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>> > There are two tables:
>> > tableA: key  valueA
>> > tableB: key  valueB
>> >
>> > Previously I stored tableA in Flink state; when a message from tableB
>> > arrived, I queried that state to get valueA.
>> > Writing Flink SQL directly can also achieve this, but there is a time gap
>> > between the two tables, and after the job is stopped and resubmitted, part
>> > of the join results is lost.
>> > Does Flink SQL have something like flink cancel -s to save the state?
>> >
>> > Thanks,
>> > Wang Lei
>> >
>>
>>


Re: Re: Can a flink sql join keep state and recover data from state?

2020-03-10 Thread Kurt Young
The SQL client does not support this feature yet.

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Kurt,
> Indeed, it works to save the state directly with flink cancel -s.
> But I submitted the job by writing SQL directly in flink-sql-client. When I
> resubmit it, how can I specify the state directory so that the job recovers
> from that state?
>
> Thanks,
> Wang Lei
>
>
> *Sender:* Kurt Young 
> *Send Time:* 2020-03-11 10:38
> *Receiver:* user-zh 
> *Subject:* Re: Can a flink sql join keep state and recover data from state?
> In theory, once a Flink SQL job is compiled into a JobGraph and submitted to
> the cluster, it is not fundamentally different from a DataStream job.
> It should also support flink cancel -s. You can give it a try first, and we
> can look into any problems you run into.
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> > There are two tables:
> > tableA: key  valueA
> > tableB: key  valueB
> >
> > Previously I stored tableA in Flink state; when a message from tableB
> > arrived, I queried that state to get valueA.
> > Writing Flink SQL directly can also achieve this, but there is a time gap
> > between the two tables, and after the job is stopped and resubmitted, part
> > of the join results is lost.
> > Does Flink SQL have something like flink cancel -s to save the state?
> >
> > Thanks,
> > Wang Lei
> >
>
>


Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Xintong Song
Hi Eleanore,

That doesn't sound like a scaling issue. It's probably data skew: the
data volume on some of the keys is significantly higher than on others. I'm
not familiar with this area though, and have copied Jark for you, who is
one of the community experts in this area.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin 
wrote:

> Hi Xintong,
>
> Thanks for the prompt reply! To answer your question:
>
>- Which Flink version are you using?
>
>v1.8.2
>
>- Is this skew observed only after a scaling-up? What happens if the
>parallelism is initially set to the scaled-up value?
>
>I also tried this, it seems skew also happens even I do
> not change the parallelism, so it may not be caused by scale-up/down
>
>- Keeping the job running a while after the scale-up, does the skew
>ease?
>
>So the skew happens in such a way that: some partitions
> lag down to 0, but other partitions are still at a level of 10_000, and I am
> seeing that the back pressure is ok.
>
> Thanks a lot!
> Eleanore
>
>
> On Tue, Mar 10, 2020 at 7:03 PM Xintong Song 
> wrote:
>
>> Hi Eleanore,
>>
>> I have a few more questions regarding your issue.
>>
>>- Which Flink version are you using?
>>- Is this skew observed only after a scaling-up? What happens if the
>>parallelism is initially set to the scaled-up value?
>>- Keeping the job running a while after the scale-up, does the skew
>>ease?
>>
>> I suspect the performance difference might be an outcome of some warming
>> up issues. E.g., the existing TMs might have some file already localized,
>> or some memory buffers already promoted to the JVM tenured area, while the
>> new TMs have not.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin 
>> wrote:
>>
>>> Hi Experts,
>>> I have my flink application running on Kubernetes, initially with 1 Job
>>> Manager, and 2 Task Managers.
>>>
>>> Then we have the custom operator that watches for the CRD, when the CRD
>>> replicas changed, it will patch the Flink Job Manager deployment
>>> parallelism and max parallelism according to the replicas from CRD
>>> (parallelism can be configured via env variables for our application).
>>> which causes the job manager restart. hence a new Flink job. But the
>>> consumer group does not change, so it will continue from the offset
>>> where it left.
>>>
>>> In addition, operator will also update Task Manager's deployment
>>> replicas, and will adjust the pod number.
>>>
>>> In case of scale up, the existing task manager pods do not get killed,
>>> but new task manager pods will be created.
>>>
>>> And we observed a skew in the partition offset consumed. e.g. some
>>> partitions have huge lags and other partitions have small lags. (observed
>>> from burrow)
>>>
>>> This is also validated by the metrics from the Flink UI, showing that the
>>> throughput differs across slots.
>>>
>>> Any clue why this is the case?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>


Re: Re: Can a flink sql join keep state and recover data from state?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Kurt, 
Indeed, it works to save the state directly with flink cancel -s.
But I submitted the job by writing SQL directly in flink-sql-client. When I
resubmit it, how can I specify the state directory so that the job recovers
from that state?

Thanks,
Wang Lei
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: Can a flink sql join keep state and recover data from state?
In theory, once a Flink SQL job is compiled into a JobGraph and submitted to
the cluster, it is not fundamentally different from a DataStream job.
It should also support flink cancel -s. You can give it a try first, and we
can look into any problems you run into.
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> There are two tables:
> tableA: key  valueA
> tableB: key  valueB
>
> Previously I stored tableA in Flink state; when a message from tableB
> arrived, I queried that state to get valueA.
> Writing Flink SQL directly can also achieve this, but there is a time gap
> between the two tables, and after the job is stopped and resubmitted, part
> of the join results is lost.
> Does Flink SQL have something like flink cancel -s to save the state?
>
> Thanks,
> Wang Lei
>


Re: Question on the SQL "GROUPING SETS" and "CUBE" syntax usability

2020-03-10 Thread Jark Wu
Thanks Arvid for reminding me this topic.

Actually, it is supported in streaming mode by the blink planner (since Flink
1.9), but we missed updating the documentation.
You can also see that it is supported in the integration tests [1].

I created an issue to update docs [2].

Best,
Jark

[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateRemoveITCase.scala#L68
[2]: https://issues.apache.org/jira/browse/FLINK-16530
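
For readers who land here, a minimal illustration of the streaming GROUPING
SETS syntax in question (the inlined VALUES table and column names are made
up for this sketch):

    SELECT supplier_id, rating, COUNT(*) AS total
    FROM (VALUES
            ('supplier1', 'product1', 4),
            ('supplier1', 'product2', 3),
            ('supplier2', 'product3', 3),
            ('supplier2', 'product4', 4))
        AS Products(supplier_id, product_id, rating)
    GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ());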

On Tue, 10 Mar 2020 at 16:11, Arvid Heise  wrote:

> Hi Weike,
>
> according to the linked documentation, the operations are ready but as you
> have mentioned only for SQL batch mode, which is not surprising as they
> don't have well-behaved semantics on streams. See also Calcite's
> explanations [1].
>
> Could you maybe outline your use case and what you'd expect these
> operations to be? Would you like to combine them with windows?
>
> I'm CCing Jark, as he knows SQL much better than me.
>
> [1] https://calcite.apache.org/docs/stream.html#grouping-sets
>
> On Mon, Mar 9, 2020 at 8:27 AM DONG, Weike 
> wrote:
>
>> Hi,
>>
>> From the Flink 1.10  official document (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html),
>> we could see that GROUPING SETS is only supported in Batch mode.
>>
>>
>> However, we also found that in
>> https://issues.apache.org/jira/browse/FLINK-12192, stream queries using
>> GROUPING SETS and CUBE are already listed as a fixed issue in 1.9.
>>
>> Here I would like to know if SQL support for GROUPING SETS, ROLLUP, CUBE
>> is ready for use or not, and whether the document needs to be updated or
>> not.
>>
>> Thank you
>>
>> Best regards,
>> Weike
>>
>


Re: Re: Kafka sink only support append mode?

2020-03-10 Thread Jark Wu
Hi Lei,

Are you trying a regular left join query?

Non-time-based operators (e.g. the regular join in your case) emit results
before the input is complete,
and those results are updated when more input comes in (by emitting
upsert/retract messages).
Time-based operators (e.g. windowed aggregates, interval joins), however, can
emit final results because they wait for the input to be complete (driven by
watermarks), and thus can produce an append-only stream.

Maybe you can try updating your query to an interval join [1] (currently
called a time-windowed join in the docs) to get an append-only result.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
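
A small sketch of the interval join (time-windowed join) being suggested; the
table names, columns and the 4-hour bound are invented, and both time
attributes have to be declared as event-time or processing-time attributes:

    SELECT o.order_id, o.order_time, s.ship_time
    FROM Orders o, Shipments s
    WHERE o.order_id = s.order_id
      AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;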


On Tue, 10 Mar 2020 at 18:41, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Jark,
> Thanks for the explanation.
> A GROUP BY statement results in a non-append stream.
> I have just tried a join statement and want to send the result to Kafka;
> it also fails with the error:
>  AppendStreamTableSink requires that Table has only insert changes
> Why is the join result not appendable? It confuses me.
>
> Thanks,
> Lei
>
>
> *From:* Jark Wu 
> *Date:* 2020-03-09 19:25
> *To:* wangl...@geekplus.com.cn
> *CC:* user 
> *Subject:* Re: Kafka sink only support append mode?
> Hi Lei,
>
> Yes. Currently, the Kafka sink only supports append mode. Other update modes
> (e.g. upsert mode / retract mode) are on the agenda.
> For now, you can customize a KafkaTableSink by implementing the
> UpsertStreamTableSink interface, where you will get Tuple2<Boolean, Row>
> records,
> and the Boolean represents an insert or delete operation. Then you can encode
> the insert/delete operation into the Kafka storage or just ignore the
> operations.
>
> Best,
> Jark
>
> On Mon, 9 Mar 2020 at 19:14, wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> I wrote a simple program reading from Kafka using SQL and sinking to
>> Kafka.
>> But only 'update-mode' = 'append' is supported for the sink table, and the
>> query SQL must have no GROUP BY statement.
>> Is only append mode supported for the Kafka sink?
>>
>> Thanks,
>> Lei
>>
>>
>>


Re: Re: Dose flink-1.10 sql-client support kafka sink?

2020-03-10 Thread Jark Wu
Hi Lei,

The CREATE TABLE DDL [1][2] is the recommended way to register a table since
1.9, and the YAML way might be deprecated in the future.
By using DDL, a registered table can be used both as a source and as a sink.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
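
A hedged sketch of what such a sink DDL could look like with the Flink 1.10
connector property keys; the topic, broker address and columns are
placeholders borrowed from the YAML example further down in the thread:

    CREATE TABLE out_order_sink (
      out_order_code STRING,
      input_date BIGINT,
      owner_code STRING
    ) WITH (
      'connector.type' = 'kafka',
      'connector.version' = 'universal',
      'connector.topic' = 'out_order_result',
      'connector.properties.bootstrap.servers' = '172.19.78.32:9092',
      'format.type' = 'json',
      'update-mode' = 'append'
    );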

On Tue, 10 Mar 2020 at 21:52, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Thanks, it works now.
>
> It seems it was because I added
>    schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code
> STRING, status INT)"
>
> under the format label.
>
> *From:* Arvid Heise 
> *Date:* 2020-03-10 20:51
> *To:* wangl...@geekplus.com.cn
> *CC:* user 
> *Subject:* Re: Dose flink-1.10 sql-client support kafka sink?
> Hi Lei,
>
> yes Kafka as a sink is supported albeit only for appends (no
> deletions/updates yet) [1].
>
> An example is a bit hidden in the documentation [2]:
>
> tables:
>   - name: MyTableSink
>     type: sink-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: "0.11"
>       topic: OutputTopic
>       properties:
>         zookeeper.connect: localhost:2181
>         bootstrap.servers: localhost:9092
>         group.id: testGroup
>     format:
>       property-version: 1
>       type: json
>       derive-schema: true
>     schema:
>       - name: rideId
>         data-type: BIGINT
>       - name: lon
>         data-type: FLOAT
>       - name: lat
>         data-type: FLOAT
>       - name: rideTime
>         data-type: TIMESTAMP(3)
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries
>
> On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> I have configured  source table successfully using the following
>> configuration:
>>
>> - name: out_order
>> type: source
>> update-mode: append
>> schema:
>> - name: out_order_code
>>   type: STRING
>> - name: input_date
>>   type: BIGINT
>> - name: owner_code
>>   type: STRING
>> connector:
>>   property-version: 1
>>   type: kafka
>>   version: universal
>>   topic: out_order
>>   startup-mode: latest-offset
>>   properties:
>>   - key: zookeeper.connect
>> value: 172.19.78.32:2181
>>   - key: bootstrap.servers
>> value: 172.19.78.32:9092
>>   - key: group.id
>>   property-version: 1
>>   type: json
>>   schema: "ROW(out_order_code STRING,owner_code STRING,input_date
>> BIGINT)"
>>
>> How can i configure a sink table? I haven't found any useful docs for
>> this.
>>
>> Thanks,
>> Lei
>>
>


Re: When will flink HiveTableSink support writing in streaming mode?

2020-03-10 Thread Kurt Young
It is expected to be ready in 1.11.

Best,
Kurt


On Wed, Mar 11, 2020 at 10:44 AM chenkaibit  wrote:

> Hi:
> I see that https://issues.apache.org/jira/browse/FLINK-14255 introduces a
> FileSystemStreamingSink, which seems to be preparation for HiveTableSink to
> support writing in streaming mode. In which later release is this feature
> expected to be officially available?
>
>


Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-10 Thread Kurt Young
Hi Jiawei,

Sorry I still didn't fully get your question. What's wrong with your
proposed SQL?

> select vendorId, sum(inventory units)
> from dynamodb
> where today's time - inbound time > 15
> group by vendorId

My guess is that such a query would only trigger calculations on new events.
So if a very old
inventory item, say one inbounded 17 days ago, has no new events coming in
about that inventory,
then the calculation would not be triggered and you can't sum it, right?

Best,
Kurt


On Wed, Mar 11, 2020 at 10:06 AM Jiawei Wu 
wrote:

> Hi Robert,
>
> Your answer really helps.
>
> About the problem, we have 2 choices. The first one is using Flink as
> described in this email thread. The second one is using AWS Lambda
> triggered by the CDC stream to compute the latest 15 days of records, which
> is a workaround solution and looks less elegant than Flink to me.
>
> Currently we have decided to choose AWS Lambda because we are familiar with
> it, and, most importantly, it leads to nearly no operational burden. But we
> are actively looking into the comparison between Lambda and Flink and want to
> know in which situations we would prefer Flink over Lambda. Several teams in
> our company are already in a hot debate about the comparison, and the biggest
> concern is the non-functional requirements around Flink, such as fault
> tolerance, recovery, etc.
>
> I also searched the internet but found there are nearly no comparisons
> between Lambda and Flink except for their market share :-( I'm wondering
> what you think of this? Any comments from the Flink community are
> appreciated.
>
> Thanks,
> J
>
>
> On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger  wrote:
>
>> Hey Jiawei,
>>
>> I'm sorry that you haven't received an answer yet.
>>
>> So you basically have a stream of dynamodb table updates (let's call it the
>> CDC stream), and you would like to maintain the inventory of the last 15
>> days for each vendor.
>> Whenever there's an update in the inventory data (a new event arrives in
>> the CDC stream), you want to produce a new event with the inventory count.
>>
>> If I'm not mistaken, you will need to keep all the inventory in Flink's
>> state to have an accurate count and to drop old records when they are
>> expired.
>> There are two options for maintaining the state:
>> - in memory (using the FsStateBackend)
>> - on disk (using the embedded RocksDBStatebackend)
>>
>> I would recommend starting with the RocksDBStateBackend. It will work as
>> long as your state fits on all your machines hard disks (we'll probably not
>> have an issue there :) )
>> If you run into performance issues, you can consider switching to a
>> memory based backend (by then, you should have some knowledge about your
>> state size)
>>
>> For tracking the events, I would recommend you to look into Flink's
>> windowing API:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>  / https://flink.apache.org/news/2015/12/04/Introducing-windows.html
>> Or alternatively doing an implementation with ProcessFunction:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
>> I personally would give it a try with ProcessFunction first.
>>
>> For reading the data from DynamoDB, there's an undocumented feature for
>> it in Flink. This is an example for reading from a DynamoDB stream:
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
>> Here's also some info: https://issues.apache.org/jira/browse/FLINK-4582
>>
>> For writing to DynamoDB there is currently no official sink in Flink. It
>> should be fairly straightforward to implement a Sink using the SinkFunction
>> interface of Flink.
>>
>> I hope this answers your question.
>>
>> Best,
>> Robert
>>
>>
>>
>>
>> On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu 
>> wrote:
>>
>>> Hi flink users,
>>>
>>> We have a problem and think flink may be a good solution for that. But
>>> I'm new to flink and hope can get some insights from flink community :)
>>>
>>> Here is the problem. Suppose we have a DynamoDB table which store the
>>> inventory data, the schema is like:
>>>
>>> * vendorId (primary key)
>>> * inventory name
>>> * inventory units
>>> * inbound time
>>> ...
>>>
>>> This DDB table keeps changing, since we have inventory arriving and being
>>> removed. *Every change will trigger a DynamoDB stream event.*
>>> We need to calculate *all the inventory units older than 15 days for a
>>> specific vendor* like this:
>>> > select vendorId, sum(inventory units)
>>> > from dynamodb
>>> > where today's time - inbound time > 15
>>> > group by vendorId
>>> We don't want to schedule a daily batch job, so we are trying to work on
>>> a micro-batch solution in Flink, and publish this data to another DynamoDB
>>> table.
>>>
>>> A draft idea is to use the total units minus the <15 days units, since both
>>> of them have event triggers. But no 

Re: Setting app Flink logger

2020-03-10 Thread Yang Wang
Since you are using log4j2, the Java dynamic property should not be
"log4j.configuration". Please use "log4j.configurationFile" instead.

Maybe it is not your problem: there is something wrong with the docker
image. The log4j2 properties in "flink-console.sh" are not configured
correctly.


Best,
Yang
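
One hedged way to pass the property Yang mentions is through env.java.opts in
flink-conf.yaml (the file path below is simply the default location shown
later in this thread):

    env.java.opts: "-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties"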

miki haiat  wrote on Tue, Mar 10, 2020 at 11:50 PM:

> Which image are you using ?
>
> On Tue, Mar 10, 2020, 16:27 Eyal Pe'er  wrote:
>
>> Hi Rafi,
>>
>> The file exists (and it is the file from the official image); please see
>> below.
>>
>> The user is root and it has permission. I am running in HA mode using
>> docker.
>>
>>
>>
>> cat /opt/flink/conf/log4j-console.properties
>>
>>
>>
>>
>> 
>>
>> #  Licensed to the Apache Software Foundation (ASF) under one
>>
>> #  or more contributor license agreements.  See the NOTICE file
>>
>> #  distributed with this work for additional information
>>
>> #  regarding copyright ownership.  The ASF licenses this file
>>
>> #  to you under the Apache License, Version 2.0 (the
>>
>> #  "License"); you may not use this file except in compliance
>>
>> #  with the License.  You may obtain a copy of the License at
>>
>> #
>>
>> #  http://www.apache.org/licenses/LICENSE-2.0
>>
>> #
>>
>> #  Unless required by applicable law or agreed to in writing, software
>>
>> #  distributed under the License is distributed on an "AS IS" BASIS,
>>
>> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>>
>> #  See the License for the specific language governing permissions and
>>
>> # limitations under the License.
>>
>>
>> 
>>
>>
>>
>> # This affects logging for both user code and Flink
>>
>> rootLogger.level = INFO
>>
>> rootLogger.appenderRef.console.ref = ConsoleAppender
>>
>>
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>>
>> #log4j.logger.org.apache.flink=INFO
>>
>>
>>
>> # The following lines keep the log level of common libraries/connectors on
>>
>> # log level INFO. The root logger does not override this. You have to
>> manually
>>
>> # change the log levels here.
>>
>> logger.akka.name = akka
>>
>> logger.akka.level = INFO
>>
>> logger.kafka.name= org.apache.kafka
>>
>> logger.kafka.level = INFO
>>
>> logger.hadoop.name = org.apache.hadoop
>>
>> logger.hadoop.level = INFO
>>
>> logger.zookeeper.name = org.apache.zookeeper
>>
>> logger.zookeeper.level = INFO
>>
>>
>>
>> # Log all infos to the console
>>
>> appender.console.name = ConsoleAppender
>>
>> appender.console.type = CONSOLE
>>
>> appender.console.layout.type = PatternLayout
>>
>> appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c
>> %x - %m%n
>>
>>
>>
>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>>
>> logger.netty.name =
>> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>>
>> logger.netty.level = OFF
>>
>>
>>
>> Best regards
>>
>> Eyal Pe'er / Data Platform Developer
>>
>>
>>
>>
>> *From:* Rafi Aroch 
>> *Sent:* Tuesday, March 10, 2020 3:55 PM
>> *To:* Eyal Pe'er 
>> *Cc:* user ; StartApp R Data Platform <
>> startapprnd...@startapp.com>
>> *Subject:* Re: Setting app Flink logger
>>
>>
>>
>> Hi Eyal,
>>
>>
>>
>> Sounds trivial, but can you verify that the file actually exists in
>> /opt/flink/conf/log4j-console.properties? Also, verify that the user
>> running the process has read permissions to that file.
>>
>> You said you use Flink in YARN mode, but the the example above you run
>> inside a docker image so this is a bit confusing. Notice that the official
>> docker images run as "flink" user and group ids.
>>
>>
>>
>> If you wish to try to use Logback instead, you can place you logback.xml
>> file as part of your project resources folder to include it in the
>> classpath. That should automatically get detected on startup.
>>
>>
>>
>> Hope this helps,
>>
>> Rafi
>>
>>
>>
>>
>>
>> On Tue, Mar 10, 2020 at 1:42 PM Eyal Pe'er 
>> wrote:
>>
>> Hi,
>>
>> I am running Flink in YARN mode using the official image with a few
>> additional files.
>>
>> I’ve noticed that my logger failed to initialize:
>>
>>
>>
>> root:~# docker logs flink-task-manager
>>
>> Starting taskexecutor as a console application on host ***.
>>
>> log4j:WARN No appenders could be found for logger
>> (org.apache.flink.runtime.taskexecutor.TaskManagerRunner).
>>
>> log4j:WARN Please initialize the log4j system properly.
>>
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
>> more info.
>>
>>
>>
>> I followed the documentation
>> 
>> and it seems like all related configuration files exist.
>>
>> Currently, I am using the default files from the official image
>> 

When will flink HiveTableSink support writing in streaming mode?

2020-03-10 Thread chenkaibit
Hi:
I see that https://issues.apache.org/jira/browse/FLINK-14255 introduces a
FileSystemStreamingSink, which seems to be preparation for HiveTableSink to
support writing in streaming mode. In which later release is this feature
expected to be officially available?



Re: Can a flink sql join keep state and recover data from state?

2020-03-10 Thread Kurt Young
In theory, once a Flink SQL job is compiled into a JobGraph and submitted to
the cluster, it is not fundamentally different from a DataStream job.
It should also support flink cancel -s. You can give it a try first, and we
can look into any problems you run into.

Best,
Kurt
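
For reference, the CLI flow being discussed looks roughly like the sketch
below; the savepoint directory, job id and jar/class names are placeholders:

    # Cancel the running job and write a savepoint to the given directory.
    bin/flink cancel -s hdfs:///flink/savepoints <jobId>
    # Resubmit a packaged job and restore it from that savepoint; this restore
    # step is what the SQL client cannot express yet.
    bin/flink run -s hdfs:///flink/savepoints/savepoint-<id> -c com.example.MyJob my-job.jar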


On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> There are two tables:
> tableA: key  valueA
> tableB: key  valueB
>
> Previously I stored tableA in Flink state; when a message from tableB
> arrived, I queried that state to get valueA.
> Writing Flink SQL directly can also achieve this, but there is a time gap
> between the two tables, and after the job is stopped and resubmitted, part
> of the join results is lost.
> Does Flink SQL have something like flink cancel -s to save the state?
>
> Thanks,
> Wang Lei
>


Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Eleanore Jin
Hi Xintong,

Thanks for the prompt reply! To answer your question:

   - Which Flink version are you using?

   v1.8.2

   - Is this skew observed only after a scaling-up? What happens if the
   parallelism is initially set to the scaled-up value?

   I also tried this; it seems the skew also happens even if I do not
change the parallelism, so it may not be caused by scale-up/down

   - Keeping the job running a while after the scale-up, does the skew ease?

   So the skew happens in such a way that some partitions lag
down to 0, but other partitions are still at a level of 10_000, and I am
seeing that the back pressure is ok.

Thanks a lot!
Eleanore


On Tue, Mar 10, 2020 at 7:03 PM Xintong Song  wrote:

> Hi Eleanore,
>
> I have a few more questions regarding your issue.
>
>- Which Flink version are you using?
>- Is this skew observed only after a scaling-up? What happens if the
>parallelism is initially set to the scaled-up value?
>- Keeping the job running a while after the scale-up, does the skew
>ease?
>
> I suspect the performance difference might be an outcome of some warming
> up issues. E.g., the existing TMs might have some file already localized,
> or some memory buffers already promoted to the JVM tenured area, while the
> new TMs have not.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin 
> wrote:
>
>> Hi Experts,
>> I have my flink application running on Kubernetes, initially with 1 Job
>> Manager, and 2 Task Managers.
>>
>> Then we have the custom operator that watches for the CRD, when the CRD
>> replicas changed, it will patch the Flink Job Manager deployment
>> parallelism and max parallelism according to the replicas from CRD
>> (parallelism can be configured via env variables for our application).
>> which causes the job manager to restart, hence a new Flink job. But the
>> consumer group does not change, so it will continue from the offset
>> where it left off.
>>
>> In addition, operator will also update Task Manager's deployment
>> replicas, and will adjust the pod number.
>>
>> In case of scale up, the existing task manager pods do not get killed,
>> but new task manager pods will be created.
>>
>> And we observed a skew in the partition offset consumed. e.g. some
>> partitions have huge lags and other partitions have small lags. (observed
>> from burrow)
>>
>> This is also validated by the metrics from the Flink UI, showing that the
>> throughput differs across slots.
>>
>> Any clue why this is the case?
>>
>> Thanks a lot!
>> Eleanore
>>
>


Can a flink sql join keep state and recover data from state?

2020-03-10 Thread wangl...@geekplus.com.cn
There are two tables:
tableA: key  valueA
tableB: key  valueB

Previously I stored tableA in Flink state; when a message from tableB arrived,
I queried that state to get valueA.
Writing Flink SQL directly can also achieve this, but there is a time gap
between the two tables, and after the job is stopped and resubmitted, part of
the join results is lost.
Does Flink SQL have something like flink cancel -s to save the state?

Thanks,
Wang Lei


Re: Automatically Clearing Temporary Directories

2020-03-10 Thread Yang Wang
Hi David,

Currently, the TaskManager can clean up non-referenced files in the blob
cache. It can be configured via `blob.service.cleanup.interval`[1].
Also, when the TaskManager is shut down gracefully, the storage directory
will be deleted.
So do you stop your TaskManager forcibly (i.e. kill -9)?

[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#blob-service-cleanup-interval


Best,
Yang
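
For reference, the interval from [1] is a flink-conf.yaml entry; the value
below is just the documented default of one hour, in seconds:

    blob.service.cleanup.interval: 3600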

David Maddison  wrote on Wed, Mar 11, 2020 at 1:39 AM:

> Hi,
>
> When a TaskManager is restarted it can leave behind unreferenced
> BlobServer cache directories in the temporary storage that never get
> cleaned up.  Would it be safe to automatically clear the temporary storage
> every time a TaskManager is started?
>
> (Note: the temporary volumes in use are dedicated to the TaskManager and
> not shared :-)
>
> Thanks in advance,
>
> David.
>


Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-10 Thread Jiawei Wu
Hi Robert,

Your answer really helps.

About the problem, we have 2 choices. The first one is using Flink as
described in this email thread. The second one is using AWS Lambda
triggered by the CDC stream to compute the latest 15 days of records, which is
a workaround solution and looks less elegant than Flink to me.

Currently we have decided to choose AWS Lambda because we are familiar with
it, and, most importantly, it leads to nearly no operational burden. But we
are actively looking into the comparison between Lambda and Flink and want to
know in which situations we would prefer Flink over Lambda. Several teams in
our company are already in a hot debate about the comparison, and the biggest
concern is the non-functional requirements around Flink, such as fault
tolerance, recovery, etc.

I also searched the internet but found there are nearly no comparisons
between Lambda and Flink except for their market share :-( I'm wondering
what you think of this? Any comments from the Flink community are
appreciated.

Thanks,
J


On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger  wrote:

> Hey Jiawei,
>
> I'm sorry that you haven't received an answer yet.
>
> So you basically have a stream of dynamodb table updates (let's call it the
> CDC stream), and you would like to maintain the inventory of the last 15
> days for each vendor.
> Whenever there's an update in the inventory data (a new event arrives in
> the CDC stream), you want to produce a new event with the inventory count.
>
> If I'm not mistaken, you will need to keep all the inventory in Flink's
> state to have an accurate count and to drop old records when they are
> expired.
> There are two options for maintaining the state:
> - in memory (using the FsStateBackend)
> - on disk (using the embedded RocksDBStatebackend)
>
> I would recommend starting with the RocksDBStateBackend. It will work as
> long as your state fits on all your machines hard disks (we'll probably not
> have an issue there :) )
> If you run into performance issues, you can consider switching to a memory
> based backend (by then, you should have some knowledge about your state
> size)
>
> For tracking the events, I would recommend you to look into Flink's
> windowing API:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>  / https://flink.apache.org/news/2015/12/04/Introducing-windows.html
> Or alternatively doing an implementation with ProcessFunction:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
> I personally would give it a try with ProcessFunction first.
>
> For reading the data from DynamoDB, there's an undocumented feature for it
> in Flink. This is an example for reading from a DynamoDB stream:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
> Here's also some info: https://issues.apache.org/jira/browse/FLINK-4582
>
> For writing to DynamoDB there is currently no official sink in Flink. It
> should be fairly straightforward to implement a Sink using the SinkFunction
> interface of Flink.
>
> I hope this answers your question.
>
> Best,
> Robert
>
>
>
>
> On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu 
> wrote:
>
>> Hi flink users,
>>
>> We have a problem and think flink may be a good solution for that. But
>> I'm new to flink and hope can get some insights from flink community :)
>>
>> Here is the problem. Suppose we have a DynamoDB table which store the
>> inventory data, the schema is like:
>>
>> * vendorId (primary key)
>> * inventory name
>> * inventory units
>> * inbound time
>> ...
>>
>> This DDB table keeps changing, since we have inventory arriving and being
>> removed. *Every change will trigger a DynamoDB stream event.*
>> We need to calculate *all the inventory units older than 15 days for a
>> specific vendor* like this:
>> > select vendorId, sum(inventory units)
>> > from dynamodb
>> > where today's time - inbound time > 15
>> > group by vendorId
>> We don't want to schedule a daily batch job, so we are trying to work on
>> a micro-batch solution in Flink, and publish this data to another DynamoDB
>> table.
>>
>> A draft idea is to use the total units minus the <15 days units, since both
>> of them have event triggers. But no detailed solutions yet.
>>
>> Could anyone help provide some insights here?
>>
>> Thanks,
>> J.
>>
>
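
To make the ProcessFunction suggestion quoted above more concrete, a rough
Java sketch follows. Everything in it is an assumption made for illustration:
the InventoryEvent and VendorInventorySum types, keying the stream by
vendorId, event-time timers, and keeping each inbound batch in MapState.
Inventory removals and state cleanup are left out; in practice those would
update or clear the corresponding map entries.

    import java.util.Map;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Keyed by vendorId; keeps "inbound time -> units" in state and re-emits
    // the sum of units older than 15 days on every new event or fired timer.
    // InventoryEvent / VendorInventorySum are hypothetical POJOs.
    public class AgedInventorySumFunction
            extends KeyedProcessFunction<String, InventoryEvent, VendorInventorySum> {

        private static final long FIFTEEN_DAYS_MS = 15L * 24 * 60 * 60 * 1000;

        private transient MapState<Long, Long> unitsByInboundTime;

        @Override
        public void open(Configuration parameters) {
            unitsByInboundTime = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("unitsByInboundTime", Long.class, Long.class));
        }

        @Override
        public void processElement(InventoryEvent event, Context ctx,
                                   Collector<VendorInventorySum> out) throws Exception {
            unitsByInboundTime.put(event.getInboundTime(), event.getUnits());
            // Ask to be called back once this batch crosses the 15-day boundary.
            ctx.timerService().registerEventTimeTimer(event.getInboundTime() + FIFTEEN_DAYS_MS);
            emit(ctx.getCurrentKey(), ctx.timerService().currentWatermark(), out);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                            Collector<VendorInventorySum> out) throws Exception {
            emit(ctx.getCurrentKey(), timestamp, out);
        }

        private void emit(String vendorId, long now,
                          Collector<VendorInventorySum> out) throws Exception {
            long sum = 0;
            for (Map.Entry<Long, Long> entry : unitsByInboundTime.entries()) {
                if (now - entry.getKey() > FIFTEEN_DAYS_MS) {
                    sum += entry.getValue();
                }
            }
            out.collect(new VendorInventorySum(vendorId, sum));
        }
    }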


Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Xintong Song
Hi Eleanore,

I have a few more questions regarding your issue.

   - Which Flink version are you using?
   - Is this skew observed only after a scaling-up? What happens if the
   parallelism is initially set to the scaled-up value?
   - Keeping the job running a while after the scale-up, does the skew ease?

I suspect the performance difference might be an outcome of some warming up
issues. E.g., the existing TMs might have some file already localized, or
some memory buffers already promoted to the JVM tenured area, while the new
TMs have not.

Thank you~

Xintong Song



On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin  wrote:

> Hi Experts,
> I have my flink application running on Kubernetes, initially with 1 Job
> Manager, and 2 Task Managers.
>
> Then we have the custom operator that watches for the CRD, when the CRD
> replicas changed, it will patch the Flink Job Manager deployment
> parallelism and max parallelism according to the replicas from CRD
> (parallelism can be configured via env variables for our application).
> which causes the job manager to restart, hence a new Flink job. But the
> consumer group does not change, so it will continue from the offset
> where it left off.
>
> In addition, operator will also update Task Manager's deployment replicas,
> and will adjust the pod number.
>
> In case of scale up, the existing task manager pods do not get killed, but
> new task manager pods will be created.
>
> And we observed a skew in the partition offset consumed. e.g. some
> partitions have huge lags and other partitions have small lags. (observed
> from burrow)
>
> This is also validated by the metrics from the Flink UI, showing that the
> throughput differs across slots.
>
> Any clue why this is the case?
>
> Thanks a lot!
> Eleanore
>


scaling issue Running Flink on Kubernetes

2020-03-10 Thread Eleanore Jin
Hi Experts,
I have my flink application running on Kubernetes, initially with 1 Job
Manager, and 2 Task Managers.

Then we have the custom operator that watches for the CRD, when the CRD
replicas changed, it will patch the Flink Job Manager deployment
parallelism and max parallelism according to the replicas from CRD
(parallelism can be configured via env variables for our application).
which causes the job manager to restart, hence a new Flink job. But the
consumer group does not change, so it will continue from the offset
where it left off.

In addition, operator will also update Task Manager's deployment replicas,
and will adjust the pod number.

In case of scale up, the existing task manager pods do not get killed, but
new task manager pods will be created.

And we observed a skew in the partition offset consumed. e.g. some
partitions have huge lags and other partitions have small lags. (observed
from burrow)

This is also validated by the metrics from the Flink UI, showing that the
throughput differs across slots.

Any clue why this is the case?

Thanks a lot!
Eleanore


Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Thomas Weise
Thanks for working on improvements to the Flink Docker container images.
This will be important as more and more users are looking to adopt
Kubernetes and other deployment tooling that relies on Docker images.

A generic, dynamic configuration mechanism based on environment variables
is essential and it is already supported via envsubst and an environment
variable that can supply a configuration fragment:

https://github.com/apache/flink-docker/blob/09adf2dcd99abfb6180e1e2b5b917b288e0c01f6/docker-entrypoint.sh#L88
https://github.com/apache/flink-docker/blob/09adf2dcd99abfb6180e1e2b5b917b288e0c01f6/docker-entrypoint.sh#L85
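
Concretely, this mechanism lets a fragment of flink-conf.yaml be injected when
the container starts. A hedged example with placeholder values, assuming the
FLINK_PROPERTIES variable used by the linked docker-entrypoint.sh:

    docker run \
      -e FLINK_PROPERTIES=$'jobmanager.rpc.address: flink-jobmanager\ntaskmanager.numberOfTaskSlots: 4' \
      flink:1.10 taskmanager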

This gives the necessary control for infrastructure use cases that aim to
supply deployment tooling to other users. An example in this category is
the FlinkK8sOperator:

https://github.com/lyft/flinkk8soperator/tree/master/examples/wordcount

On the flip side, attempting to support a fixed subset of configuration
options is brittle and will probably lead to compatibility issues down the
road:

https://github.com/apache/flink-docker/blob/09adf2dcd99abfb6180e1e2b5b917b288e0c01f6/docker-entrypoint.sh#L97

Besides the configuration, it may be worthwhile to see in which other ways
the base Docker images can provide more flexibility to incentivize wider
adoption.

I would second that it is desirable to support Java 11 and in general use a
base image that allows the (straightforward) use of more recent versions of
other software (Python etc.)

https://github.com/apache/flink-docker/blob/d3416e720377e9b4c07a2d0f4591965264ac74c5/Dockerfile-debian.template#L19

Thanks,
Thomas

On Tue, Mar 10, 2020 at 12:26 PM Andrey Zagrebin 
wrote:

> Hi All,
>
> Thanks a lot for the feedback!
>
> *@Yangze Guo*
>
> - Regarding the flink_docker_utils#install_flink function, I think it
> > should also support build from local dist and build from a
> > user-defined archive.
>
> I suppose you bring this up mostly for development purposes or for power
> users.
> Most normal users are interested in mainstream released versions
> of Flink.
> Although you bring up a valid concern, my idea was to keep the scope of this
> FLIP focused on those normal users.
> Power users are usually capable of designing a completely
> custom Dockerfile themselves.
> At the moment, we already have custom Dockerfiles e.g. for tests in
>
> flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile.
> We can add something similar for development purposes and maybe introduce a
> special maven goal. There is a maven docker plugin, afaik.
> I will add this to FLIP as next step.
>
> - It seems that the install_shaded_hadoop could be an option of
> > install_flink
>
> I would rather think of this as a separate, independent, optional step.
>
> - Should we support JAVA 11? Currently, most of the docker file based on
> > JAVA 8.
>
> Indeed, it is a valid concern. Java version is a fundamental property of
> the docker image.
> Customising this in the current mainstream image is difficult; it would
> require shipping it without Java at all.
> Or this is a separate discussion whether we want to distribute docker hub
> images with different Java versions or just bump it to Java 11.
> This should be easy in a custom Dockerfile for development purposes though
> as mentioned before.
>
> - I do not understand how to set config options through
>
> "flink_docker_utils configure"? Does this step happen during the image
> > build or the container start? If it happens during the image build,
> there would be a new image every time we change the config. If it is just
> a part of the container entrypoint, I think there is no need to add a
> > configure command, we could just add all dynamic config options to the
> > args list of "start_jobmaster"/"start_session_jobmanager". Am I
> > understanding this correctly?
>
>  `flink_docker_utils configure ...` can be called everywhere:
> - while building a custom image (`RUN flink_docker_utils configure ..`) by
> extending our base image from docker hub (`from flink`)
> - in a custom entry point as well
> I will check this, but letting the user pass dynamic config options also
> sounds like a good option.
> Our standard entry point script in base image could just properly forward
> the arguments to the Flink process.
>
> @Yang Wang
>
> > About docker utils
> > I really like the idea to provide some utils for the docker file and
> entry
> > point. The
> > `flink_docker_utils` will help to build the image easier. I am not sure
> > about the
> > `flink_docker_utils start_jobmaster`. Do you mean when we build a docker
> > image, we
> > need to add `RUN flink_docker_utils start_jobmaster` in the docker file?
> > Why do we need this?
>
> This is a scripted action to start JM. It can be called everywhere.
> Indeed, it does not make too much sense to run it in Dockerfile.
> Mostly, the idea was to use it in a custom entry point. When our base docker
> hub image is started its entry point can be also 

Re: Is incremental checkpoints needed?

2020-03-10 Thread Eleanore Jin
Hi Arvid,

Thank you for the clarification!

Best,
Eleanore


On Tue, Mar 10, 2020 at 12:32 PM Arvid Heise  wrote:

> Hi Eleanore,
>
> incremental checkpointing would be needed if you have a large state
> (GB-TB), but between two checkpoints only little changes happen (KB-MB).
>
> There are two reasons for large state: large user state or large operator
> state coming from joins, windows, or grouping. In the end, you will see the
> total size in the web ui. If it's small and checkpointing duration is low,
> there is absolutely no need to go incremental.
>
> On Tue, Mar 10, 2020 at 5:43 PM Eleanore Jin 
> wrote:
>
>> Hi All,
>>
>> I am using Apache Beam to construct the pipeline, and this pipeline is
>> running with Flink Runner.
>>
>> Both Source and Sink are Kafka topics, I have enabled Beam Exactly once
>> semantics.
>>
>> I believe how it works in beam is:
>> the messages will be cached and not processed by the
>> KafkaExactlyOnceSink, until the checkpoint completes and all the cached
>> messages are checkpointed, then it will start processing those messages.
>>
>> So is there any benefit to enabling incremental checkpointing when using
>> RocksDB as the backend? Because I see the state as consumer offsets and
>> cached messages in between checkpoints, the delta seems to be the complete
>> new checkpointed state.
>>
>> Thanks a lot!
>> Eleanore
>>
>


Re: Is incremental checkpoints needed?

2020-03-10 Thread Arvid Heise
Hi Eleanore,

incremental checkpointing would be needed if you have a large state
(GB-TB), but between two checkpoints only little changes happen (KB-MB).

There are two reasons for large state: large user state or large operator
state coming from joins, windows, or grouping. In the end, you will see the
total size in the web ui. If it's small and checkpointing duration is low,
there is absolutely no need to go incremental.
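
For reference, a minimal sketch of how incremental checkpointing is switched
on with the RocksDB backend in a Flink job; the checkpoint URI and interval
are placeholders. When running Beam on the Flink runner, the same settings can
instead be supplied via flink-conf.yaml (state.backend: rocksdb,
state.backend.incremental: true).

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalCheckpointingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000L); // checkpoint every 60 seconds (placeholder)
            // The second constructor argument enables incremental checkpoints.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
            // ... build the pipeline and call env.execute(...) ...
        }
    }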

On Tue, Mar 10, 2020 at 5:43 PM Eleanore Jin  wrote:

> Hi All,
>
> I am using Apache Beam to construct the pipeline, and this pipeline is
> running with Flink Runner.
>
> Both Source and Sink are Kafka topics, I have enabled Beam Exactly once
> semantics.
>
> I believe how it works in beam is:
> the messages will be cached and not processed by the KafkaExactlyOnceSink,
> until the checkpoint completes and all the cached messages are
> checkpointed, then it will start processing those messages.
>
> So is there any benefit to enabling incremental checkpointing when using
> RocksDB as the backend? Because I see the state as consumer offsets and
> cached messages in between checkpoints, the delta seems to be the complete
> new checkpointed state.
>
> Thanks a lot!
> Eleanore
>


Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Andrey Zagrebin
Hi All,

Thanks a lot for the feedback!

*@Yangze Guo*

- Regarding the flink_docker_utils#install_flink function, I think it
> should also support build from local dist and build from a
> user-defined archive.

I suppose you bring this up mostly for development purposes or for power
users.
Most normal users are interested in mainstream released versions
of Flink.
Although you bring up a valid concern, my idea was to keep the scope of this
FLIP focused on those normal users.
Power users are usually capable of designing a completely
custom Dockerfile themselves.
At the moment, we already have custom Dockerfiles e.g. for tests in
flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile.
We can add something similar for development purposes and maybe introduce a
special maven goal. There is a maven docker plugin, afaik.
I will add this to the FLIP as a next step.

- It seems that the install_shaded_hadoop could be an option of
> install_flink

I would rather think of this as a separate, independent, optional step.

- Should we support JAVA 11? Currently, most of the docker file based on
> JAVA 8.

Indeed, it is a valid concern. The Java version is a fundamental property of
the docker image.
Customising this in the current mainstream image is difficult; it would
require shipping the image without Java at all.
Alternatively, it is a separate discussion whether we want to distribute docker hub
images with different Java versions or just bump them to Java 11.
This should be easy in a custom Dockerfile for development purposes though,
as mentioned before.

- I do not understand how to set config options through

"flink_docker_utils configure"? Does this step happen during the image
> build or the container start? If it happens during the image build,
> there would be a new image every time we change the config. If it just
> a part of the container entrypoint, I think there is no need to add a
> configure command, we could just add all dynamic config options to the
> args list of "start_jobmaster"/"start_session_jobmanager". Am I
> understanding this correctly?

 `flink_docker_utils configure ...` can be called everywhere:
- while building a custom image (`RUN flink_docker_utils configure ..`) by
extending our base image from docker hub (`FROM flink`)
- in a custom entry point as well
I will check this, but letting the user also pass dynamic config options
sounds like a good option.
Our standard entry point script in the base image could then simply forward
the arguments to the Flink process.

@Yang Wang

> About docker utils
> I really like the idea to provide some utils for the docker file and entry
> point. The
> `flink_docker_utils` will help to build the image easier. I am not sure
> about the
> `flink_docker_utils start_jobmaster`. Do you mean when we build a docker
> image, we
> need to add `RUN flink_docker_utils start_jobmaster` in the docker file?
> Why do we need this?

This is a scripted action to start the JM. It can be called anywhere.
Indeed, it does not make too much sense to run it in a Dockerfile.
Mostly, the idea was to use it in a custom entry point. When our base docker
hub image is started, its entry point can also be completely overridden.
The actions are also sorted in the FLIP: for the Dockerfile or for the entry point.
E.g. our standard entry point script in the base docker hub image can
already use it.
Anyway, it was just an example; the details are to be defined in Jira, imo.

> About docker entry point
> I agree with you that the docker entry point could more powerful with more
> functionality.
> Mostly, it is about to override the config options. If we support dynamic
> properties, i think
> it is more convenient for users without any learning curve.
> `docker run flink session_jobmanager -D rest.bind-port=8081`

Indeed, as mentioned before, it can be a better option.
The standard entry point also decides at least what to run: JM or TM. I
think we will see what else makes sense to include there during the
implementation.
Some specifics may be more convenient to set with env vars, as Konstantin
mentioned.

> About the logging
> Updating the `log4j-console.properties` to support multiple appender is a
> better option.
> Currently, the native K8s is suggesting users to debug the logs in this
> way[1]. However,
> there is also some problems. The stderr and stdout of JM/TM processes could
> not be
> forwarded to the docker container console.

Strange; we should check whether there is a docker option to query the
container's stderr output as well.
If we forward the Flink process stdout as usual in a bash console, it should not
be a problem. Why can it not be forwarded?

@Konstantin Knauf

For the entrypoint, have you considered to also allow setting configuration
> via environment variables as in "docker run -e FLINK_REST_BIN_PORT=8081
> ..."? This is quite common and more flexible, e.g. it makes it very easy to
> pass values of Kubernetes Secrets into the Flink configuration.

This is indeed an interesting option to 

time-windowed joins and tumbling windows

2020-03-10 Thread Vinod Mehra
Hi!

We are testing the following 3-way time-windowed join to keep the retained
state size small. Using joins for the first time here. It works in unit
tests but we are not able to get expected results in production. We are
still troubleshooting this issue. Can you please help us review this in
case we missed something or our assumptions are wrong?

SELECT o.region_code,
   concat_ws(
 '/',
 CAST(sum(CASE WHEN r.order_id IS NOT NULL AND c.order_id IS
NULL THEN 1 ELSE 0 END) AS VARCHAR),
 CAST(count(1) AS VARCHAR)
   ) AS offer_conversion_5m
  FROM (
SELECT region_code,
   offer_id,
   rowtime
  FROM event_offer_created
 WHERE ...
) o
   LEFT JOIN (
SELECT offer_id,
   order_id,
   rowtime
  FROM event_order_requested
 WHERE ...
) r
 ON o.offer_id = r.offer_id
 AND r.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' hour
   LEFT JOIN (
SELECT order_id,
   rowtime
  FROM event_order_cancelled
 WHERE ...
) c
 ON r.order_id = c.order_id
 AND c.rowtime BETWEEN r.rowtime AND r.rowtime + INTERVAL '1' hour
 GROUP BY
   o.region_code,
   TUMBLE(o.rowtime, INTERVAL '5' minute)


The sequence of events is:

   1. At time X an offer is created (event stream = "*event_offer_created"*)
   2. At time Y that offer is used to create an order (event stream = "
   *event_order_requested*"). Left join because not all offers get used.
   3. At time Z that order is cancelled (event stream = "
   *event_order_cancelled*"). Left join because not all orders get
   cancelled.

"*offer_conversion_5m*" represents: number of converted orders / total
number of offerings" in a 5-minute bucket. If an order gets cancelled we
don't want to count that. That's why we have [c.order_id IS NULL THEN 1 ELSE
0 END] in the select.

We picked 1 hour time windows because that's the maximum time we expect the
successive events to take for a given record chain.

The outer GROUP BY is to get a 5-minute aggregation for each "region". As
expected, the watermark lags 2 hours behind the current time because of the two
time-window joins above. The IdleStateRetentionTime is not set, so the
expectation is that the state will be retained as per the time window size
and as the records fall off the window the state will be cleaned up. The
aggregated state is expected to be kept around for 5 minutes (GROUP BY).

However, we are unable to see the conversion (offer_created ->
order_requested (without order_cancelled)). '*offer_conversion_5m*' is
always zero although we know the streams contain records that should have
incremented the count. Any idea what could be wrong? Is the state being
dropped too early (5 mins) because of the outer 5 minute tumbling window?

Thanks,
Vinod


Automatically Clearing Temporary Directories

2020-03-10 Thread David Maddison
Hi,

When a TaskManager is restarted it can leave behind unreferenced BlobServer
cache directories in the temporary storage that never get cleaned up.
Would it be safe to automatically clear the temporary storage every time
a TaskManager is started?

(Note: the temporary volumes in use are dedicated to the TaskManager and
not shared :-)

Thanks in advance,

David.


Is incremental checkpoints needed?

2020-03-10 Thread Eleanore Jin
Hi All,

I am using Apache Beam to construct the pipeline, and this pipeline is
running with Flink Runner.

Both Source and Sink are Kafka topics, I have enabled Beam Exactly once
semantics.

I believe how it works in beam is:
the messages will be cached and not processed by the KafkaExactlyOnceSink,
until the checkpoint completes and all the cached messages are
checkpointed, then it will start processing those messages.

So is there any benefit to enabling incremental checkpointing when using
RocksDB as the backend? Because I see the states as consumer offsets, and
cached messages in between checkpoints. Delta seems to be the complete new
checkpointed states.

Thanks a lot!
Eleanore



Re: org.apache.flink.table.planner.PlanningConfigurationBuilder.java

2020-03-10 Thread tison
This file is generated at compile time; please run mvn package in the project root directory.

Best,
tison.


jaslou  wrote on Tue, Mar 10, 2020 at 11:15 PM:

> Hi,
>
>
> When compiling the source code, the org.apache.flink.table.planner.PlanningConfigurationBuilder.java class in the flink-table-planner module reports an error:
> the org.apache.flink.sql.parser.impl.FlinkSqlParserImpl file cannot be found, and the flink-sql-parser module contains neither the impl package nor the FlinkSqlParserImpl file.
>
> version:release-1.10.0
>
> Best,
> Jaslou
>


Re: Setting app Flink logger

2020-03-10 Thread miki haiat
Which image are you using ?

On Tue, Mar 10, 2020, 16:27 Eyal Pe'er  wrote:

> Hi Rafi,
>
> The file exists (and is the file from the official image ☺, please see
> below).
>
> The user is root and it has permission. I am running in HA mode using
> docker.
>
>
>
> cat /opt/flink/conf/log4j-console.properties
>
>
>
>
> 
>
> #  Licensed to the Apache Software Foundation (ASF) under one
>
> #  or more contributor license agreements.  See the NOTICE file
>
> #  distributed with this work for additional information
>
> #  regarding copyright ownership.  The ASF licenses this file
>
> #  to you under the Apache License, Version 2.0 (the
>
> #  "License"); you may not use this file except in compliance
>
> #  with the License.  You may obtain a copy of the License at
>
> #
>
> #  http://www.apache.org/licenses/LICENSE-2.0
>
> #
>
> #  Unless required by applicable law or agreed to in writing, software
>
> #  distributed under the License is distributed on an "AS IS" BASIS,
>
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>
> #  See the License for the specific language governing permissions and
>
> # limitations under the License.
>
>
> 
>
>
>
> # This affects logging for both user code and Flink
>
> rootLogger.level = INFO
>
> rootLogger.appenderRef.console.ref = ConsoleAppender
>
>
>
> # Uncomment this if you want to _only_ change Flink's logging
>
> #log4j.logger.org.apache.flink=INFO
>
>
>
> # The following lines keep the log level of common libraries/connectors on
>
> # log level INFO. The root logger does not override this. You have to
> manually
>
> # change the log levels here.
>
> logger.akka.name = akka
>
> logger.akka.level = INFO
>
> logger.kafka.name= org.apache.kafka
>
> logger.kafka.level = INFO
>
> logger.hadoop.name = org.apache.hadoop
>
> logger.hadoop.level = INFO
>
> logger.zookeeper.name = org.apache.zookeeper
>
> logger.zookeeper.level = INFO
>
>
>
> # Log all infos to the console
>
> appender.console.name = ConsoleAppender
>
> appender.console.type = CONSOLE
>
> appender.console.layout.type = PatternLayout
>
> appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c
> %x - %m%n
>
>
>
> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>
> logger.netty.name =
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>
> logger.netty.level = OFF
>
>
>
> Best regards
>
> Eyal Peer */ *Data Platform Developer
>
>
>
>
> *From:* Rafi Aroch 
> *Sent:* Tuesday, March 10, 2020 3:55 PM
> *To:* Eyal Pe'er 
> *Cc:* user ; StartApp R Data Platform <
> startapprnd...@startapp.com>
> *Subject:* Re: Setting app Flink logger
>
>
>
> Hi Eyal,
>
>
>
> Sounds trivial, but can you verify that the file actually exists in
> /opt/flink/conf/log4j-console.properties? Also, verify that the user
> running the process has read permissions to that file.
>
> You said you use Flink in YARN mode, but in the example above you run
> inside a docker image, so this is a bit confusing. Notice that the official
> docker images run as "flink" user and group ids.
>
>
>
> If you wish to try to use Logback instead, you can place your logback.xml
> file as part of your project resources folder to include it in the
> classpath. That should automatically get detected on startup.
>
>
>
> Hope this helps,
>
> Rafi
>
>
>
>
>
> On Tue, Mar 10, 2020 at 1:42 PM Eyal Pe'er  wrote:
>
> Hi,
>
> I am running Flink in YARN mode using the official image with few
> additional files.
>
> I’ve noticed that my logger failed to initialize:
>
>
>
> root:~# docker logs flink-task-manager
>
> Starting taskexecutor as a console application on host ***.
>
> log4j:WARN No appenders could be found for logger
> (org.apache.flink.runtime.taskexecutor.TaskManagerRunner).
>
> log4j:WARN Please initialize the log4j system properly.
>
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
>
>
>
> I followed the documentation
> 
> and seems like all related configuration files exist.
>
> Currently, I am using the default files from the official image
> https://github.com/apache/flink/tree/master/flink-dist/src/main/flink-bin/conf
>
>
>
> In addition, seems like the process got the right parameters:
>
> root 21892 21866  1 08:29 ?00:02:06
> /usr/local/openjdk-8/bin/java -XX:+UseG1GC
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -classpath
> 

org.apache.flink.table.planner.PlanningConfigurationBuilder.java

2020-03-10 Thread jaslou
Hi,


When compiling the source code, the org.apache.flink.table.planner.PlanningConfigurationBuilder.java class in the flink-table-planner module reports an error:
the org.apache.flink.sql.parser.impl.FlinkSqlParserImpl file cannot be found, and the flink-sql-parser module contains neither the impl package nor the FlinkSqlParserImpl file.


version:release-1.10.0


Best,
Jaslou

RE: Setting app Flink logger

2020-03-10 Thread Eyal Pe'er
Hi Rafi,
The file exists (and is the file from the official image☺, please see below).
The user is root and it has permission. I am running in HA mode using docker.

cat /opt/flink/conf/log4j-console.properties


#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.


# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = 
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

Best regards
Eyal Peer / Data Platform Developer

From: Rafi Aroch 
Sent: Tuesday, March 10, 2020 3:55 PM
To: Eyal Pe'er 
Cc: user ; StartApp R Data Platform 

Subject: Re: Setting app Flink logger

Hi Eyal,

Sounds trivial, but can you verify that the file actually exists in 
/opt/flink/conf/log4j-console.properties? Also, verify that the user running 
the process has read permissions to that file.
You said you use Flink in YARN mode, but in the example above you run inside a 
docker image, so this is a bit confusing. Notice that the official docker images 
run as "flink" user and group ids.

If you wish to try to use Logback instead, you can place your logback.xml file 
as part of your project resources folder to include it in the classpath. That 
should automatically get detected on startup.

Hope this helps,
Rafi


On Tue, Mar 10, 2020 at 1:42 PM Eyal Pe'er 
mailto:eyal.p...@startapp.com>> wrote:
Hi,
I am running Flink in YARN mode using the official image with few additional 
files.
I’ve noticed that my logger failed to initialize:

root:~# docker logs flink-task-manager
Starting taskexecutor as a console application on host ***.
log4j:WARN No appenders could be found for logger 
(org.apache.flink.runtime.taskexecutor.TaskManagerRunner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.

I followed the 
documentation
 and seems like all related configuration files exist.
Currently, I am using the default files from the official image 
https://github.com/apache/flink/tree/master/flink-dist/src/main/flink-bin/conf

In addition, seems like the process got the right parameters:
root 21892 21866  1 08:29 ?00:02:06 /usr/local/openjdk-8/bin/java 
-XX:+UseG1GC 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath 
/opt/flink/lib/flink-metrics-prometheus-1.9.2.jar:/opt/flink/lib/flink-table-blink_2.11-1.9.2.jar:/opt/flink/lib/flink-table_2.11-1.9.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.9.2.jar:::
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir 
/opt/flink/conf

What am I doing wrong ? How can I turn logger on?
Best regards
Eyal Peer


Re: History server UI not working

2020-03-10 Thread Yadong Xie
Hi pwestermann

I believe this is related to
https://issues.apache.org/jira/browse/FLINK-13799

It seems that the configuration.features['web-submit'] field is missing from the
API when you upgrade from 1.7 to 1.9.2.

Do you have the same problem when upgrading to 1.10? Feel free to ping me if
you still have related problems.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Setting app Flink logger

2020-03-10 Thread Rafi Aroch
Hi Eyal,

Sounds trivial, but can you verify that the file actually exists in
/opt/flink/conf/log4j-console.properties? Also, verify that the user
running the process has read permissions to that file.
You said you use Flink in YARN mode, but in the example above you run
inside a docker image, so this is a bit confusing. Notice that the official
docker images run as "flink" user and group ids.

If you wish to try to use Logback instead, you can place your logback.xml
file as part of your project resources folder to include it in the
classpath. That should automatically get detected on startup.

Hope this helps,
Rafi


On Tue, Mar 10, 2020 at 1:42 PM Eyal Pe'er  wrote:

> Hi,
>
> I am running Flink in YARN mode using the official image with few
> additional files.
>
> I’ve noticed that my logger failed to initialize:
>
>
>
> root:~# docker logs flink-task-manager
>
> Starting taskexecutor as a console application on host ***.
>
> log4j:WARN No appenders could be found for logger
> (org.apache.flink.runtime.taskexecutor.TaskManagerRunner).
>
> log4j:WARN Please initialize the log4j system properly.
>
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
>
>
>
> I followed the documentation
> 
> and seems like all related configuration files exist.
>
> Currently, I am using the default files from the official image
> https://github.com/apache/flink/tree/master/flink-dist/src/main/flink-bin/conf
>
>
>
> In addition, seems like the process got the right parameters:
>
> root 21892 21866  1 08:29 ?00:02:06
> /usr/local/openjdk-8/bin/java -XX:+UseG1GC
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -classpath
> /opt/flink/lib/flink-metrics-prometheus-1.9.2.jar:/opt/flink/lib/flink-table-blink_2.11-1.9.2.jar:/opt/flink/lib/flink-table_2.11-1.9.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.9.2.jar:::
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir
> /opt/flink/conf
>
>
>
> What am I doing wrong ? How can I turn logger on?
>
> Best regards
>
> Eyal Peer
>


Failure detection and Heartbeats

2020-03-10 Thread Morgan Geldenhuys

Hi community,

I am interested in knowing more about the failure detection mechanism 
used by Flink; unfortunately, information is a little thin on the ground, 
and I was hoping someone could shed a little light on the topic.


Looking at the documentation 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html), 
there are these two configuration options:



heartbeat.interval    10000    Long    Time interval for requesting heartbeat from sender side.

heartbeat.timeout     50000    Long    Timeout for requesting and receiving heartbeat for both
sender and receiver sides.
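
For reference, a minimal sketch of setting these two options programmatically
(the values are the defaults discussed below; the same keys can also be set in
flink-conf.yaml as heartbeat.interval / heartbeat.timeout, in milliseconds):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;

public class HeartbeatConfigSketch {
    public static Configuration heartbeatConfig() {
        Configuration conf = new Configuration();
        conf.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 10_000L); // heartbeat.interval: 10 s
        conf.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 50_000L);  // heartbeat.timeout: 50 s
        return conf;
    }
}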


This would indicate Flink uses a heartbeat mechanism to ascertain the 
liveness of TaskManagers. From this the following assumptions are made:


- The JobManager is responsible for broadcasting heartbeat requests to 
all TaskManagers and awaits responses.
- If a response is not forthcoming from any particular node within the 
heartbeat timeout period, e.g. 50 seconds by default, then that node is 
timed out and assumed to have failed.
- The heartbeat interval indicates how often the heartbeat request 
broadcast is scheduled.
- Having the heartbeat interval shorter than the heartbeat timeout 
means that multiple requests can be underway at the same time.
- Therefore, the TaskManager would need to fail to respond to 4 requests 
(assuming normal response times are lower than 10 seconds) before being 
timed out after 50 seconds.


So, if a failure were to occur (considering the default settings):
- In the best case the JobManager would detect the failure in the 
shortest time, i.e. 50 seconds +- (node fails just before receiving the 
next heartbeat request)
- In the worst case the JobManager would detect the failure in the 
longest time, i.e. 60 seconds +- (node fails just after sending the last 
heartbeat response)


Is this correct?

For JobManagers in HA mode, this is left to ZooKeeper timeouts, which 
then initiate a round of elections, and the new leader picks up from the 
previous checkpoint.


Thank you in advance.

Regards,
M.









Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
Thanks, works now. 

Seems it is because I added the
   schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, 
status INT)"

under the format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no deletions/updates 
yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
type: sink-table
update-mode: append
connector:
  property-version: 1
  type: kafka
  version: "0.11"
  topic: OutputTopic
  properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
  property-version: 1
  type: json
  derive-schema: true
schema:
  - name: rideId
data-type: BIGINT
  - name: lon
data-type: FLOAT
  - name: lat
data-type: FLOAT
  - name: rideTime
data-type: TIMESTAMP(3)
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn 
 wrote:

I have configured  source table successfully using the following configuration:

- name: out_order
type: source
update-mode: append
schema:
- name: out_order_code
  type: STRING
- name: input_date
  type: BIGINT
- name: owner_code
  type: STRING
connector:
  property-version: 1
  type: kafka
  version: universal
  topic: out_order
  startup-mode: latest-offset
  properties:
  - key: zookeeper.connect
value: 172.19.78.32:2181
  - key: bootstrap.servers
value: 172.19.78.32:9092
  - key: group.id
  property-version: 1
  type: json
  schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can i configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread David Anderson
Watermarks are a tool for handling out-of-orderness when working with event
time timestamps. They provide a mechanism for managing the tradeoff between
latency and completeness, allowing you to manage how long to wait for any
out-of-orderness to resolve itself. Note the way that Flink uses these
terms, out-of-orderness is not the same as lateness: your watermarking will
accommodate a certain amount of out-of-orderness, and out-of-order events
that arrive within this timeframe are not considered late. Only events that
are excessively out-of-order -- i.e., with timestamps behind the current
watermark -- are late.

I would say that the documentation you quoted is a bit misleading, since
with ingestion time processing there can be no late events.

Most of the Flink runtime only makes a distinction between processing time
and event time. For example, there are processing time timers (triggered by
the system clock) and event time timers (triggered by watermarks), but
there's no such thing as an ingestion time timer. Ingestion time is a
hybrid between the two that assigns timestamps and watermarks based on
processing time, and then the rest of the pipeline behaves as though you
were doing event time processing.

This means that when working with ingestion time you lose most of the
benefits of event time processing, such as deterministic, reproducible
behavior. But using ingestion time does make it possible to use certain
parts of the APIs that are described as "event time only", such as interval
joins.

I don't know enough about streaming-gelly to speculate about what's going
on there.

David



On Tue, Mar 10, 2020 at 10:14 AM kant kodali  wrote:

> Hi Arvid,
>
> If ingestion time programs cannot handle late data then why would it
> generate watermarks? Isn't the whole point of watermarks is to handle the
> late data?
>
> My last question was more about this library
>  I run several algorithms using
> SimpleEdgeStream.aggregrate().print() and I am running into
> the following error whenever I invoke the following constructor
> 
>  .
> But it works if I change it to this
> 
>  so
> I am not exactly sure what is happening there.
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
> at Test.main(Test.java:86)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
> ... 8 more
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 0c61d6ef0483c3068076a988bc252a74)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>
> at
> 

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Aljoscha Krettek

On 10.03.20 10:13, kant kodali wrote:


If ingestion time programs cannot handle late data then why would it
generate watermarks? Isn't the whole point of watermarks is to handle the
late data?


Watermarks are not only used for handling late data. Watermarks are the 
mechanism that is used to update time throughout the streaming topology, 
starting from the sources. Among other things it is used to detect late 
data.


When setting the characteristic to "ingestion time" you are essentially 
instating a watermark extractor that extracts the current processing 
time at the sources as event time.
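
Conceptually, that automatic assignment is roughly equivalent to the following
sketch (MyEvent is a hypothetical element type; this only illustrates the idea
and is not Flink's actual internal implementation):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class IngestionTimeSketch {
    public static DataStream<MyEvent> withIngestionLikeTimestamps(DataStream<MyEvent> stream) {
        return stream.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<MyEvent>() {
                    @Override
                    public long extractAscendingTimestamp(MyEvent element) {
                        // stamp each record with the wall clock at assignment time
                        return System.currentTimeMillis();
                    }
                });
    }
}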



My last question was more about this library
 I run several algorithms using
SimpleEdgeStream.aggregrate().print() and I am running into the
following error whenever I invoke the following constructor

.
But it works if I change it to this

so
I am not exactly sure what is happening there.


I don't know what is going on here, could it be that the library 
internally sets the characteristic to event-time, thereby overriding 
your ingestion-time setting? In that case you would indeed be missing a 
watermark extractor. I'm cc'ing Vasia, as the author of that library.


-Aljoscha


Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread Arvid Heise
Hi Lei,

yes Kafka as a sink is supported albeit only for appends (no
deletions/updates yet) [1].

An example is a bit hidden in the documentation [2]:

tables:
  - name: MyTableSink
type: sink-table
update-mode: append
connector:
  property-version: 1
  type: kafka
  version: "0.11"
  topic: OutputTopic
  properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
  property-version: 1
  type: json
  derive-schema: true
schema:
  - name: rideId
data-type: BIGINT
  - name: lon
data-type: FLOAT
  - name: lat
data-type: FLOAT
  - name: rideTime
data-type: TIMESTAMP(3)

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries

On Tue, Mar 10, 2020 at 10:51 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> I have configured  source table successfully using the following
> configuration:
>
> - name: out_order
> type: source
> update-mode: append
> schema:
> - name: out_order_code
>   type: STRING
> - name: input_date
>   type: BIGINT
> - name: owner_code
>   type: STRING
> connector:
>   property-version: 1
>   type: kafka
>   version: universal
>   topic: out_order
>   startup-mode: latest-offset
>   properties:
>   - key: zookeeper.connect
> value: 172.19.78.32:2181
>   - key: bootstrap.servers
> value: 172.19.78.32:9092
>   - key: group.id
>   property-version: 1
>   type: json
>   schema: "ROW(out_order_code STRING,owner_code STRING,input_date
> BIGINT)"
>
> How can i configure a sink table? I haven't found any useful docs for this.
>
> Thanks,
> Lei
>


Re: RocksDB

2020-03-10 Thread David Anderson
The State Processor API goes a bit in the direction you asking about, by
making it possible to query savepoints.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
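
A minimal sketch of reading keyed state from a savepoint with that API (the
savepoint path, operator uid and state name are placeholders, and the job is
assumed to have registered a ValueState<Long> named "count"):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class SavepointReaderSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(bEnv, "hdfs:///savepoints/savepoint-xyz", new MemoryStateBackend());

        DataSet<Long> counts = savepoint.readKeyedState("my-operator-uid", new CountReader());
        counts.print();
    }

    // Reads the ValueState<Long> registered as "count" for every key of the operator.
    static class CountReader extends KeyedStateReaderFunction<String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            out.collect(count.value());
        }
    }
}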


Regards,
David


On Tue, Mar 10, 2020 at 1:05 PM Aljoscha Krettek 
wrote:

> On 10.03.20 11:36, Timothy Victor wrote:
> > Can the RocksDB state backend used by Flink be queried from outside, e.g.
> > via SQL?
>
> That's not possible, but you might be interested in queryable state:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
>
> > Or maybe a better question, is there a RocksDB SinkFunction that exists?
>
> I'm afraid that doesn't exist, and it probably never will because
> usually RocksDB is not a distributed system so writing to it from a
> parallel stream processing application would most likely not work well.
>
> Best,
> Aljoscha
>


Re: RocksDB

2020-03-10 Thread Aljoscha Krettek

On 10.03.20 11:36, Timothy Victor wrote:

Can the RocksDB state backend used by Flink be queried from outside, e.g.
via SQL?


That's not possible, but you might be interested in queryable state: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
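
A minimal client-side sketch of that approach (host, port, job id, key and state
name below are placeholders; the job is assumed to have exposed a ValueState<Long>
via .asQueryableState("my-counts")):

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QueryableStateSketch {
    public static void main(String[] args) throws Exception {
        QueryableStateClient client = new QueryableStateClient("proxy-host", 9069);
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("my-counts", Long.class);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"), // placeholder job id
                "my-counts",  // name passed to asQueryableState(...)
                "some-key",   // key to look up
                Types.STRING, // key type
                descriptor);

        System.out.println("value = " + future.get().value());
    }
}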



Or maybe a better question, is there a RocksDB SinkFunction that exists?n


I'm afraid that doesn't exist, and it probably never will because 
usually RocksDB is not a distributed system so writing to it from a 
parallel stream processing application would most likely not work well.


Best,
Aljoscha


Setting app Flink logger

2020-03-10 Thread Eyal Pe'er
Hi,
I am running Flink in YARN mode using the official image with a few additional 
files.
I've noticed that my logger failed to initialize:

root:~# docker logs flink-task-manager
Starting taskexecutor as a console application on host ***.
log4j:WARN No appenders could be found for logger 
(org.apache.flink.runtime.taskexecutor.TaskManagerRunner).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.

I followed the 
documentation
 and it seems like all related configuration files exist.
Currently, I am using the default files from the official image 
https://github.com/apache/flink/tree/master/flink-dist/src/main/flink-bin/conf

In addition, seems like the process got the right parameters:
root 21892 21866  1 08:29 ?00:02:06 /usr/local/openjdk-8/bin/java 
-XX:+UseG1GC 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath 
/opt/flink/lib/flink-metrics-prometheus-1.9.2.jar:/opt/flink/lib/flink-table-blink_2.11-1.9.2.jar:/opt/flink/lib/flink-table_2.11-1.9.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.9.2.jar:::
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir 
/opt/flink/conf

What am I doing wrong? How can I turn the logger on?
Best regards
Eyal Peer


Re: Hive Source With Kerberos authentication issue

2020-03-10 Thread 叶贤勋
It works inside the doAs method. In my hive connector, all Hive operations that involve authentication are now executed inside doAs, which solves the authentication problem.
The stacktrace mentioned earlier was printed with our company's internally repackaged hive-exec
jar, so it does not match the source code; the same problem also occurs with the official hive-exec-2.1.1.jar.
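
A rough sketch of the doAs pattern discussed above (catalog name, database, conf
dir and Hive version are placeholders; principal and keytabPath are assumed to
come from your own configuration):

import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;

public class KerberizedHiveCatalogSketch {
    public static HiveCatalog openCatalog(String principal, String keytabPath, String hiveConfDir)
            throws Exception {
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        // run all metastore-related calls as the logged-in Kerberos user
        return UserGroupInformation.getLoginUser().doAs(
                (PrivilegedExceptionAction<HiveCatalog>) () -> {
                    HiveCatalog hive = new HiveCatalog("myhive", "default", hiveConfDir, "2.1.1");
                    hive.open();
                    return hive;
                });
    }
}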




叶贤勋
yxx_c...@163.com


On Mar 5, 2020 at 13:52, Rui Li wrote:
Could you first try the doAs approach, e.g. do the HiveCatalog registration inside UserGroupInformation.getLoginUser().doAs(), to check whether the HiveMetaStoreClient is not picking up your login user's information.
Also, is your Hive version 2.1.1? Judging from the stacktrace it does not match the 2.1.1 code, e.g.
line 562 of HiveMetaStoreClient.java:
https://github.com/apache/hive/blob/rel/release-2.1.1/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java#L562

On Wed, Mar 4, 2020 at 9:17 PM 叶贤勋  wrote:

Hi,
The datanucleus jar issue has been resolved; previously the connection to the HMS was probably not going through hive.metastore.uris.
I performed the Kerberos login inside HiveCatalog's open method,
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
and the login succeeded. In theory, after a successful Kerberos login this process should have permission to access the metastore. However, the following error is reported when creating the metastore
client.

2020-03-04 20:23:17,191 DEBUG
org.apache.flink.table.catalog.hive.HiveCatalog   - Hive
MetaStore Uris is thrift://***1:9083,thrift://***2:9083.
2020-03-04 20:23:17,192 INFO
org.apache.flink.table.catalog.hive.HiveCatalog   - Created
HiveCatalog 'myhive'
2020-03-04 20:23:17,360 INFO
org.apache.hadoop.security.UserGroupInformation   - Login
successful for user ***/dev@***.COM using keytab file
/Users/yexianxun/IdeaProjects/flink-1.9.0/build-target/examples/hive/kerberos/key.keytab
2020-03-04 20:23:17,360 DEBUG
org.apache.flink.table.catalog.hive.HiveCatalog   - login user
by kerberos, principal is ***/dev@***.CO, login is true
2020-03-04 20:23:17,374 INFO
org.apache.curator.framework.imps.CuratorFrameworkImpl- Starting
2020-03-04 20:23:17,374 DEBUG org.apache.curator.CuratorZookeeperClient
- Starting
2020-03-04 20:23:17,374 DEBUG org.apache.curator.ConnectionState
- Starting
2020-03-04 20:23:17,374 DEBUG org.apache.curator.ConnectionState
- reset
2020-03-04 20:23:17,374 INFO  org.apache.zookeeper.ZooKeeper
- Initiating client connection,
connectString=***1:2181,***2:2181,***3:2181 sessionTimeout=6
watcher=org.apache.curator.ConnectionState@6b52dd31
2020-03-04 20:23:17,379 DEBUG
org.apache.zookeeper.client.ZooKeeperSaslClient   - JAAS
loginContext is: HiveZooKeeperClient
2020-03-04 20:23:17,381 WARN  org.apache.zookeeper.ClientCnxn
- SASL configuration failed:
javax.security.auth.login.LoginException: Unable to obtain password from
user
Will continue connection to Zookeeper server without SASL authentication,
if Zookeeper server allows it.
2020-03-04 20:23:17,381 INFO  org.apache.zookeeper.ClientCnxn
- Opening socket connection to server ***1:2181
2020-03-04 20:23:17,381 ERROR org.apache.curator.ConnectionState
- Authentication failed
2020-03-04 20:23:17,384 INFO  org.apache.zookeeper.ClientCnxn
- Socket connection established to ***1:2181, initiating
session
2020-03-04 20:23:17,384 DEBUG org.apache.zookeeper.ClientCnxn
- Session establishment request sent on ***1:2181
2020-03-04 20:23:17,393 INFO  org.apache.zookeeper.ClientCnxn
- Session establishment complete on server ***1:2181,
sessionid = 0x16f7af0645c25a8, negotiated timeout = 4
2020-03-04 20:23:17,393 INFO
org.apache.curator.framework.state.ConnectionStateManager - State
change: CONNECTED
2020-03-04 20:23:17,397 DEBUG org.apache.zookeeper.ClientCnxn
- Reading reply sessionid:0x16f7af0645c25a8, packet::
clientPath:null serverPath:null finished:false header:: 1,3  replyHeader::
1,292064345364,0  request:: '/hive_base,F  response::
s{17179869635,17179869635,1527576303010,1527576303010,0,3,0,0,0,1,249117832596}
2020-03-04 20:23:17,400 DEBUG org.apache.zookeeper.ClientCnxn
- Reading reply sessionid:0x16f7af0645c25a8, packet::
clientPath:null serverPath:null finished:false header:: 2,12  replyHeader::
2,292064345364,0  request:: '/hive_base/namespaces/hive/uris,F  response::
v{'dGhyaWZ0Oi8vaHphZGctYmRtcy03LnNlcnZlci4xNjMub3JnOjkwODM=,'dGhyaWZ0Oi8vaHphZGctYmRtcy04LnNlcnZlci4xNjMub3JnOjkwODM=},s{17179869664,17179869664,1527576306106,1527576306106,0,1106,0,0,0,2,292063632993}
2020-03-04 20:23:17,401 INFO  hive.metastore
- atlasProxy is set to
2020-03-04 20:23:17,401 INFO  hive.metastore
- Trying to connect to metastore with URI thrift://
hzadg-bdms-7.server.163.org:9083
2020-03-04 20:23:17,408 INFO  hive.metastore
- tokenStrForm should not be null for querynull
2020-03-04 20:23:17,432 DEBUG org.apache.thrift.transport.TSaslTransport
- opening transport
org.apache.thrift.transport.TSaslClientTransport@3c69362a
2020-03-04 20:23:17,441 ERROR org.apache.thrift.transport.TSaslTransport
- SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]
at
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
at
org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
at
org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
at

Re: Re: Kafka sink only support append mode?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Jark, 
Thanks for the explanation.
The group by statement will result in a non-append stream. 
I have just tried a join statement and want to send the result to kafka, and it 
also gives the error:
 AppendStreamTableSink requires that Table has only insert changes
Why is the join result not appendable? It confuses me.

Thanks,
Lei
 
From: Jark Wu
Date: 2020-03-09 19:25
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Kafka sink only support append mode?
Hi Lei,

Yes. Currently, the Kafka sink only supports append mode. Other update modes (e.g. 
upsert mode / retract mode) are on the agenda. 
For now, you can customize a KafkaTableSink by implementing the 
UpsertStreamTableSink interface, where you will get Tuple2<Boolean, Row> 
records, 
and the Boolean represents an insert or delete operation. Then you can encode the 
insert/delete operation into Kafka storage or just ignore the operations. 

Best,
Jark

On Mon, 9 Mar 2020 at 19:14, wangl...@geekplus.com.cn 
 wrote:
I  wrote a simple program reading from kafka using  sql  and sink to kafka.
But only  'update-mode' = 'append' is supported for sink table and the query 
sql must have no group statement.
Only append mode is supported for kafka sink?

Thanks,
Lei




RocksDB

2020-03-10 Thread Timothy Victor
Can the RocksDB state backend used by Flink be queried from outside, e.g.
via SQL?

Or maybe a better question, is there a RocksDB SinkFunction that exists?

Thanks

Tim


Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn

I have configured  source table successfully using the following configuration:

- name: out_order
type: source
update-mode: append
schema:
- name: out_order_code
  type: STRING
- name: input_date
  type: BIGINT
- name: owner_code
  type: STRING
connector:
  property-version: 1
  type: kafka
  version: universal
  topic: out_order
  startup-mode: latest-offset
  properties:
  - key: zookeeper.connect
value: 172.19.78.32:2181
  - key: bootstrap.servers
value: 172.19.78.32:9092
  - key: group.id
  property-version: 1
  type: json
  schema: "ROW(out_order_code STRING,owner_code STRING,input_date BIGINT)"

How can i configure a sink table? I haven't found any useful docs for this.

Thanks,
Lei


Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread kant kodali
Hi Arvid,

If ingestion time programs cannot handle late data then why would it
generate watermarks? Isn't the whole point of watermarks is to handle the
late data?

My last question was more about this library
 I run several algorithms using
SimpleEdgeStream.aggregrate().print() and I am running into the
following error whenever I invoke the following constructor

.
But it works if I change it to this

so
I am not exactly sure what is happening there.

 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 0c61d6ef0483c3068076a988bc252a74)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 0c61d6ef0483c3068076a988bc252a74)

at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at Test.main(Test.java:86)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

... 8 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: 0c61d6ef0483c3068076a988bc252a74)

at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)

at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)

at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)

at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)

at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)

at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.

at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 19 more

Caused by: 

How to deduplicate the results of a dual-stream join with duplicate join keys before sending to kafka

2020-03-10 Thread wangl...@geekplus.com.cn
There are two tables with kafka as the data source:
order_info:
  order_no   info
order_status:
  order_no  status

order_no is duplicated in both tables; when a record arrives from one table, multiple matching records can be found in the other table.
How can I take only the latest record related to that join key from the other table and send the result to kafka?

The kafka sink only supports append mode, so grouping the tables first and then joining does not work.

Thanks,
王磊




Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Arvid Heise
Hi Kant,

I just saw that you asked the same question on SO [1]. Could you, in the
future, please cross-reference these posts, so that we don't waste
resources on answering?

[1]
https://stackoverflow.com/questions/60610985/do-i-need-to-set-assigntimestampsandwatermarks-if-i-set-my-time-characteristic-t

On Tue, Mar 10, 2020 at 9:33 AM Arvid Heise  wrote:

> Hi Kant,
>
> according to the documentation [1], you don't need to set a watermark
> assigner:
>
>> Compared to *event time*, *ingestion time* programs cannot handle any
>> out-of-order events or late data, but the programs don’t have to specify
>> how to generate *watermarks*.
>>
>> Internally, *ingestion time* is treated much like *event time*, but with
>> automatic timestamp assignment and automatic watermark generation.
>>
>
> So it's neither possible to assign timestamps nor watermark, but it seems
> as if the default behavior is exactly as you want it to be. If that doesn't
> work for you, could you please rephrase your last question or describe your
> use case? I didn't get it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html
>
> On Tue, Mar 10, 2020 at 5:01 AM kant kodali  wrote:
>
>> Hi All,
>>
>> Do I need to set assignTimestampsAndWatermarks if I set my time
>> characteristic to IngestionTime?
>>
>> say I set my time characteristic of stream execution environment to
>> Ingestion time as follows
>>
>>
>> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>
>> do I need to call
>> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?
>>
>> I thought datastream.assignTimestampsAndWatermarks is mandatory only if
>> time characteristic is event time. No? Did this behavior change in Flink
>> 1.10? because I see libraries not setting
>> datastream.assignTimestampsAndWatermarks when time characteristic is
>> Ingestion time but they do for event time. If not, I am wondering how can I
>> set AscendingTimestampExtractor in a distributed environment? is there
>> anyway to add monotonically increasing long(AscendingTimestampExtractor)
>> without any distributed locks?
>>
>> Thanks!
>>
>


Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Arvid Heise
Hi Kant,

according to the documentation [1], you don't need to set a watermark
assigner:

> Compared to *event time*, *ingestion time* programs cannot handle any
> out-of-order events or late data, but the programs don’t have to specify
> how to generate *watermarks*.
>
> Internally, *ingestion time* is treated much like *event time*, but with
> automatic timestamp assignment and automatic watermark generation.
>

So it's neither possible to assign timestamps nor watermarks, but it seems
as if the default behavior is exactly as you want it to be. If that doesn't
work for you, could you please rephrase your last question or describe your
use case? I didn't get it.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_time.html

On Tue, Mar 10, 2020 at 5:01 AM kant kodali  wrote:

> Hi All,
>
> Do I need to set assignTimestampsAndWatermarks if I set my time
> characteristic to IngestionTime?
>
> say I set my time characteristic of stream execution environment to
> Ingestion time as follows
>
>
> streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
> do I need to call
> datastream.assignTimestampsAndWatermarks(AscendingTimestampExtractor) ?
>
> I thought datastream.assignTimestampsAndWatermarks is mandatory only if
> time characteristic is event time. No? Did this behavior change in Flink
> 1.10? because I see libraries not setting
> datastream.assignTimestampsAndWatermarks when time characteristic is
> Ingestion time but they do for event time. If not, I am wondering how can I
> set AscendingTimestampExtractor in a distributed environment? is there
> anyway to add monotonically increasing long(AscendingTimestampExtractor)
> without any distributed locks?
>
> Thanks!
>


Re: Flink Serialization as stable (kafka) output format?

2020-03-10 Thread Arvid Heise
Hi Theo,

I strongly discourage the use of flink serialization for persistent storage
of data. It was never intended to work in this way and does not offer the
benefits of Avro, such as lazy schema evolution and maturity.

Unless you can explicitly measure that Avro is a bottleneck in your setup,
stick with it. It's the preferred way to store data in Kafka for a reason.
It's mature, supports plenty of languages, and the schema evolution feature
will save you so many headaches in the future.

If it turns out to be a bottleneck, the most logical alternative is
protobuf. Kryo is even worse than Flink serializer for Kafka. In general,
realistically speaking, it's so much more cost-effective to just add
another node to your Flink cluster and use Avro than coming up with any
clever solution (just assume that you need at least one man month to
implement and do the math).

And btw, you should always use generated Java/scala classes if possible for
Avro. It's faster and offers a much nicer development experience.
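
For illustration, a minimal sketch of consuming Avro-encoded records from Kafka
with a generated class ("OrderEvent" is a hypothetical Avro-generated
SpecificRecord; topic name and Kafka properties are placeholders):

import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class AvroKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "orders-consumer");

        DataStream<OrderEvent> orders = env.addSource(new FlinkKafkaConsumer<>(
                "orders",
                AvroDeserializationSchema.forSpecific(OrderEvent.class),
                props));

        orders.print();
        env.execute("avro-kafka-sketch");
    }
}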

On Mon, Mar 9, 2020 at 3:57 PM Robert Metzger  wrote:

> Hi Theo,
>
> However, in most benchmarks, avro turns out to be rather slow in terms of
>> CPU cycles ( e.g. [1]  )
>
>
> Avro is slower compared to what?
> You should not only benchmark the CPU cycles for serializing the data. If
> you are sending JSON strings across the network, you'll probably have a lot
> more bytes to send across the network, making everything slower (usually
> network is slower than CPU)
>
> One of the reasons why people use Avro it supports schema evolution.
>
> Regarding your questions:
> 1. For this use case, you can use the Flink data format as an internal
> message format (between the star architecture jobs)
> 2. Generally speaking no
> 3. You will at least have a dependency on flink-core. And this is a
> somewhat custom setup, so you might be facing breaking API changes.
> 4. I'm not aware of any benchmarks. The Flink serializers are mostly for
> internal use (between our operators), Kryo is our fallback (to not suffer
> to much from the not invented here syndrome), while Avro is meant for
> cross-system serialization.
>
> I have the feeling that you can move ahead with using Flink's Pojo
> serializer everywhere :)
>
> Best,
> Robert
>
>
>
>
> On Wed, Mar 4, 2020 at 1:04 PM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi,
>>
>> Without knowing too much about flink serialization, I know that Flinks
>> states that it serializes POJOtypes much faster than even the fast Kryo for
>> Java. I further know that it supports schema evolution in the same way as
>> avro.
>>
>> In our project, we have a star architecture, where one flink job produces
>> results into a kafka topic and where we have multiple downstream consumers
>> from that kafka topic (Mostly other flink jobs).
>> For fast development cycles, we currently use JSON as output format for
>> the kafka topic due to easy debugging capabilities and best migration
>> possibilities. However, when scaling up, we need to switch to a more
>> efficient format. Most often, Avro is mentioned in combination with a
>> schema registry, as its much more efficient then JSON where essentially,
>> each message contains the schema as well. However, in most benchmarks, avro
>> turns out to be rather slow in terms of CPU cycles ( e.g. [1]
>>  )
>>
>> My question(s) now:
>> 1. Is it reasonable to use Flink serializers as the message format in Kafka?
>> 2. Are there any downsides to using Flink's serialization result as the
>> output format for Kafka?
>> 3. Can downstream consumers, written in Java but not Flink components,
>> also easily deserialize Flink-serialized POJOs? Or do they have a
>> dependency on at least the full flink-core?
>> 4. Do you have benchmarks comparing Flink (de-)serialization performance
>> to e.g. Kryo and Avro?
>>
>> The only reason I can come up with for not using Flink serialization is
>> that we wouldn't have a schema registry. But in our case, we share all our
>> POJOs in a jar which is used by all components, so that is kind of a schema
>> registry already, and if we only make Avro-compatible changes, which are
>> also well handled by Flink, that shouldn't be any limitation compared to,
>> say, Avro plus a registry?
>>
>> Best regards
>> Theo
>>
>> [1] https://github.com/eishay/jvm-serializers/wiki
>>
>


Re: How to change the flink web-ui jobServer?

2020-03-10 Thread Arvid Heise
Hi LakeShen,

you can change the port with

conf.setInteger(RestOptions.PORT, 8082);

or, if you want to be on the safe side, specify a range:

conf.setString(RestOptions.BIND_PORT, "8081-8099");
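
For local testing, a minimal sketch of how these options can be passed when
creating the environment (assuming flink-runtime-web is on the classpath; the
class and job names below are made up):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUiExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);               // fixed REST / web UI port
        // conf.setString(RestOptions.BIND_PORT, "8081-8099"); // or allow a port range

        // Starts the web UI for a local environment; requires flink-runtime-web.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        env.fromElements(1, 2, 3).print();
        env.execute("local-web-ui-example");
    }
}

On a real cluster, the same options can be set via rest.port / rest.bind-port
in flink-conf.yaml instead.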


On Mon, Mar 9, 2020 at 10:47 AM LakeShen  wrote:

> Hi community,
>    I am now moving the Flink job to k8s, and I plan to use an ingress
> to expose the Flink web UI. The problem is that the Flink job server isn't
> correct, so I want to change the Flink web UI job server, but I can't find
> any method to change it. Is there some method to do that?
>    Thanks for your reply.
>
> Best wishes,
> LakeShen
>


Re: Question on the SQL "GROUPING SETS" and "CUBE" syntax usability

2020-03-10 Thread Arvid Heise
Hi Weike,

according to the linked documentation, the operations are ready, but, as you
have mentioned, only for SQL batch mode. That is not surprising, as they
don't have well-behaved semantics on streams. See also Calcite's
explanation [1].

Could you maybe outline your use case and what you'd expect these
operations to do? Would you like to combine them with windows?

I'm CCing Jark, as he knows SQL much better than me.

[1] https://calcite.apache.org/docs/stream.html#grouping-sets

On Mon, Mar 9, 2020 at 8:27 AM DONG, Weike  wrote:

> Hi,
>
> From the Flink 1.10  official document (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html),
> we could see that GROUPING SETS is only supported in Batch mode.
>
> However, we also found that in
> https://issues.apache.org/jira/browse/FLINK-12192, stream queries using
> GROUPING SETS and CUBE are already a fixed issue in 1.9.
>
> Here I would like to know if SQL support for GROUPING SETS, ROLLUP, CUBE
> is ready for use or not, and whether the document needs to be updated or
> not.
>
> Thank you
>
> Best regards,
> Weike
>


Re: How to print the aggregated state everytime it is updated?

2020-03-10 Thread Arvid Heise
Hi Kant,

if you only want to output every second, you probably want to use a
ProcessFunction with timers [1].

Basically, this function holds the states and manages the updates to it.
The updates should also be stored in a local/non-state variable *changes*.
Whenever the timer triggers, you would output *changes* (possibly to a side
output) and reset it.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timers
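
One way this pattern could look (a minimal sketch that keeps the pending
changes in keyed state rather than a plain field, and assumes a String key
and a hypothetical MyEvent input type):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Collects updates per key and emits the accumulated changes once per second.
public class PeriodicEmitter extends KeyedProcessFunction<String, MyEvent, Set<String>> {

    private transient ValueState<Set<String>> changes;   // updates since the last emission
    private transient ValueState<Boolean> timerSet;      // whether a timer is already registered

    @Override
    public void open(Configuration parameters) {
        changes = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "changes", TypeInformation.of(new TypeHint<Set<String>>() {})));
        timerSet = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timerSet", Types.BOOLEAN));
    }

    @Override
    public void processElement(MyEvent event, Context ctx, Collector<Set<String>> out)
            throws Exception {
        Set<String> pending = changes.value();
        if (pending == null) {
            pending = new HashSet<>();
        }
        pending.add(event.toString());   // the custom aggregation would go here
        changes.update(pending);

        if (timerSet.value() == null) {
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 1000);
            timerSet.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Set<String>> out)
            throws Exception {
        Set<String> pending = changes.value();
        if (pending != null && !pending.isEmpty()) {
            out.collect(pending);        // emit everything accumulated in the last second
            changes.clear();
        }
        // re-register so the output keeps coming once per second
        ctx.timerService().registerProcessingTimeTimer(timestamp + 1000);
    }
}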

On Fri, Mar 6, 2020 at 4:39 PM Robert Metzger  wrote:

> Hey,
>
> I don't think you need to use a window operator for this use case. A
> reduce (or fold) operation should be enough:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>
>
> On Fri, Mar 6, 2020 at 11:50 AM kant kodali  wrote:
>
>> Hi,
>>
>> Thanks for this. So how can I emulate an infinite window while outputting
>> every second? Simply put, I want to store the state forever (say years), and
>> since RocksDB is my state backend I am assuming I can keep the state until
>> I run out of disk. However, I want to see all the updates to the state
>> every second. It sounds to me like I need to have a window of one second,
>> compute for that window, and pass it on to the next window, or is there
>> some other way?
>>
>> Thanks
>>
>> On Fri, Mar 6, 2020 at 1:33 AM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>>
>>> From the description, you use a window operator and have set event time.
>>> Then you should call `DataStream.assignTimestampsAndWatermarks` to set
>>> the timestamp and watermark.
>>> The window is triggered when the watermark exceeds the window end time.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> kant kodali  于2020年3月4日周三 上午5:11写道:
>>>
 Hi All,

 I have a custom aggregated state that is represented by a Set, and I
 have a stream of values coming in from Kafka which I inspect, compute the
 custom aggregation on, and store in the Set. Now, I am trying to figure out
 how to print the updated value every time this state is updated?

 Imagine I have a Datastream>

 I tried a few things already but keep running into the following
 exception. Not sure why? Do I need to call assignTimestampsAndWatermarks? I
 thought watermarks are not mandatory in Flink, especially when I want to
 keep this aggregated state forever. Any simple code sample on how to print
 the streaming aggregated state represented by Datastream> would be
 great! You can imagine my Set has a toString() method that takes
 care of printing, and I just want to see those values in stdout.

 Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
 timestamp (= no timestamp marker). Is the time characteristic set to
 'ProcessingTime', or did you forget to call
 'DataStream.assignTimestampsAndWatermarks(...)'?

>>>


Re: How do I get the value of 99th latency inside an operator?

2020-03-10 Thread Arvid Heise
Hi Felipe,

could you use the JMX metrics reporter and tap into the reported values?

The proposed hacks are obviously unstable over time.
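
If the JMX reporter is enabled, a rough sketch of reading the reported latency
values from inside the same JVM could look like this (the ObjectName filter and
the assumption that the reporter registers on the platform MBean server are
educated guesses; inspect the registered names, e.g. with JConsole, to find the
exact gauge for your operator):

import java.lang.management.ManagementFactory;
import java.util.Set;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public final class LatencyMetricDump {

    // Call this from operator code running in the TaskManager JVM.
    public static void dumpLatencyMetrics() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumed pattern: Flink's JMX reporter uses domains starting with "org.apache.flink".
        Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flink*:*"), null);
        for (ObjectName name : names) {
            if (!name.toString().contains("latency")) {
                continue;
            }
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                if (attr.isReadable()) {
                    System.out.println(name + " / " + attr.getName() + " = "
                            + server.getAttribute(name, attr.getName()));
                }
            }
        }
    }
}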

On Fri, Mar 6, 2020 at 1:06 PM Aljoscha Krettek  wrote:

> Hi,
>
> I'm afraid you're correct, this is currently not exposed and you would
> have to hack around some things and/or use reflection.
>
> AbstractStreamOperator has a field latencyStats [1], which is what holds
> the metrics. This is being updated from method
> reportOrUpdateLatencyMarker [2].
>
> I hope that helps somewhat.
>
> Best,
> Aljoscha
>
> [1]
>
> https://github.com/apache/flink/blob/ab642cb616fb909893e2c650b0b4c2aa10407e6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L154
> [2]
>
> https://github.com/apache/flink/blob/ab642cb616fb909893e2c650b0b4c2aa10407e6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L702
>
> On 05.03.20 18:17, Felipe Gutierrez wrote:
> > Hi community,
> >
> > where in the Flink code can I get the value of the 99th percentile latency
> >
> (flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{operator_id="93352199ce18d8917f20fdf82cedb1b4",quantile="0.99"})?
> >
> > Probably I will have to hack the Flink source code to export those values
> > to my own operator. Nevertheless, it is what I need.
> >
> > Kind Regards,
> > Felipe
> >
> > *--*
> > *-- Felipe Gutierrez*
> >
> > *-- skype: felipe.o.gutierrez*
> > *--* *https://felipeogutierrez.blogspot.com
> > *
> >
>


Re: Questions about Flink memory types

2020-03-10 Thread zhisheng
OK, that's clear now. Thanks!

Xintong Song  wrote on Tue, Mar 10, 2020 at 12:43 PM:

> Hi Zhisheng,
>
> First of all, the official Flink memory configuration documentation [1]
> explicitly states that it is not recommended to configure any two or more of
> process.size, flink.size, and task.heap.size + managed.size at the same time.
>
> > Explicitly configuring both *total process memory* and *total Flink
> > memory* is not recommended. It may lead to deployment failures due to
> > potential memory configuration conflicts. Additional configuration of
> > other
> > memory components also requires caution as it can produce further
> > configuration conflicts.
>
>
> To understand why this recommendation exists, you first need to understand
> that Flink's memory configuration distinguishes between strict and non-strict
> options.
>
>- All size/min/max options are strictly guaranteed: whether explicitly
>configured by the user or left at their defaults, they are guaranteed to take
>effect, or an error is reported in case of a conflict.
>- All fraction options are non-strict: there is no guarantee that the final
>computed memory size satisfies the configured fraction. For example, if the
>network memory computed from the fraction exceeds the max, the max value is
>used and no error is reported.
>
> Of all the memory types, most currently have a strict default size or a
> strict default range (min/max). The only ones without strict defaults are:
>
>- Total memory: neither process.size nor flink.size has a default value
>- Task Heap: task.heap.size has no default value
>- Managed: managed.size has no default value, and managed.fraction is non-strict
>
>
> If every memory component is configured with a strict size or range, then the
> size or range of the total memory is already determined; additionally
> specifying the total memory size on top of that can then lead to conflicts.
> This is why users are advised not to configure the options above at the same time.
>
> As for the task.heap.fraction you mentioned: once the total flink.size and the
> fractions for the Managed and Network parts are specified, the heap size is
> already determined (because all other parts have strictly determined sizes), so
> additionally specifying a fraction for the heap adds little value. Likewise,
> specifying process.size only additionally takes the JVM Overhead fraction into
> account. In fact, what you want from task.heap.fraction can be achieved entirely
> by adjusting network.fraction and managed.fraction: if Network and Managed use
> less memory, the part of the total memory that is freed up is naturally left
> for the heap.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html
>
> On Tue, Mar 10, 2020 at 11:40 AM zhisheng  wrote:
>
> > hi, xintong
> >
> > I just tested four scenarios on YARN. In the first, I ran the job directly
> > with the default memory configuration unchanged (Per-Job mode; all scenarios
> > below use this mode). The job started and ran normally, with the memory sizes
> > decided by Flink itself.
> >
> > In the second scenario, I set taskmanager.memory.managed.size
> > and taskmanager.memory.task.heap.size in the configuration file to 1024m and
> > 256m respectively. The job also started and ran normally; screenshot below:
> >
> > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-10-031628.png
> >
> > In the third scenario, I specified the TM's total memory at startup via the
> > parameter (-ytm 4096m), without changing the memory settings in the
> > configuration file. This also ran normally, as shown in the screenshot below;
> > these memory sizes should be the ones derived from the calculation formulas.
> >
> > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-10-031745.png
> >
> > In the fourth scenario, I specified the TM's total memory via (-ytm 4096m) at
> > startup and also set taskmanager.memory.managed.size and taskmanager.memory.task.heap.size
> > myself. Starting the job then failed with an exception:
> >
> > Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> > Derived JVM Overhead size (2.236gb (2400539987 bytes)) is not in
> configured
> > JVM Overhead range [192.000mb (201326592 bytes), 1024.000mb (1073741824
> > bytes)]
> >
> > http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-10-032045.png
> >
> > Personally, I suspect the cause is that, because I specified taskmanager.memory.managed.size
> > and taskmanager.memory.task.heap.size, Flink subtracted the specified sizes from
> > 4096m and distributed the remainder to the other memory components, which ended up
> > exceeding the default maximum of the Overhead, so the check threw an exception.
> > Is that how it works?
> >
> > That raises a question: in production it is hard to know the runtime state of
> > all jobs, so on our side we always specify the TM memory via the parameter
> > (-ytm 4096m), or use the default if nothing is specified. If, as in scenario
> > four above, we both change the settings in the configuration file and want to
> > control the TM's total memory via the -ytm parameter, does that create a conflict?
> >
> > For this case, I wonder whether an additional parameter could be added whose
> > role is a fraction for the heap memory, so that I only need to distribute the
> > total memory according to the fraction I configure, since managed, network,
> > and overhead all have such a fraction parameter.
> >
> > eg:
> > taskmanager.memory.task.heap.fraction
> >
> > Not sure whether this suggestion is appropriate. Thanks!
> >
> > Xintong Song  wrote on Tue, Mar 10, 2020 at 10:39 AM:
> >
> > > >
> > > >
> > > > Here, "the configured direct memory" refers to the network memory size
> > > > computed from task.manager.network.memory.fraction. I wonder whether this part
> > > > of the memory is fully pre-allocated as memory segments, so the metrics show
> > > > all of it as used?
> > >
> > >
> > > Yes, the Network Buffer Pool is pre-allocated when the TM is initialized. Of
> > > all memory types, only Network Memory is pre-allocated. Before 1.9, Managed
> > > Memory had a configuration option for pre-allocation (off by default); since
> > > 1.10, pre-allocation is no longer supported.
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Mar 10, 2020 at 10:30 AM pkuvisdudu 
> > wrote:
> > >
> > > > Thanks for the explanation.
> > > > Regarding "the second point: I see that directMemoryUsed in the metrics is
> > > > always the same as the configured direct memory size, and I don't know the
> > > > exact reason":
> > > > here, "the configured direct memory" refers to the network memory size
> > > > computed from task.manager.network.memory.fraction. I wonder whether this part
> > > > of the memory is fully pre-allocated as memory segments, so the metrics show
> > > > all of it as used?
> > > >
> > > >
> > > >
> > > >
> > > > 张江
> > > > Email: zjkingdom2...@163.com
> > > >
> > > > On 2020-03-10 10:16, zhisheng wrote:
> > > > hi xintong, thanks for the patient and professional answer
> > > >
> > > > Xintong Song  wrote on Tue, Mar 10, 2020 at 10:04 AM:
> > > >
> > > > > Hi Zhisheng,
> > > > >
> > > > > 1. Is the Non-Heap value shown on the UI the sum of Metaspace and Overhead?
> > > > >
> > > > >
> > > > > In terms of what it physically covers, the memory described by Non-Heap is
> > > > > included in Metaspace + Overhead.
> > > > >
> > > > >
> > > > > > 2. For a locally started standalone Flink, why does the Heap shown on the
> > > > > > UI exceed the configured taskmanager.memory.process.size?
> > > > >
> > > > >
> > > > > This is mainly because we only set a JVM parameter for Metaspace; no JVM
> > > > > parameters are set for the rest of the overhead, and not all of the overhead
> > > > > can be controlled by parameters (stack space, for example).
> > > > >
> > > > > Non-Heap Max is determined by the JVM itself, so it is usually larger than
> > > > > the Metaspace + Overhead configured by Flink. Think of it this way: Flink
> > > > > divides the whole TM memory budget among different purposes, but it cannot
> > > > > strictly guarantee that every part stays within its budget; it is best
> > > > > effort. Among them, Managed, Network, and Metaspace are strictly limited;
> > > > > Off-Heap and Overhead cannot be fully strictly limited; the Heap as a whole
> > > > > is strictly limited, but the split between Task and Framework is non-strict.
> > > > >
> > > > > As for the Non-Heap metric