Re: numRecordsOutPerSecond metric and side outputs

2021-01-04 Thread Arvid Heise
Hi Alexey,

side outputs should be counted in numRecordsOutPerSecond. However, there is
a bug where this does not happen for side outputs in the middle of a
chain [1].

[1] https://issues.apache.org/jira/browse/FLINK-18808

On Tue, Dec 22, 2020 at 1:14 AM Alexey Trenikhun  wrote:

> Hello,
> Does the numRecordsOutPerSecond metric take into account the number of records
> sent to side outputs, or does it provide the rate only for the main output?
>
> Thanks,
> Alexey
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Replace (xx,'#','') has error

2021-01-04 Thread Arvid Heise
Hi,

Without seeing the error and an example, it's hard to help.

Are you sure that xx is a string? You may need to convert it beforehand
with

CAST(xx AS VARCHAR)
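
For example, a minimal sketch (assuming a hypothetical table t whose column xx is not a string type):

    SELECT REPLACE(CAST(xx AS VARCHAR), '#', '') AS xx_cleaned FROM t;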


On Tue, Jan 5, 2021 at 3:12 AM  wrote:

> What can I do?
>
> Sent from my iPhone
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Reply: flink 1.12: memory not released after Cancel Job (question)

2021-01-04 Thread Yun Tang
Hi 徐州州,

Please check the overview part of the checkpoint UI and see whether the "restored" section is empty, i.e. whether nothing was restored from a checkpoint; you can also check the job
manager logs to see whether the job resumed from a checkpoint.
If the job did not recover from a checkpoint/savepoint, it is effectively re-run from scratch; unless the job accesses some other external system, no historical data should be visible at all.

Best,
Yun Tang

From: 徐州州 <25977...@qq.com>
Sent: Tuesday, January 5, 2021 10:34
To: user-zh@flink.apache.org 
Subject: Reply: flink 1.12: memory not released after Cancel Job (question)

This is my complete configuration file; no state backend or savepoint is configured. The job is killed with /opt/module/hadoop3.2.1/bin/yarn
application -kill  jobid, and started with /opt/module/flink1.12/bin/flink run -d -m
yarn-cluster -yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm
App_Bs_Drainage_Launch_200105. I wonder whether this could be a queue issue, since my cluster has only one queue.

-- Original message --
From: "user-zh" ;
Sent: Tuesday, January 5, 2021 10:03 AM
To: "user-zh@flink.apache.org";
Subject: Reply: flink 1.12: memory not released after Cancel Job (question)

This seems related to checkpoints, savepoints and the state backend; it is worth checking. If a checkpoint/savepoint and a state backend are configured, then accumulating on top of yesterday's results after a restart is the correct behaviour; it is just that the state saved when you killed the running job yesterday may not match your actual final results.


| |
刘海
|
|
liuha...@163.com
|
Signature customized by NetEase Mail Master
On Jan 5, 2021 09:04, 徐州州 <25977...@qq.com> wrote:
I have a flink-sql job whose result each day accumulates on top of the previous day's result. I submit it as a code jar and set MemoryStateBackend in the code. A scheduled task kills the running job at 23:57, and at 00:30 the next day an Azkaban script resubmits it. Yet the result computed at 00:30 still accumulates on top of yesterday's, even though I can see that the NodeManager memory was released after the job was killed. Why does the job restarted at 00:30 still accumulate on yesterday's result, and not even on the full previous result (for example, yesterday's result was 1000, but today it may start accumulating from 900)? Why does this happen? I have tried many things with no success. |insert
 into app_bs_drainage_place
|SELECT
| do.GrouporgName,
| du.Name,
| COUNT(DISTINCT dooi.Code) AS TotalSingular,
|md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND 
dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON 
dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name 
IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND 
left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status < 60 AND dooi.Status 
< 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




-- Original message --
From: "赵一旦"

Re: pause and resume flink stream job based on certain condition

2021-01-04 Thread Eleanore Jin
Hi Robert,

sorry for the late reply, I just did a quick test, and this seems to work:
1. while the thread is blocked, checkpoints may expire, but once the thread is
unblocked, checkpointing continues
2. this guarantees the message ordering

Thanks a lot!
Eleanore

On Tue, Dec 15, 2020 at 10:42 PM Robert Metzger  wrote:

> What you can also do is rely on Flink's backpressure mechanism: If the map
> operator that validates the messages detects that the external system is
> down, it blocks until the system is up again.
> This effectively causes the whole streaming job to pause: the Kafka source
> won't read new messages.
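
A minimal sketch of that idea (the health check, retry interval and Message type are placeholders, not part of Robert's suggestion):

    // Blocking inside map() backpressures the whole pipeline, so the Kafka
    // source stops reading new messages until the external system recovers.
    public class ValidatingMap extends RichMapFunction<Message, Message> {
        @Override
        public Message map(Message msg) throws Exception {
            while (!externalSystemIsAvailable()) {   // hypothetical health check
                Thread.sleep(5_000);                 // retry every 5 seconds
            }
            return validate(msg);                    // hypothetical validation call
        }
    }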
>
> On Tue, Dec 15, 2020 at 3:07 AM Eleanore Jin 
> wrote:
>
>> Hi Guowei and Arvid,
>>
>> Thanks for the suggestion. I wonder if it makes sense and possible that
>> the operator will produce a side output message telling the source to
>> 'pause', and the same side output as the side input to the source, based on
>> which, the source would pause and resume?
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Sun, Nov 29, 2020 at 11:33 PM Arvid Heise  wrote:
>>
>>> Hi Eleanore,
>>>
>>> if the external system is down, you could simply fail the job after a
>>> given timeout (for example, using asyncIO). Then the job would restart
>>> using the restarting policies.
>>>
>>> If your state is rather small (and thus recovery time okay), you would
>>> pretty much get your desired behavior. The job would stop to make progress
>>> until eventually the external system is responding again.
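
As a rough sketch of that asyncIO variant (types and the async client are placeholders): if a request is not completed within the timeout, the default AsyncFunction timeout handling fails the job, which then restarts according to the configured restart strategy.

    DataStream<Validated> validated = AsyncDataStream.unorderedWait(
        messages,              // input stream of messages to validate (placeholder)
        new AsyncValidator(),  // an AsyncFunction<Message, Validated> querying the external system
        30, TimeUnit.SECONDS,  // timeout after which the default handler fails the job
        100);                  // max number of in-flight requests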
>>>
>>> On Mon, Nov 30, 2020 at 7:39 AM Guowei Ma  wrote:
>>>
 Hi, Eleanore

 1. AFAIK I think only the job could "pause" itself.  For example the
 "query" external system could pause when the external system is down.
 2. Maybe you could try the "iterate" and send the failed message back
 to retry if you use the DataStream api.

 Best,
 Guowei


 On Mon, Nov 30, 2020 at 1:01 PM Eleanore Jin 
 wrote:

> Hi experts,
>
> Here is my use case, it's a flink stateless streaming job for message
> validation.
> 1. read from a kafka topic
> 2. perform validation of message, which requires query external system
>2a. the metadata from the external system will be cached in
> memory for 15minutes
>2b. there is another stream that will send updates to update
> the cache if metadata changed within 15 minutes
> 3. if message is valid, publish to valid topic
> 4. if message is invalid, publish to error topic
> 5. if the external system is down, the message is marked as invalid
> with different error code, and published to the same error topic.
>
> Ask:
> For those messages that failed due to external system failures, it
> requires manual replay of those messages.
>
> Is there a way to pause the job if there is an external system
> failure, and resume once the external system is online?
>
> Or are there any other suggestions to allow auto retry such error?
>
> Thanks a lot!
> Eleanore
>

>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> 
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward  - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>


FlinkSQL 1.10.0: a filter on a reserved-keyword column does not work with the = operator

2021-01-04 Thread Robin Zhang


The test code is as follows:

create view sink_test as
select
id
,type
,student_id
,kefu_id
,action_time
,action_user
,distribute_status
,unbind_type
,`comment`
,time_created
,pull_from
from distribute_new_log
where `comment` ='娃娃鱼';

print table sink_test;

When the keyword column is used in the filter, the matching rows are returned, but the value of the keyword column `comment` is output as Unicode escapes (\u5B9A\u5411\u5206\u914DTMK\u5C0F\u7EC4) rather than Chinese characters. String comparison with = should normally be supported in SQL. Using `comment`
like '娃娃鱼' instead works fine, and the data written to the result table displays correctly. Is this a limitation of SQL in version 1.10?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Unsubscribe

2021-01-04 Thread fuzs
Unsubscribe

Re: Flink SQL> querying a Hive table returns all-NULL data

2021-01-04 Thread Rui Li
Hello,

Which Flink and Hive versions are you using? Has the schema of the ORC table ever evolved?

On Mon, Jan 4, 2021 at 9:19 AM Jacob <17691150...@163.com> wrote:

> Dear All,
>
> Flink SQL>select * from table1;
>
>
> When querying the Hive data from the Flink SQL client, every field comes back as NULL, although the number of rows is correct and select
> count returns the right value; only the actual column values are NULL. The same query returns data in the Hive client.
>
>
> The Hive table was loaded from ORC files.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


Reply: apache flink

2021-01-04 Thread Waldeinsamkeit.
??




-- Original message --
From:
   "user-zh"

<573693...@qq.com>;
Sent: Tuesday, January 5, 2021 11:19 AM
To: "user-zh"

A question about the Flink REST API metrics endpoints

2021-01-04 Thread 赵一旦
For vertex-level monitoring there is one endpoint that lists all metric IDs, and another that returns values for metric IDs passed as a comma-separated GET parameter.

The problem is that my collection script exceeds the URL length limit when fetching values via GET, so I fetch the metrics five at a time. With a 30-second collection interval this results in over a hundred requests per round, taking tens of seconds.

Could a POST endpoint be added, or could the endpoint that lists all metric IDs simply return all of their values as well?


Re: apache flink

2021-01-04 Thread kcz
As I understand it, Flink is a job execution engine; what you seem to need is a job scheduler, such as Airflow.





-- Original message --
From: Waldeinsamkeit. <1214316...@qq.com>
Date: Tue,Jan 5,2021 11:13 AM
To: user-zh 

apache flink

2021-01-04 Thread Waldeinsamkeit.
??   
??flinkflink??
   ??

Reply: flink 1.12: memory not released after Cancel Job (question)

2021-01-04 Thread 徐州州
This is my complete configuration file; no state backend or savepoint is configured. The job is killed with /opt/module/hadoop3.2.1/bin/yarn
application -kill
jobid, and started with /opt/module/flink1.12/bin/flink run -d -m yarn-cluster
-yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm
App_Bs_Drainage_Launch_200105. I wonder whether this could be a queue issue, since my cluster has only one queue.


-- Original message --
From:
   "user-zh"



Replace (xx,'#','') has error

2021-01-04 Thread abc15606
What can I do?

Sent from my iPhone


Reply: flink 1.12: memory not released after Cancel Job (question)

2021-01-04 Thread 刘海
This seems related to checkpoints, savepoints and the state backend; it is worth checking. If a checkpoint/savepoint and a state backend are configured, then accumulating on top of yesterday's results after a restart is the correct behaviour; it is just that the state saved when the running job was killed yesterday may not match your actual final results.


| |
刘海
|
|
liuha...@163.com
|
Signature customized by NetEase Mail Master
On Jan 5, 2021 09:04, 徐州州 <25977...@qq.com> wrote:
I have a flink-sql job whose result each day accumulates on top of the previous day's result. I submit it as a code jar and set MemoryStateBackend in the code. A scheduled task kills the running job at 23:57, and at 00:30 the next day an Azkaban script resubmits it. Yet the result computed at 00:30 still accumulates on top of yesterday's, even though the NodeManager memory was released after the job was killed. Why does the job restarted at 00:30 still accumulate on yesterday's result, and not even on the full previous result (for example, yesterday's result was 1000, but today it may start accumulating from 900)? I have tried many things with no success. |insert
 into app_bs_drainage_place
|SELECT
| do.GrouporgName,
| du.Name,
| COUNT(DISTINCT dooi.Code) AS TotalSingular,
|md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND 
dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON 
dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name 
IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND 
left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status < 60 AND dooi.Status 
< 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




-- Original message --
From: "赵一旦"

Re: pyflink-udaf

2021-01-04 Thread Xingbo Huang
Hi,

I don't see the attachment you mentioned. Regarding the error, I already replied to your previous email: please check whether your weighted_avg
function has been registered (you can register it via create_temporary_system_function or register_function, after which it can be used by name in string expressions). If you use the DSL style directly (as in the documentation example), no registration is needed.
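
As a minimal sketch of a registered vectorized (Pandas) UDAF in PyFlink 1.12 (t_env, the table and the column names are placeholders):

    from pyflink.table import DataTypes
    from pyflink.table.udf import udaf

    # vectorized aggregate: inputs arrive as pandas.Series, the result is a scalar
    @udaf(result_type=DataTypes.DOUBLE(), func_type="pandas")
    def weighted_avg(v, w):
        return (v * w).sum() / w.sum()

    # register it so that it can be referenced by name in SQL / string expressions
    t_env.create_temporary_system_function("weighted_avg", weighted_avg)
    t_env.execute_sql(
        "SELECT k, weighted_avg(v, w) FROM my_table GROUP BY k").print()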

Best,
Xingbo

hepeitan  wrote on Mon, Jan 4, 2021 at 8:48 PM:

> Hello:
>   Our project team plans to use PyFlink's udaf to aggregate data. From the community articles we learned that for batch or windowed aggregation, Vectorized
> Aggregate Functions
> 
> are recommended. However, the code provided for this case is incomplete and not a full example;
> our own test program fails with: "org.apache.flink.table.api.ValidationException: Undefined
> function: weighted_avg". The test code is attached.
>   Could you please provide a complete example of aggregating data with a PyFlink udaf? Many thanks!
>
>
>
>


Comparing Flink vs Materialize

2021-01-04 Thread Dan Hill
Has anyone compared Flink with Materialize? A friend recommended that I switch
to Materialize.

In one of their blog posts, it says that Flink splits operators across CPUs
(instead of splitting partitions across CPUs).  Is this true?  Is it
configurable?

https://materialize.com/blog-rocksdb/


Kafka SQL Connector Behavior (v1.11)

2021-01-04 Thread Aeden Jameson
Based on these docs,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html,
the default partitioning behavior is not quite clear to me.
If no value for 'sink.partitioner' is given, is the default behavior
just that of the native Kafka library (with a key, murmur2; without a
key, round robin)?
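
For reference, the partitioner can also be set explicitly in the DDL; a minimal sketch (topic, schema and brokers are placeholders):

    CREATE TABLE out_topic (
      user_id STRING,
      cnt BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'out-topic',
      'properties.bootstrap.servers' = 'broker:9092',
      'format' = 'json',
      'sink.partitioner' = 'round-robin'  -- or 'fixed', or a custom partitioner class
    );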

Thank you,
Aeden


Re: Flink sink never executes

2021-01-04 Thread Ben Beasley
Yes, it did. Thanks for checking, Kostas. Also, thanks again for helping me 
with the other issue. What a great community Flink has.

From: Kostas Kloudas 
Date: Monday, January 4, 2021 at 6:21 AM
To: Ben Beasley 
Cc: user@flink.apache.org 
Subject: Re: Flink sink never executes
Hi Ben,

Sorry for the late reply but I guess that your question was answered
in StackOverflow, right?
Did that answer solve your problem?

Cheers,
Kostas

On Mon, Dec 21, 2020 at 9:09 AM Ben Beasley  wrote:
>
> First off I want to thank the folks in this email list for their help thus 
> far.
>
>
>
> I’m facing another strange issue where if I add a window to my stream, the 
> sink no longer executes. However the sink executes without the windowing. I 
> described my problem on stackoverflow so that the code is easy to read.
>
>
>
> I wonder if anyone can help me once more, I believe the solution could be 
> simple for someone familiar with the code. I believe I’ve followed the 
> tutorials and articles on the flink website correctly.


Batch with Flink Steraming API version 1.12.0

2021-01-04 Thread Robert Cullen
I have a Kafka source that I would like to run a batch job on.  Since
Version 1.12.0 is now soft deprecating the DataSet API in favor of the
DataStream API, can someone show me an example of this? (Using DataStream)
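
For what it's worth, a rough sketch of one way to do this with the unified KafkaSource and batch execution mode introduced in 1.12 (topic, brokers and the transformation are placeholders, and the exact builder method names may differ slightly between 1.12.x and later releases):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);      // batch semantics on the DataStream API

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("batch-job")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())          // makes the source finite
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka")
        .map(String::toUpperCase)                         // placeholder transformation
        .print();
    env.execute();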

thanks
-- 
Robert Cullen
240-475-4490


Fwd: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-04 Thread Dongwon Kim
Any advice would be appreciated :)

Thanks,

Dongwon

-- Forwarded message -
From: Dongwon Kim 
Date: Mon, Dec 14, 2020 at 11:27 PM
Subject: How to gracefully avoid "Generic types have been disabled in the
ExecutionConfig and type java.util.List is treated as a generic type"?
To: user 


Hi,

The following program compiles and runs w/o exceptions:

> public class Test {
>
>   public static class A {
> private int n;
>
> public A() { }
> public int getN() {  return n;  }
> public void setN(int n) {  this.n = n;  }
>   }
>
>   public static class B {
> private List<A> lst;
>
> public B() { }
> public List<A> getLst() {  return lst;  }
> public void setLst(List<A> lst) {  this.lst = lst;  }
>   }
>
>   public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
>
> env.fromElements(new B())
>   .print();
>
> env.execute();
>   }
> }
>

When I add the following line,

> env.getConfig().disableGenericTypes();

then the program shows me an exception:

> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and type
> java.util.List is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
> at Test.main(Test.java:29)


To avoid this exception, I found that I have to declare a type factory like:

>   public static class BTypeFactory extends TypeInfoFactory<B> {
> @Override
> public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
>   return Types.POJO(
> B.class,
> ImmutableMap.<String, TypeInformation<?>>builder()
>   .put("lst", Types.LIST(Types.POJO(A.class)))
> .build()
>   );
> }
>   }

and give it to class B as follows:

>   @TypeInfo(BTypeFactory.class)
>   public static class B {


Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, once for the member
variable and once for the map entry in the TypeInfoFactory.
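
One possible workaround, sketched below, is to build the same type information inline and hand it to fromCollection, which avoids the separate TypeInfoFactory class (though the field name still has to be spelled out once in the map):

    // no @TypeInfo annotation on B is needed; the type is supplied at the source
    TypeInformation<B> bTypeInfo = Types.POJO(
        B.class,
        ImmutableMap.<String, TypeInformation<?>>builder()
            .put("lst", Types.LIST(Types.POJO(A.class)))
            .build());

    env.fromCollection(Collections.singletonList(new B()), bTypeInfo)
        .print();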

Thanks in advance,

Dongwon


Re: Flink Stateful Function: The program's entry point class not found in the jar file

2021-01-04 Thread Igal Shilman
Hi Le,

Looking at your pom.xml, you are pointing to the wrong main method here:
https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/pom.xml#L161

You need to change it to your Example class, this should work.


On Tue, Dec 29, 2020 at 5:06 AM Le Xu  wrote:

> Hi Igal:
>
> Thanks for pointing that out. I was able to add the dependency in and
> submit the job. For statefun-greeter example [1] I was able to submit the
> job to the cluster . But when I try out the statefun-data-stream example
> [2] I got the complaints saying that "There are no ingress defined" (I'm
> adding the full trace to the end of the email). (I'm also adding my pom to
> [3] and source file to [4].) From the example it appears the job uses the
> Datastream as ingress [5] so ideally the job should be able to receive
> events as trigger periodically (the job works fine with local environment).
> I also came across this issue [6] but I don't think it helps solving my
> problem. By any chance you'd know what's going on?
>
>
> [1]
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
> [2]
> https://github.com/flint-stone/flink-statefun/blob/3aba506242300d69e95ef339afeac8e561dc7a2d/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L82
> [3]
> https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/pom.xml
> [4]
> https://github.com/flint-stone/flink-statefun/blob/lx-base/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
> [5]
> https://github.com/flint-stone/flink-statefun/blob/3aba506242300d69e95ef339afeac8e561dc7a2d/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L82
> [6]
> https://stackoverflow.com/questions/61578082/flink-statefun-co-located-functions-communication
>
> Error Trace:
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: There are no ingress defined.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.lang.IllegalStateException: There are no ingress defined.
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:76)
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:52)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 8 more
>
>
> Thanks for the help!
>
> Le
>
>
> On Mon, Dec 28, 2020 at 6:24 AM Igal Shilman  wrote:
>
>> Hi Le,
>> Indeed you have added the dependency correctly. But the resulting
>> artifact doesn't contain the dependencies. You need to create a jar with
>> dependencies ( via [1] or [2])
>> Take a look at [3] for a usage example of the maven shade plugin.
>>
>> I hope this helps,
>> Igal.
>>
>> [1] https://maven.apache.org/plugins/maven-assembly-plugin/usage.html
>> [2] https://maven.apache.org/plugins/maven-shade-plugin/
>> [3]
>> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-distribution/pom.xml#L126,L199
>>
>> On Sat, Dec 26, 2020 at 11:52 PM Le Xu  wrote:
>>
>>> Thanks Igal! I might be missing something here. I did place
>>> statefun-flink-distribution as part of my dependency in the pom (see
>>> line 46 at [1]).  Is there a correct way to include the jar? I'm having the
>>> same problem across many examples I'm running.
>>>
>>> [1]
>>> https://gist.github.com/flint-stone/059f00832d8b99af433a446771f4f740#file-pom-xml-L64
>>>
>>> Thanks!
>>>
>>> Le
>>>
>>> On Sat, Dec 26, 2020 at 2:23 PM 

Re: Visualizing Flink Statefun's "Logical Co-location, Physical Separation" Properties

2021-01-04 Thread Igal Shilman
Hi Le,
Let me try to answer to your multiple questions, one by one:


> I'm trying to understand the internal mechanism used by Flink Statefun to
> dispatch functions to Flink cluster. In particular, I was trying to find a
> good example demonstrating Statefun's "Logical Co-location, Physical
> Separation" properties (as pointed out by [1]).
>

I'm not sure that I understand what dispatching functions to the Flink cluster
means here, but I will try to give you a general description of how things
work with StateFun; please follow up
with any clarifications :-)

In general, StateFun is a very specific Flink streaming job, and as such it
will be running on a Flink cluster. Now, a remote function is a function
that runs in a different process
that executes (for now) an HTTP server and runs the StateFun SDK. These
processes can be located at the same machine as the Flink's TaskManagers
and communicate via a unix domain socket, or they can be at a different
machine, or they can even be deployed behind a load balancer, and
autoscaled up and down on demand.
Now, as long as StateFun knows how to translate a function logical address
to an HTTP endpoint that serves it, StateFun can dispatch function calls to
these remote function processes.
By logical co-location, physical separation: a Flink worker that executes
the StateFun job, is responsible for the state and messaging of a specific
key (address) but the function itself can be running at a different
physical process.
A good example of this kind of deployment you can find Gordon's talk [1],
that demonstrates deploying the remote functions on AWS lambda.


My understanding based on the doc was that there are three modes to deploy
> statefun to Flink cluster ranging from remote functions to embedded
> functions (decreasing storage locality and increasing efficiency).
> Therefore, in the scenario of remote functions, functions are deployed with
> its state and message independent from flink processes. And function should
> be able to be executed in any Flink process as if it is a stateless
> application.
>

Remote functions are indeed "effectively stateless" and state is being
provided as part of an invocation request. But the state is managed in a
fault tolerant way by Flink.


> I have tried out a couple of examples from the statefun but judging by
> allocation result the subtask of the job seems to bind statically with each
> task slot in the Flink cluster (I'm assuming the example such as DataStream
> uses embedded function instead?).
>

You are correct, the StateFun job has a fixed topology independent of the
number of functions or function types. Therefore you can have many
different function types and many billions of function instances.
A single FunctionDispatcher operator, will handle transparently the
multiplexing of different function types and instances behind the scenes.

I hope that clarifies a bit.

Igal.


[1] https://www.youtube.com/watch?v=tuSylBadNSo


On Tue, Dec 29, 2020 at 10:58 PM Le Xu  wrote:

> Hello!
>
> I'm trying to understand the internal mechanism used by Flink Statefun to
> dispatch functions to Flink cluster. In particular, I was trying to find a
> good example demonstrating Statefun's "Logical Co-location, Physical
> Separation" properties (as pointed out by [1]).
>
> My understanding based on the doc was that there are three modes to deploy
> statefun to Flink cluster ranging from remote functions to embedded
> functions (decreasing storage locality and increasing efficiency).
> Therefore, in the scenario of remote functions, functions are deployed with
> its state and message independent from flink processes. And function should
> be able to be executed in any Flink process as if it is a stateless
> application. I have tried out a couple of examples from the statefun but
> judging by allocation result the subtask of the job seems to bind
> statically with each task slot in the Flink cluster (I'm assuming the
> example such as DataStream uses embedded function instead?).
>
> I also came across this tutorial [2] demonstrating the usage of remote
> function. The README indicates [3] that "Since the functions are
> stateless, and running in their own container, we can redeploy and rescale
> it independently of the rest of the infrastructure." which seems to
> indicate that the function performs scaling manually by the user that could
> occupy arbitrary resources (e.g., task slots) from the Flink cluster on
> demand. But I wasn't sure how to explicitly specify the amount of
> parallelism for each function dynamically.
> Is there a good example to visualize statefun "physical separation"
> behavior by forcing the same function to be invoked at different task slots
> / machines (either on-demand or automatically)?
>
> Any help will be appreciated!
>
> Thanks!
>
> Le
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/distributed_architecture.html#remote-functions
> [2] 

Re: How to register TableSinks

2021-01-04 Thread Patrick.Eifler
Hey, Thanks Dawid,

One more question: is the StatementSet API supposed to work with the old sink
interface?
I get the following error when I’m using it with the deprecated 
registerTableSink method:

The main method caused an error: requirement failed: operations should not be 
empty

Thanks!

Patrick
--
Patrick Eifler

Senior Software Engineer (BI)

Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC

Wilhelmstraße 118, 10963 Berlin

Germany

E: patrick.eif...@sony.com

From: Dawid Wysakowicz 
Date: Monday, 4. January 2021 at 15:50
To: , 
Subject: Re: How to register TableSinks


Hi Patrick.

Happy New Year to you too ;)

The method you are referring to was deprecated along with the TableSink interface itself, in 
favour of the much improved and feature-rich new Source & Sink API. You can find 
extensive documentation on the new API here [1].

Therefore if you use the old TableSink interface you must stick with the 
deprecated method.

Best,

Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
On 04/01/2021 14:53, patrick.eif...@sony.com 
wrote:
Hi and Happy New Year,

I’m currently trying to remove deprecations to prepare for the upgrade to Flink 
1.12. currently running on 1.11.

Specifically I need to update our code that registers table sinks into the 
StreamTableEnvironment. I’m maintaining jobs that use DataStreams with multiple 
sinks. Now I want to use the StatementSet to benefit from its DAG for multiple 
sinks.

So far I added the code to add the sinks into the StatementSet:

statementSet.addInsert(sinkName,.table)

and to execute the StatementSet:

statementSet.execute()

For this to work I need to register the sinks. I used to do that with the (now 
deprecated) function on the StreamTableEnvironment:

tableEnv.registerTableSink(
sinkName,
fieldNames,
fieldTypes,
tableSink
)

My Question is how to register sinks to be discovered by the statement set? 
What is the proper replacement for the function registerTableSink?

executeSql(ddl) as suggested, does not apply to this use case. Did not find 
anything in the documentation either: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#translate-and-execute-a-query

When running the job I’m getting the error, that the sink could not be found in 
the catalog. Which means I have to add the sink into the catalog, but how?

Which function should be used for registering a table sink into the table 
environments catalog?


Thanks!

Kind Regards,

Patrick
--
Patrick Eifler

Senior Software Engineer (BI)

Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC

Wilhelmstraße 118, 10963 Berlin

Germany

E: patrick.eif...@sony.com


Re: Is chk-$id/_metadata created regardless of enabling externalized checkpoints?

2021-01-04 Thread Yun Gao
Hi Dongwon,

   Happy new year! One metadata file is stored on HDFS for every checkpoint even if 
externalized checkpoints are not enabled. If externalized checkpoints are not enabled, 
Flink deletes all checkpoints when the job terminates; if they are enabled, the checkpoints 
are kept on cancellation or failure, according to the retention settings. So for the second 
question, I think the answer is yes.

Best,
 Yun


 --Original Mail --
Sender:Dongwon Kim 
Send Date:Mon Jan 4 19:16:39 2021
Recipients:user 
Subject:Is chk-$id/_metadata created regardless of enabling externalized 
checkpoints?

Hi,

First of all, happy new year!
It can be a very basic question but I have something to clarify in my head.

my flink-conf.yaml is as follows (note that I didn't specify the value of 
"execution-checkpointing-externalized-checkpoint-retention [1]"):
#...
execution.checkpointing.interval: 20min
execution.checkpointing.min-pause: 1min

state.backend: rocksdb
state.backend.incremental: true

state.checkpoints.dir: hdfs:///flink-jobs/ckpts
state.checkpoints.num-retained: 10

state.savepoints.dir: hdfs:///flink-jobs/svpts
#...

And the checkpoint configuration is shown as follows in Web UI (note that 
"Persist Checkpoints Externally" is "Disabled" in the final row):


According to [2],
externalized checkpoints: You can configure periodic checkpoints to be 
persisted externally. Externalized checkpoints write their meta data out to 
persistent storage and are not automatically cleaned up when the job fails. 
This way, you will have a checkpoint around to resume from if your job fails. 
There are more details in the deployment notes on externalized checkpoints.
So I've thought the metadata of a checkpoint is only on JobManager's memory and 
not stored on HDFS unless 
"execution-checkpointing-externalized-checkpoint-retention" is set.

However, even without setting the value, every checkpoint already contains its 
own metadata:
[user@devflink conf]$ hdfs dfs -ls 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/*
Found 1 items
-rw-r--r--  3 user hdfs  163281 2021-01-04 14:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-945/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  163281 2021-01-04 14:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-946/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  163157 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-947/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  156684 2021-01-04 15:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-948/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  147280 2021-01-04 15:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-949/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  147280 2021-01-04 16:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-950/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  162937 2021-01-04 16:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-951/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  175089 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-952/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  173289 2021-01-04 17:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-953/_metadata
Found 1 items
-rw-r--r--  3 user hdfs  153951 2021-01-04 17:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/chk-954/_metadata
Found 21 items
-rw-r--r--  3 user hdfs 78748 2021-01-04 14:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/05d76f4e-3d9c-420c-8b87-077fc9880d9a
-rw-r--r--  3 user hdfs 23905 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/0b9d9323-9f10-4fc2-8fcc-a9326448b07c
-rw-r--r--  3 user hdfs 81082 2021-01-04 16:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/0f6779d0-3a2e-4a94-be9b-d9d6710a7ea0
-rw-r--r--  3 user hdfs 23905 2021-01-04 16:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/107b3b74-634a-462c-bf40-1d4886117aa9
-rw-r--r--  3 user hdfs 78748 2021-01-04 14:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/18a538c6-d40e-48c0-a965-d65be407a124
-rw-r--r--  3 user hdfs 83550 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/24ed9c4a-0b8e-45d4-95b8-64547cb9c541
-rw-r--r--  3 user hdfs 23905 2021-01-04 17:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/35ee9665-7c1f-4407-beb5-fbb312d84907
-rw-r--r--  3 user hdfs 47997 2021-01-04 11:25 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/36363172-c401-4d60-a970-cfb2b3cbf058
-rw-r--r--  3 user hdfs 81082 2021-01-04 15:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/43aecc8c-145f-43ba-81a8-b0ce2c3498f4
-rw-r--r--  3 user hdfs 79898 2021-01-04 15:05 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/5743f278-fc50-4c4a-b14e-89bfdb2139fa
-rw-r--r--  3 user hdfs 23905 2021-01-04 16:45 
/flink-jobs/ckpts/76fc265c44ef44ae343ab15868155de6/shared/67e16688-c48c-409b-acac-e7091a84d548
-rw-r--r--  3 user hdfs 23905 

Re: How to register TableSinks

2021-01-04 Thread Dawid Wysakowicz
Hi Patrick.

Happy New Year to you too ;)

The method you are referring to was deprecated along with the TableSink
interface itself, in favour of the much improved and feature-rich new Source &
Sink API. You can find extensive documentation on the new API here [1].

Therefore, if you use the old TableSink interface you must stick with the
deprecated method.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
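
As a rough sketch of the DDL-based route (the sink table, its schema and someTable below are placeholders, not Patrick's actual setup), you register the sink via executeSql and then reference it by name from the StatementSet:

    // register a sink table in the catalog via DDL
    tableEnv.executeSql(
        "CREATE TABLE my_sink (f0 STRING, f1 INT) WITH ('connector' = 'print')");

    // add one or more INSERTs and submit them together as a single job
    StatementSet statementSet = tableEnv.createStatementSet();
    statementSet.addInsert("my_sink", someTable);  // someTable is a previously built Table
    statementSet.execute();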

On 04/01/2021 14:53, patrick.eif...@sony.com wrote:
>
> Hi and Happy New Year,
>
>  
>
> I’m currently trying to remove deprecations to prepare for the upgrade
> to Flink 1.12. currently running on 1.11.
>
>  
>
> Specifically I need to update our code that registers table sinks into
> the StreamTableEnvironment. I’m maintaining jobs that use DataStreams
> with multiple sinks. Now I want to use the StatementSet to benefit
> from its DAG for multiple sinks.
>
>  
>
> So far I added the code to add the sinks into the StatementSet:
>
>  
>
> *statementSet.addInsert(sinkName,.table)*
>
>  
>
> and to execute the StatementSet:
>
>  
>
> *statementSet.execute()*
>
>  
>
> For this to work I need to register the sinks. I used to do that with
> the (now deprecated) function on the StreamTableEnvironment:
>
>  
>
> *tableEnv.registerTableSink(*
>
> *    sinkName,*
>
> *    fieldNames,*
>
> *    fieldTypes,*
>
> *    tableSink*
>
> *)*
>
>  
>
> My Question is how to register sinks to be discovered by the statement
> set? What is the proper replacement for the function *registerTableSink*?
>
>  
>
> *executeSql(ddl)*as suggested, does not apply to this use case. Did
> not find anything in the documentation either:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#translate-and-execute-a-query
>
>  
>
> When running the job I’m getting the error, that the sink could not be
> found in the catalog. Which means I have to add the sink into the
> catalog, but how?
>
>  
>
> Which function should be used for registering a table sink into the
> table environments catalog?
>
> Thanks!
>
>  
>
> Kind Regards,
>
>  
>
> Patrick
>
> -- 
>
> Patrick Eifler
>
>  
>
> Senior Software Engineer (BI)
>
> Cloud Gaming Engineering & Infrastructure 
> Sony Interactive Entertainment LLC 
>
> Wilhelmstraße 118, 10963 Berlin
>
>
> Germany
>
> E: patrick.eif...@sony.com
>




Re: Flink sink never executes

2021-01-04 Thread Kostas Kloudas
Hi Ben,

Sorry for the late reply but I guess that your question was answered
in StackOverflow, right?
Did that answer solve your problem?

Cheers,
Kostas

On Mon, Dec 21, 2020 at 9:09 AM Ben Beasley  wrote:
>
> First off I want to thank the folks in this email list for their help thus 
> far.
>
>
>
> I’m facing another strange issue where if I add a window to my stream, the 
> sink no longer executes. However the sink executes without the windowing. I 
> described my problem on stackoverflow so that the code is easy to read.
>
>
>
> I wonder if anyone can help me once more, I believe the solution could be 
> simple for someone familiar with the code. I believe I’ve followed the 
> tutorials and articles on the flink website correctly.


Re: flink 1.12: memory not released after Cancel Job (question)

2021-01-04 Thread 赵一旦
Please share the exact SQL. Actually, I don't quite understand the problem you are describing.
What do you mean by "releasing memory" versus "accumulating on top of the previous result"? Those are two unrelated things: the former is about memory, the latter reflects state.
If the job is restarted from a checkpoint/savepoint, the state is of course retained and accumulation continues.

徐州州 <25977...@qq.com> wrote on Mon, Jan 4, 2021 at 8:45 AM:

> Even after I switched to yarn-cluster mode and killed the job at 23:50 via /opt/module/hadoop3.2.1/bin/yarn
> application -kill
> application_1609656886263_0043, restarting at 1:30 the next day still accumulates on top of yesterday's results. Killing the job apparently does not release the state. Is there really nothing that can be done about this?
>
>
>
> -- Original message --
> From: "赵一旦" Sent: Tuesday, December 29, 2020 9:35 PM
> To: "user-zh" Subject: Re: flink 1.12: memory not released after Cancel Job (question)
>
>
>
> No. A job is a job, and a taskManager is a taskManager. The taskManager is a process started in advance; when a job is submitted, the
> taskManager executes it for you. After cancel, the taskManager goes on with its own business (e.g. waiting for new jobs).
> Alternatively, consider deploying on YARN, e.g. per-job mode.
>
> 徐州州 <25977...@qq.com> wrote on Tue, Dec 29, 2020 at 9:00 AM:
>
>  A question: after my flink
> 
> sql job is cancelled and restarted an hour later, it still continues accumulating from the point where it was cancelled. I develop in IDEA and did not configure any checkpoints in the code. How can I also release the TaskManager memory used by the job when it is cancelled?


How to register TableSinks

2021-01-04 Thread Patrick.Eifler
Hi and Happy New Year,

I’m currently trying to remove deprecations to prepare for the upgrade to Flink 
1.12. currently running on 1.11.

Specifically I need to update our code that registers table sinks into the 
StreamTableEnvironment. I’m maintaining jobs that use DataStreams with multiple 
sinks. Now I want to use the StatementSet to benefit from its DAG for multiple 
sinks.

So far I added the code to add the sinks into the StatementSet:

statementSet.addInsert(sinkName,.table)

and to execute the StatementSet:

statementSet.execute()

For this to work I need to register the sinks. I used to do that with the (now 
deprecated) function on the StreamTableEnvironment:

tableEnv.registerTableSink(
sinkName,
fieldNames,
fieldTypes,
tableSink
)

My Question is how to register sinks to be discovered by the statement set? 
What is the proper replacement for the function registerTableSink?

executeSql(ddl) as suggested, does not apply to this use case. Did not find 
anything in the documentation either: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#translate-and-execute-a-query

When running the job I’m getting the error, that the sink could not be found in 
the catalog. Which means I have to add the sink into the catalog, but how?

Which function should be used for registering a table sink into the table 
environments catalog?

Thanks!

Kind Regards,

Patrick
--
Patrick Eifler

Senior Software Engineer (BI)

Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC

Wilhelmstraße 118, 10963 Berlin

Germany

E: patrick.eif...@sony.com


Re: How to develop Flink SQL jobs elegantly

2021-01-04 Thread 赵一旦
1. What do you mean by kafka table and group id? You can use any group id.
2. Tables can already be reused.
3. I don't understand what you are trying to express.


HideOnBushKi <1405977...@qq.com> wrote on Mon, Jan 4, 2021 at 3:43 PM:

> Hi all, business requirements keep growing and production jobs are getting heavy. I'd like advice on three production scenarios, mainly to see whether anyone has more elegant approaches:
>
> 1. How should kafka tables and group_IDs be maintained? One table per business use case?
> 2. How can the effect of reusing tables be achieved?
> 3. If a new requirement uses an existing kafka table, it seems the program has to execute multiple executeSql("sql
> 1") calls within one program to avoid out-of-order results, but that makes the program highly coupled. How can these scenarios be handled elegantly?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on k8s 1.11.3: taskmanager fails to start for a job that uses HDFS

2021-01-04 Thread Yang Wang
Since 1.11 you can simply export HADOOP_CONF_DIR on the machine running the Flink client
and then start the Flink job with flink run-application or kubernetes-session.sh. The Flink
client will automatically ship the Hadoop configuration to the JobManager and TaskManager pods
via a ConfigMap and add it to the classpath.
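
Roughly like this on the client machine (the Hadoop conf path is a placeholder; image and jar names are taken from your command):

    export HADOOP_CONF_DIR=/etc/hadoop/conf   # directory containing core-site.xml / hdfs-site.xml
    ./bin/flink run-application -t kubernetes-application \
      -Dkubernetes.cluster-id=my-first-application-cluster \
      -Dkubernetes.container.image=flink:demo7-4 \
      local:///opt/flink/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar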

Best,
Yang

龙逸尘  wrote on Mon, Jan 4, 2021 at 4:39 PM:

> Good afternoon. I am currently using Flink on k8s application-mode to build a demo that consumes kafka and writes to hive,
> using hdfs as the statebackend, and I have run into some problems I hope to get help with. Here is a description of the problem and the debug process.
>
> The Dockerfile is as follows
>
> FROM flink:1.11.3-scala_2.11
> RUN mkdir -p $FLINK_HOME/usrlib
> RUN mkdir -p /opt/hadoop/conf
> COPY flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> COPY flink-on-k8s-1.0-SNAPSHOT.jar
> $FLINK_HOME/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
> COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
> ENV HADOOP_CONF_DIR /opt/hadoop/conf
> ENV YARN_CONF_DIR /opt/hadoop/conf
> COPY yarn-site.xml /opt/hadoop/conf/yarn-site.xml
> COPY hdfs-site.xml /opt/hadoop/conf/hdfs-site.xml
> COPY core-site.xml /opt/hadoop/conf/core-site.xml
>
> The start command is as follows
>
> flink-1.11.3/bin/flink run-application -p 1 -t kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster-demo7-4
> -Dkubernetes.jobmanager.service-account=flink
> -Dtaskmanager.memory.process.size=1024m   -Dkubernetes.taskmanager.cpu=1
> -Dtaskmanager.numberOfTaskSlots=1
> -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
> %jvmopts% %logging% %class% %args%"
> -Dkubernetes.container.image=flink:demo7-4
> -Dkubernetes.rest-service.exposed.type=NodePort
> local:///opt/flink/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
>
> The flink-on-k8s-1.0-SNAPSHOT.jar job simply consumes kafka and uses hdfs as the statebackend
> to record state.
>
> At first I only put the three files (yarn-site.xml etc.) under the usrlib directory; the JobManager failed to start with an
> UnknownHost error. Following information from the mailing list, after setting HADOOP_CONF_DIR the JobManager started successfully with no errors in the log, but
> the TaskManager stayed in ContainerCreating state and the deployment exited automatically after 7-8 minutes. Using describe
> pod on the TM gives the following error information:
>
> Events:
>   Type Reason   Age   From
>  Message
>    --     
>  ---
>   Normal   Scheduled default-scheduler
> Successfully assigned
> default/my-first-application-cluster-demo7-4-taskmanager-1-1 to
> k8s-node0002
>   Warning  FailedMount  37s (x10 over 4m46s)  kubelet,
> k8s-ci-dcn-bigdata-node0002  MountVolume.SetUp failed for volume
> "hadoop-config-volume" : configmap
> "hadoop-config-my-first-application-cluster-demo7-4" not found
>   Warning  FailedMount  29s (x2 over 2m44s)   kubelet, k8s-node0002  Unable
> to attach or mount volumes: unmounted volumes=[hadoop-config-volume],
> unattached volumes=[hadoop-config-volume flink-config-volume
> default-token-fhkhf]: timed out waiting for the condition
>
> Am I misconfiguring something, or is some other configuration needed to enable hdfs?
> Looking forward to your reply.
>
> ---
> Best Regards!
>
> Yichen
>


Re: flink on k8s application mode: specifying the path of the job jar

2021-01-04 Thread Yang Wang
Currently the native mode only supports local, i.e. the user jar has to be packaged into the image; hdfs or oss is not supported for now.

Downloading via an init container is possible, but the pod template feature [1] this relies on is not supported yet; you can follow its progress there.

[1]. https://issues.apache.org/jira/browse/FLINK-15656

Best,
Yang

陈帅  wrote on Sat, Jan 2, 2021 at 8:08 PM:

> The example command given on the website is as follows:
> ./bin/flink run-application -p 8 -t kubernetes-application \
>
>   -Dkubernetes.cluster-id=flink-k8s-application-cluster \
>
>   -Dtaskmanager.memory.process.size=4096m \
>
>   -Dkubernetes.taskmanager.cpu=2 \
>
>   -Dtaskmanager.numberOfTaskSlots=4 \
>
>   -Dkubernetes.container.image=flink:latest \
>
>   local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
>
>
>
> The last line specifies the path of the job jar. Does it only support local mode? In that case the job can only be packaged into the image, right? Is it possible to access an external file system, e.g. an hdfs or oss address?
> If it is not supported at the moment, is there another workaround? I have heard about init containers; how exactly would that work?


Re: monitoring flink jobs on k8s

2021-01-04 Thread Yang Wang
You can configure the PrometheusPushGateway [1] to export metrics to Prometheus, and then connect Grafana to view them.

Besides a sidecar, logs can also be written directly to distributed storage (Alibaba Cloud OSS, Elasticsearch, etc.) via a custom log4j2 appender.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/metric_reporters.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter
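
A minimal flink-conf.yaml sketch for the push gateway reporter (host, port and job name are placeholders; see [1] for the full option list):

    metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    metrics.reporter.promgateway.host: pushgateway.example.com
    metrics.reporter.promgateway.port: 9091
    metrics.reporter.promgateway.jobName: my-flink-job
    metrics.reporter.promgateway.randomJobNameSuffix: true
    metrics.reporter.promgateway.deleteOnShutdown: false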

Best,
Yang

陈帅  wrote on Sat, Jan 2, 2021 at 7:54 PM:

> How should flink jobs running per
> job on k8s be monitored correctly? Logs are collected via a sidecar; how should the flink job metrics be collected?
> Is there any reference material on this?


pyflink-udaf

2021-01-04 Thread hepeitan
Hello:

  Our project team plans to use PyFlink's udaf to aggregate data. From the community articles we learned that for batch or windowed aggregation, Vectorized 
Aggregate Functions are recommended. However, the code provided for this case is incomplete and not a full example;
our own test program fails with: "org.apache.flink.table.api.ValidationException: Undefined function: 
weighted_avg". The test code is attached.
  Could you please provide a complete example of aggregating data with a PyFlink udaf? Many thanks!

Re: Re: flink 1.12.0 kubernetes-session deployment problem

2021-01-04 Thread Yang Wang
The native mode exposes the REST service via LoadBalancer by default, which is why it prints an address you cannot reach.
You can add -Dkubernetes.rest-service.exposed.type=NodePort to expose it via NodePort instead;
the address printed by the Flink client will then be correct.

You can also get the node IP with "minikube ip" and the NodePort of the Flink cluster
service you created with "kubectl get svc", and combine the two.
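
Roughly (using the cluster id from your command; the REST service name is assumed to follow the <cluster-id>-rest convention):

    ./bin/kubernetes-session.sh \
      -Dkubernetes.namespace=flink-session-cluster \
      -Dkubernetes.jobmanager.service-account=flink \
      -Dkubernetes.cluster-id=session001 \
      -Dkubernetes.rest-service.exposed.type=NodePort
    minikube ip                       # the node IP
    kubectl get svc session001-rest   # the NodePort of the REST service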


As for the NoResourceAvailableException you mentioned, check whether the TaskManager pod has been created but is stuck in Pending.
If so, minikube does not have enough resources; either give minikube more resources or reduce the resources of the JobManager/TaskManager pods.
If not, please send the complete JobManager log, which will make it easier to find the problem.


Best,
Yang

陈帅  wrote on Sat, Jan 2, 2021 at 10:43 AM:

> Environment: MacBook Pro with a local installation of minikube v1.15.1 and kubernetes v1.19.4.
> I ran the following commands with the flink v1.11.3 distribution:
> kubectl create namespace flink-session-cluster
>
>
> kubectl create serviceaccount flink -n flink-session-cluster
>
>
> kubectl create clusterrolebinding flink-role-binding-flink \
> --clusterrole=edit \ --serviceaccount=flink-session-cluster:flink
>
>
> ./bin/kubernetes-session.sh \ -Dkubernetes.namespace=flink-session-cluster
> \ -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.cluster-id=session001 \
> -Dtaskmanager.memory.process.size=8192m \ -Dkubernetes.taskmanager.cpu=1 \
> -Dtaskmanager.numberOfTaskSlots=4 \
> -Dresourcemanager.taskmanager-timeout=360
>
>
> 屏幕打印的结果显示flink web UI启在了 http://192.168.64.2:8081 而不是类似于
> http://192.168.50.135:31753 这样的5位数端口,是哪里有问题?这里的host ip应该是minikube
> ip吗?我本地浏览器访问不了http://192.168.64.2:8081
>
>
>
> 2021-01-02 10:28:04,177 INFO
> org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
> less than its min value 192.000mb (201326592 bytes), min value will be used
> instead
>
> 2021-01-02 10:28:04,907 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster session001 successfully, JobManager Web Interface:
> http://192.168.64.2:8081
>
>
>
>
> I checked the pods, service and deployment; they all started normally and show all green.
>
>
> Next I submit the job:
> ./bin/flink run -d \ -e kubernetes-session \
> -Dkubernetes.namespace=flink-session-cluster \
> -Dkubernetes.cluster-id=session001 \ examples/streaming/WindowJoin.jar
>
>
>
> Using windowSize=2000, data rate=3
>
> To customize example, use: WindowJoin [--windowSize
> ] [--rate ]
>
> 2021-01-02 10:21:48,658 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve
> flink cluster session001 successfully, JobManager Web Interface:
> http://10.106.136.236:8081
>
>
>
>
> The http://10.106.136.236:8081 shown here is reachable from my browser; it shows the job running, but available
> slots shows 0, and the JM log contains the following error:
>
>
>
>
> Causedby:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Couldnot allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.12-1.11.3.jar:1.11.3]
> ... 47 more
> Causedby: java.util.concurrent.CompletionException:
> java.util.concurrent.TimeoutException
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> ~[?:1.8.0_275]
> ... 27 more
> Causedby: java.util.concurrent.TimeoutException
> ... 25 more
>
>
> Why is this insufficient-resources error reported? Thanks for any explanation!
>
>
>
>
>
>
>
>
> On 2020-12-29 09:53:48, "Yang Wang"  wrote:
> >The ConfigMap does not need to be created in advance; that Warning can be ignored and is normal, mainly because the deployment is created first and the ConfigMap afterwards.
> >You can follow the community documentation [1] to print the JM log to the console and take a look.
> >
> >I suspect the cause is that you did not create the service account [2].
> >
> >[1].
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#log-files
> >[2].
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#rbac
> >
> >Best,
> >Yang
> >
> >陈帅  wrote on Mon, Dec 28, 2020 at 5:54 PM:
> >
> >> Today I switched to the latest officially released flink image version 1.11.3 and it still does not start.
> >> This is my command:
> >> ./bin/kubernetes-session.sh \
> >>   -Dkubernetes.cluster-id=rtdp \
> >>   -Dtaskmanager.memory.process.size=4096m \
> >>   -Dkubernetes.taskmanager.cpu=2 \
> >>   -Dtaskmanager.numberOfTaskSlots=4 \
> >>   -Dresourcemanager.taskmanager-timeout=360 \
> >>   -Dkubernetes.container.image=flink:1.11.3-scala_2.12-java8 \
> >>   -Dkubernetes.namespace=rtdp
> >>
> >>
> >>
> >> Events:
> >>
> >>   Type Reason  AgeFrom   Message
> >>
> >>    --        ---
> >>
> >>   Normal   Scheduled   88sdefault-scheduler
> >> Successfully assigned rtdp/rtdp-6d7794d65d-g6mb5 to
> >> cn-shanghai.192.168.16.130
> >>
> >>   Warning  FailedMount 88skubelet
> >> MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> >> "flink-config-rtdp" not found
> >>
> >>   

Re: pyflink-udaf

2021-01-04 Thread Xingbo Huang
Hi,

Hi, the example already shows how to register and use the UDF; only reading the source data and writing the output are not shown (those are covered in separate sections).
Regarding your error: since you did not show exactly how you use it, my guess is that you did not follow the DSL style from the example but used the string style instead, without registering the function, which leads to this error.

Best,
Xingbo

消息室  wrote on Mon, Jan 4, 2021 at 8:10 PM:

> Hello:
> Our project team plans to use PyFlink's udaf to aggregate data. From the community articles we learned that for batch or windowed aggregation, Vectorized
> Aggregate Functions are recommended. However, the code provided for this case is incomplete and not a full example;
> our own test program fails with: "org.apache.flink.table.api.ValidationException: Undefined
> function: weighted_avg"
>   Could you please provide a complete example of aggregating data with a PyFlink udaf? Many thanks!
>
>
>
>
>   


pyflink-udaf

2021-01-04 Thread 消息室
Hello:
Our project team plans to use PyFlink's udaf to aggregate data. From the community articles we learned that for batch or windowed aggregation, Vectorized
Aggregate Functions are recommended. However, the code provided for this case is incomplete and not a full example;
our own test program fails with: "org.apache.flink.table.api.ValidationException: 
Undefined function: weighted_avg"
  Could you please provide a complete example of aggregating data with a PyFlink udaf? Many thanks!




  

Re:Re: pyflink-1.12.0 stream api job execution failed

2021-01-04 Thread ゛无邪
Hi, thank you very much for your reply!



I just checked the pyflink dependencies and found that the apache-flink module being used was indeed the flink-1.11 one. After replacing it with 1.12 the problem is solved. Thanks again for your help!














On 2021-01-04 16:36:22, "Xingbo Huang"  wrote:
>Hi,
>
>Judging from the error, the pyflink version used on your cluster is 1.11 (the "No logging endpoint
>provided." error only exists in 1.11). You can try upgrading to 1.12.
>
>Best,
>Xingbo
>
>゛无邪 <17379152...@163.com> wrote on Mon, Jan 4, 2021 at 4:28 PM:
>
>> Hi!
>> We wrote a python script following the DataStream API user guide in the Python API section of the Flink website; documentation:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/python/datastream-api-users-guide/operators.html
>> Flink runs on yarn; the script is specified with the -py parameter and submits to yarn successfully, but then hits the following error:
>> Job has been submitted with JobID ee9e3a89eae69f457b81d1ebf4a45264
>> Traceback (most recent call last):
>>   File "official_example_2blk.py", line 44, in 
>> env.execute("tutorial_job")
>>   File
>> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
>> line 623, in execute
>>   File
>> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>> line 1286, in __call__
>>   File
>> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>> line 147, in deco
>>   File
>> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>> line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
>> : java.util.concurrent.ExecutionException:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: ee9e3a89eae69f457b81d1ebf4a45264)
>> The complete stack trace is in the attachment. Please help!!
>>


Re: Tumbling Time Window

2021-01-04 Thread David Anderson
For straightforward tumbling windows, the regular DSL windowing performs
noticeably better than a custom process function because it takes advantage
of an internal API to avoid some serialization overhead.

There's a simple example of a ProcessWindowFunction in [1], and an example
of using a KeyedProcessFunction to do windowing in [2].

Best,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/streaming_analytics.html#window-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
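
For reference, a minimal sketch of the DSL variant for this case (event type, key and the per-window processing are placeholders):

    // 2-second tumbling processing-time window per user
    events
        .keyBy(e -> e.getUserId())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
        .process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
            @Override
            public void process(String userId, Context ctx,
                                Iterable<Event> batch, Collector<Result> out) {
                out.collect(processBatch(userId, batch));  // placeholder aggregation
            }
        });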


On Mon, Jan 4, 2021 at 1:05 AM Navneeth Krishnan 
wrote:

> Hello All,
>
> First of all Happy New Year!! Thanks for the excellent community support.
>
> I have a job which requires a 2-second tumbling time window per key. For
> each user we wait for 2 seconds to collect enough data and then proceed to
> further processing. My question is: should I use the regular DSL windowing,
> or write a custom process function which does the windowing? I have heard
> that the DSL window has more overhead than a custom window function.
>
> What do you suggest, and can someone provide an example of a custom
> window function per key? Also, given that the window is very short (2 secs),
> would there be much overhead in firing so many timers for each key?
>
> Thanks!
>
> Regards,
> Navneeth
>


Reply: FlinkSQL: the type of the pushed-down literal does not match the column type

2021-01-04 Thread automths
Thank you very much for the guidance. Following your suggestion, I will implement a TypeVisitor to convert the pushed-down literal types into the expected types.


Best regards!
| |
automths
|
|
autom...@163.com
|
On Jan 4, 2021 at 16:49, Sebastian Liu wrote:
Hi automths,

For the literal type in a RexNode: during calcite's convert-to-RelNode phase, taking col1 > 10 as an example,
10 is first parsed by calcite as a SqlNumericLiteral whose type is Decimal(prec: 2, scale: 0).
When its corresponding RelDataType is created, if the value lies between Integer.MIN and Integer.MAX it becomes an Integer type;
otherwise it is decimal. There is no Hive-style auto cast here; instead calcite performs implicit type conversion.
The specific implicit conversion rules are described at:
https://calcite.apache.org/docs/reference.html#implicit-type-conversion

For the types of function arguments, Flink also has its own set of inference rules.

select * from shortRow1 where col1 > CAST(10 AS SMALLINT) and col1 <=
CAST(15 AS SMALLINT) guarantees that
the literals in the expressions seen in applyPredicates have the expected type, but it is not very general; I would suggest implementing
a TypeVisitor in the connector to convert the literals to the expected types.

Just my thoughts

automths  wrote on Mon, Jan 4, 2021 at 9:36 AM:



Thank you for your answer.

However, my col1 and col2 are already of SMALLINT type; my question is that the values in the where condition are of Integer type when pushed down, and I would like the values to be SMALLINT as well.



Best regards!
| |
automths
|
|
autom...@163.com
|
On Dec 31, 2020 at 18:17, whirly wrote:
Hi.

In the query you can use the built-in cast function to force a value to the desired type, e.g. select CAST(A.`col1` AS SMALLINT) as
col1 from table


References:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#type-conversion-functions

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/types.html#data-types




best 2021.


On 2020-12-31 17:13:20, "automths"  wrote:
Hi:
I implemented a custom connector with the FilterableTableSource interface, but found that in the Filter pushed down by the flink planner,
the Literal type does not match the column type.
For example, for the following SQL:
select * from shortRow1 where key in (1, 2, 3) and col1 > 10 and col1 <= 15
where in the DDL, key, col1 and col2 are all of SMALLINT type,
the Literal inside GreaterThan in the pushed-down Filter is of Integer type. Is this reasonable, or is there something I should do in my query?


Best regards!
| |
automths
|
|
autom...@163.com
|



--

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


Re: Task manager isn’t initiating with defined values in Flink 1.11 version as part of EMR 6.1.0

2021-01-04 Thread Till Rohrmann
Hi Deep,

Flink has dropped support for specifying the number of TMs via -n since the
introduction of Flip-6. Since then, Flink will automatically start TMs
depending on the required resources. Hence, there is no need to specify the
-n parameter anymore. Instead, you should specify the parallelism with
which you would like to run your job via the -p option.

Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
limit the upper limit of slots a cluster is allowed to allocate [1].

[1] https://issues.apache.org/jira/browse/FLINK-16605
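
A rough sketch of what the submission could then look like (all values are placeholders): with a parallelism of 24 and 4 slots per TaskManager, Flink would request 6 TaskManagers from YARN on its own.

    flink run -t yarn-per-job -p 24 \
      -Djobmanager.memory.process.size=2048m \
      -Dtaskmanager.memory.process.size=4096m \
      -Dtaskmanager.numberOfTaskSlots=4 \
      -Dslotmanager.number-of-slots.max=24 \
      -c <Class-Name> <Jar-Path>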

Cheers,
Till

On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh 
wrote:

> Hi Guys,
>
> I’m struggling while initiating the task manager with flink 1.11.0 in AWS
> EMR but with older versions it is not. Let me put the full context here.
>
> *When using Flink 1.9.1 and EMR 5.29.0*
>
> To create a long running session, we used the below command.
>
> *sudo flink-yarn-session -n  -s  -jm 
> -tm  -d*
>
> and followed by below command to run the final job.
>
> *flink run -m yarn-cluster -yid  -yn  -ys
>  -yjm  -ytm  -c  *
>
> and if “n” is 6 then it is used to create 6 task managers to start the job,
> so whatever “n” is configured the result was that number of TM the job is
> being started.
>
> But Now when we scaled up with the configuration (*i.e. Flink 1.11.0 and
> EMR 6.1.0*) we are unable to achieve the desired values for TM.
>
> Please find the session Ids of new configuration,
>
> *sudo flink-yarn-session -Djobmanager.memory.process.size=
> -Dtaskmanager.memory.process.size= -n  -s  slot/core> -d*
>
> And the final Job command
>
> *flink run -m yarn-cluster -yid  -c   Path>*
>
> I have tried a lot of combinations, but nothing has worked out so far. I
> request your help in this regard, as the plan is to take this configuration to
> *PRODUCTION* soon.
>
> Thanks in advance.
>
>
> Regards,
>
> -Deep
>


Re: FlinkSQL pushed-down literal types do not match the column types

2021-01-04 Thread Sebastian Liu
Hi automths,

Regarding the literal type in a RexNode: during Calcite's conversion to a RelNode,
take col1 > 10 as an example. The 10 is first parsed by Calcite into a
SqlNumericLiteral whose type is Decimal(prec: 2, scale: 0). When the corresponding
RelDataType is created, the literal becomes Integer type if its value lies within
the Integer range (Integer.MIN_VALUE ~ Integer.MAX_VALUE); otherwise it stays
Decimal. There is no Hive-style auto cast here; instead Calcite performs an
implicit type conversion.
The concrete rules for this implicit conversion are documented at:
https://calcite.apache.org/docs/reference.html#implicit-type-conversion

For function arguments, Flink also has its own set of rules for deriving the types.

select * from shortRow1 where col1 > CAST(10 AS SMALLINT) and col1 <=
CAST(15 AS SMALLINT) guarantees that the literals in the expressions seen in
applyPredicates have the expected type, but that is not very general. The
recommended approach is to implement a TypeVisitor in the connector that
converts the literals to the expected types.
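
As a rough illustration of the narrowing step such a visitor could perform (the
class and method names below are made up for this sketch and are not Flink API):

final class LiteralNarrower {

    // Narrow a pushed-down numeric literal to the column's declared SQL type.
    // Non-numeric literals and unknown target types are returned unchanged.
    static Object narrowTo(Object literal, String targetSqlType) {
        if (!(literal instanceof Number)) {
            return literal;
        }
        Number n = (Number) literal;
        switch (targetSqlType) {
            case "TINYINT":  return n.byteValue();
            case "SMALLINT": return n.shortValue();
            case "INTEGER":  return n.intValue();
            case "BIGINT":   return n.longValue();
            default:         return literal;
        }
    }
}

For example, narrowTo(10, "SMALLINT") turns the Integer literal 10 into a Short,
which then matches a SMALLINT column such as col1.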

Just my thoughts

automths  wrote on Mon, Jan 4, 2021 at 9:36 AM:

>
>
> Thank you for your answer.
>
> But my col1 and col2 are already of SMALLINT type. My question is that the
> literals pushed down from the where clause come through as Integer; I would
> like them to be SMALLINT as well.
>
> Best regards!
> automths
> autom...@163.com
> On Dec 31, 2020, at 18:17, whirly wrote:
> Hi.
>
> In the query you can use the built-in CAST function to convert a value to the
> desired type, e.g. select CAST(A.`col1` AS SMALLINT) as col1 from table
>
>
> References:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#type-conversion-functions
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/types.html#data-types
>
>
>
>
> best 2021.
>
>
> On 2020-12-31 17:13:20, "automths"  wrote:
> Hi:
> I implemented a custom connector with the FilterableTableSource interface, but
> found that in the Filter pushed down by the Flink planner the literal types do
> not match the column types.
> For example, with the following SQL:
> select * from shortRow1 where key in (1, 2, 3) and col1 > 10 and col1 <= 15
> where key, col1 and col2 are all defined as SMALLINT in the DDL,
> the Literal inside the pushed-down GreaterThan filter has Integer type. Is this
> expected, or is there something I should do in my query?
>
>
> Best regards!
> automths
> autom...@163.com
>
>

-- 

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


Flink on k8s 1.11.3: TaskManager fails to start for a job that uses HDFS

2021-01-04 Thread 龙逸尘
Good afternoon everyone. I am building a demo with Flink on k8s in application
mode that consumes from Kafka and writes to Hive, using HDFS as the state
backend, and I have run into some problems I hope to get help with. Below I
describe the problem and my debugging steps.

The Dockerfile is as follows:

FROM flink:1.11.3-scala_2.11
RUN mkdir -p $FLINK_HOME/usrlib
RUN mkdir -p /opt/hadoop/conf
COPY flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
$FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
COPY flink-on-k8s-1.0-SNAPSHOT.jar
$FLINK_HOME/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
ENV HADOOP_CONF_DIR /opt/hadoop/conf
ENV YARN_CONF_DIR /opt/hadoop/conf
COPY yarn-site.xml /opt/hadoop/conf/yarn-site.xml
COPY hdfs-site.xml /opt/hadoop/conf/hdfs-site.xml
COPY core-site.xml /opt/hadoop/conf/core-site.xml

The launch command is as follows:

flink-1.11.3/bin/flink run-application -p 1 -t kubernetes-application
-Dkubernetes.cluster-id=my-first-application-cluster-demo7-4
-Dkubernetes.jobmanager.service-account=flink
-Dtaskmanager.memory.process.size=1024m   -Dkubernetes.taskmanager.cpu=1
-Dtaskmanager.numberOfTaskSlots=1
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
%jvmopts% %logging% %class% %args%"
-Dkubernetes.container.image=flink:demo7-4
-Dkubernetes.rest-service.exposed.type=NodePort
local:///opt/flink/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar

The flink-on-k8s-1.0-SNAPSHOT.jar job only consumes from Kafka and uses HDFS as
the state backend to record its state.

At first I tried putting only the three files (yarn-site.xml etc.) under the
usrlib directory; the JobManager failed to start with an UnknownHost error.
Following advice from the mailing list, I set HADOOP_CONF_DIR, after which the
JobManager started successfully with no errors in its log, but the TaskManager
stayed in ContainerCreating state and the deployment exited on its own after
7-8 minutes. The TM error information from describe pod is as follows:

Events:
  Type     Reason       Age                   From                                  Message
  ----     ------       ----                  ----                                  -------
  Normal   Scheduled                          default-scheduler                     Successfully assigned default/my-first-application-cluster-demo7-4-taskmanager-1-1 to k8s-node0002
  Warning  FailedMount  37s (x10 over 4m46s)  kubelet, k8s-ci-dcn-bigdata-node0002  MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap "hadoop-config-my-first-application-cluster-demo7-4" not found
  Warning  FailedMount  29s (x2 over 2m44s)   kubelet, k8s-node0002                 Unable to attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached volumes=[hadoop-config-volume flink-config-volume default-token-fhkhf]: timed out waiting for the condition

Is there a mistake in my configuration, or is additional configuration needed
to enable HDFS?
Looking forward to your reply!

---
Best Regards!

Yichen


Re: pyflink-1.12.0 stream API job execution fails

2021-01-04 Thread Xingbo Huang
Hi,

Judging from the error, the PyFlink version installed on your cluster is 1.11
(the "No logging endpoint provided." error only exists in 1.11). You could try
upgrading the cluster side to 1.12.
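
For example, something along these lines on each node (assuming the
apache-flink package there is managed with pip):

python -m pip show apache-flink              # check the currently installed PyFlink version
python -m pip install apache-flink==1.12.0   # align it with the 1.12.0 client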

Best,
Xingbo

゛无邪 <17379152...@163.com> wrote on Mon, Jan 4, 2021 at 4:28 PM:

> Hi,
> We wrote a Python script following the DataStream API user guide of the
> Python API documentation on the Flink website:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/python/datastream-api-users-guide/operators.html
> Flink runs on YARN and the script is passed via the -py parameter. The job
> is submitted to YARN successfully, but then fails with the following error:
> Job has been submitted with JobID ee9e3a89eae69f457b81d1ebf4a45264
> Traceback (most recent call last):
>   File "official_example_2blk.py", line 44, in 
> env.execute("tutorial_job")
>   File
> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
> line 623, in execute
>   File
> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>   File
> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>   File
> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: ee9e3a89eae69f457b81d1ebf4a45264)
> The full stack trace can be found in the attachment. Any help is appreciated!
>


pyflink-1.12.0 stream API job execution fails

2021-01-04 Thread ゛无邪
Hi! We wrote a Python script following the DataStream API user guide of the Python API documentation on the Flink website: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/python/datastream-api-users-guide/operators.html
Flink runs on YARN and the script is passed via the -py parameter. The job is submitted to YARN successfully, but then fails with the following error:

Job has been submitted with JobID ee9e3a89eae69f457b81d1ebf4a45264
Traceback (most recent call last):
  File "official_example_2blk.py", line 44, in
    env.execute("tutorial_job")
  File "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 623, in execute
  File "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ee9e3a89eae69f457b81d1ebf4a45264)

The full stack trace is in the attached file; please help us figure out the specific cause!

flink.log
Description: Binary data


Flink SQL query on an ORC table returns all NULLs

2021-01-04 Thread Jacob
Flink SQL> select * from table1 where dt='1609739880002';



table1 is an ORC table with partitions (dt is the partition column). Querying
the table in the Flink SQL client returns NULL for every column, but select
count does return the number of rows. I have been looking for the cause for
several days and still cannot figure it out. Any help is appreciated!

Flink SQL> select * from table1 where dt='1609739880002';

Flink SQL> select count(*) from `table1` where  dt='1609739880002';

-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/