Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-17 Thread Yangze Guo
I second Thomas that we can support both Java 8 and 11.

Best,
Yangze Guo

On Wed, Mar 18, 2020 at 12:12 PM Thomas Weise  wrote:
>
> -->
>
> On Mon, Mar 16, 2020 at 1:58 AM Andrey Zagrebin  wrote:
>>
>> Thanks for the further feedback Thomas and Yangze.
>>
>> > A generic, dynamic configuration mechanism based on environment variables
>> is essential and it is already supported via envsubst and an environment
>> variable that can supply a configuration fragment
>>
>> True, we already have this. As I understand this was introduced for
>> flexibility to template a custom flink-conf.yaml with env vars, put it into
>> the FLINK_PROPERTIES and merge it with the default one.
>> Could we achieve the same with the dynamic properties (-Drpc.port=1234),
>> passed as image args to run it, instead of FLINK_PROPERTIES?
>> They could be also parametrised with env vars. This would require
>> jobmanager.sh to properly propagate them to
>> the StandaloneSessionClusterEntrypoint though:
>> https://github.com/docker-flink/docker-flink/pull/82#issuecomment-525285552
>> cc @Till
>> This would provide a unified configuration approach.
>>
>
> How would that look for the various use cases? The k8s operator would 
> need to generate the -Dabc .. -Dxyz entry point command instead of setting 
> the FLINK_PROPERTIES environment variable? Potentially that introduces 
> additional complexity for little gain. Do most deployment platforms that 
> support Docker containers handle the command line route well? Backward 
> compatibility may also be a concern.
>
>>
>> > On the flip side, attempting to support a fixed subset of configuration
>> options is brittle and will probably lead to compatibility issues down the
>> road
>>
>> I agree with it. The idea was to have just some shortcut scripted functions
>> to set options in flink-conf.yaml for a custom Dockerfile or entry point
>> script.
>> TASK_MANAGER_NUMBER_OF_TASK_SLOTS could be set as a dynamic property of
>> started JM.
>> I am not sure how many users depend on it. Maybe we could remove it.
>> It also looks like we already have a somewhat unclean state in
>> docker-entrypoint.sh, where some ports are set to hardcoded values
>> and then FLINK_PROPERTIES is applied, potentially duplicating options in
>> the resulting flink-conf.yaml.
>
>
> That is indeed possible and duplicate entries from FLINK_PROPERTIES prevail. 
> Unfortunately, the special cases you mention were already established and the 
> generic mechanism was added later for the k8s operators.
>
>>
>>
>> I can see some potential usage of env vars as standard entry point args but
>> for purposes related to something which cannot be achieved by passing entry
>> point args, like changing flink-conf.yaml options. Nothing comes into my
>> mind at the moment. It could be some setting specific to the running mode
>> of the entry point. The mode itself can stay the first arg of the entry
>> point.
>>
>> > I would second that it is desirable to support Java 11
>>
>> > Regarding supporting JAVA 11:
>> > - Not sure if it is necessary to ship JAVA. Maybe we could just change
>> > the base image from openjdk:8-jre to openjdk:11-jre in template docker
>> > file[1]. Correct me if I understand incorrectly. Also, I agree to move
>> > this out of the scope of this FLIP if it indeed takes much extra
>> > effort.
>>
>> This is what I meant by bumping up the Java version in the docker hub Flink
>> image:
>> FROM openjdk:8-jre -> FROM openjdk:11-jre
>> This can be polled separately in the user mailing list.
>
>
> That sounds reasonable as long as we can still support both Java versions 
> (i.e. provide separate images for 8 and 11).
>
>>
>>
>> > and in general use a base image that allows the (straightforward) use of
>> more recent versions of other software (Python etc.)
>>
>> We can also poll whether to always include some version of Python in
>> the docker hub image.
>> A potential problem here is once it is there, it is some hassle to
>> remove/change it in a custom extended Dockerfile.
>>
>> It would be also nice to avoid maintaining images for various combinations
>> of installed Java/Scala/Python in docker hub.
>>
>> > Regarding building from local dist:
>> > - Yes, I bring this up mostly for development purpose. Since k8s is
>> > popular, I believe more and more developers would like to test their
>> > work on k8s cluster. I'm not sure should all developers write a custom
>> > docker file themselves in this scenario. Thus, I still prefer to
>> > provide a script for devs.
>> > - I agree to keep the scope of this FLIP mostly for those normal
>> > users. But as far as I can see, supporting building from local dist
>> > would not take much extra effort.
>> > - The maven docker plugin sounds good. I'll take a look at it.
>>
>> I would see any scripts introduced in this FLIP also as potential building
>> blocks for a custom dev Dockerfile.
>> Maybe this will be all that we need for dev images, or we write a dev
>> Dockerfile, highly parametrised for building a dev image.

Re: state schema evolution for case classes

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi Apoorv,

Flink currently does not natively support schema evolution for state types
using Scala case classes [1].

So, as Roman has pointed out, there are 2 possible ways for you to do that:
- Implementing a custom serializer that supports schema evolution for your
specific Scala case classes, as Roman suggested.
- or, using the State Processor API [2] to migrate your case classes
offline as a batch job

For your question on how to implement a schema-evolution supporting
serializer, can you share with me the problems you have met so far?
Otherwise, if you take a look at the PojoSerializerSnapshot class, that
would be a starting point to implement something similar for your case
classes.

As you will quickly realize, it's not simple, so I would strongly suggest
trying out the approach of using the State Processor API.
Either way, if you bump into any problems, feel free to let me know.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-10896
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
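
For anyone who wants to try the State Processor API route, below is a rough Java sketch of an offline migration job, written from memory against the 1.9/1.10 API, so please verify the exact class and method names against [2]. The savepoint paths, the operator uid "my-operator-uid", the state name "count", and the reader/writer classes are illustrative placeholders only; a real migration would read the old case-class state and convert it to the new schema before bootstrapping the new savepoint.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class StateMigrationJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1) Read the old keyed state (here a ValueState<Long> named "count") from the old savepoint.
        ExistingSavepoint old = Savepoint.load(env, "hdfs:///savepoints/old", new MemoryStateBackend());
        DataSet<Tuple2<String, Long>> oldState = old.readKeyedState("my-operator-uid", new CountReader());

        // ... convert the old values into the new schema here (e.g. map to the new case class) ...

        // 2) Bootstrap the operator state of the new savepoint with the converted values.
        BootstrapTransformation<Tuple2<String, Long>> bootstrap = OperatorTransformation
                .bootstrapWith(oldState)
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .transform(new CountWriter());

        // 3) Write the new savepoint and start the updated job from it.
        Savepoint.create(new MemoryStateBackend(), 128) // 128 = max parallelism of the new savepoint
                .withOperator("my-operator-uid", bootstrap)
                .write("hdfs:///savepoints/new");

        env.execute("offline state migration");
    }

    /** Reads the old ValueState<Long> as (key, value) pairs; the descriptor must match the old state name/type. */
    static class CountReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            out.collect(Tuple2.of(key, count.value()));
        }
    }

    /** Writes the converted values back as keyed state of the new operator. */
    static class CountWriter extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            count.update(value.f1);
        }
    }
}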

On Wed, Mar 18, 2020 at 1:04 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Thanks a lot. Also, can you share an example where this has been
> implemented? I have gone through the docs but still can't get it to work.
>
> On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Apoorv,
>>
>> You can achieve this by implementing custom serializers for your state.
>> Please refer to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <
>> apoorv.upadh...@razorpay.com> wrote:
>>
>>> Hi Roman,
>>>
>>> I have successfully migrated to Flink 1.8.2 with the savepoint created
>>> by Flink 1.6.2.
>>> Now I have to modify a few case classes due to a new requirement. I have
>>> created a savepoint, and when I run the app with the modified classes from
>>> the savepoint it throws the error "state not compatible".
>>> Previously no serializer was used.
>>> I now wish to support state schema evolution, hence I need a suggestion on
>>> how I can achieve that.
>>>
>>> Regards
>>>
>>> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi ApoorvK,

 I understand that you have a savepoint created by Flink 1.6.2 and you
 want to use it with Flink 1.8.2. The classes themselves weren't modified.
 Is that correct?
 Which serializer did you use?

 Regards,
 Roman


 On Tue, Feb 25, 2020 at 8:38 AM ApoorvK 
 wrote:

> Hi Team,
>
> We originally developed on Flink 1.6.2, so there are lots of case classes
> which have Maps and nested case classes within them, for example:
>
> case class MyCaseClass(var a: Boolean,
>  var b: Boolean,
>  var c: Boolean,
>  var d: NestedCaseClass,
>  var e: Int) {
>   def this() { this(false, false, false, new NestedCaseClass, 0) }
> }
>
>
> Now that we have migrated to Flink 1.8.2, I need help to figure out how
> I can achieve state schema evolution for such classes.
>
> 1. If I create Avro schemas for these classes now and implement Avro
> serialisation, will that work?
> 2. Or should I register a Kryo serializer with a Protobuf serialiser on the
> environment?
>
> Please suggest what can be done here, or point me to an Avro
> serialisation example.
>
> Thanks
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>



Re: state schema evolution for case classes

2020-03-17 Thread Apoorv Upadhyay
Thanks a lot. Also, can you share an example where this has been
implemented? I have gone through the docs but still can't get it to work.

On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Apoorv,
>
> You can achieve this by implementing custom serializers for your state.
> Please refer to
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html
>
> Regards,
> Roman
>
>
> On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <
> apoorv.upadh...@razorpay.com> wrote:
>
>> Hi Roman,
>>
>> I have successfully migrated to Flink 1.8.2 with the savepoint created by
>> Flink 1.6.2.
>> Now I have to modify a few case classes due to a new requirement. I have
>> created a savepoint, and when I run the app with the modified classes from
>> the savepoint it throws the error "state not compatible".
>> Previously no serializer was used.
>> I now wish to support state schema evolution, hence I need a suggestion on
>> how I can achieve that.
>>
>> Regards
>>
>> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi ApoorvK,
>>>
>>> I understand that you have a savepoint created by Flink 1.6.2 and you
>>> want to use it with Flink 1.8.2. The classes themselves weren't modified.
>>> Is that correct?
>>> Which serializer did you use?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 25, 2020 at 8:38 AM ApoorvK 
>>> wrote:
>>>
 Hi Team,

 We originally developed on Flink 1.6.2, so there are lots of case classes
 which have Maps and nested case classes within them, for example:

 case class MyCaseClass(var a: Boolean,
  var b: Boolean,
  var c: Boolean,
  var d: NestedCaseClass,
  var e: Int) {
   def this() { this(false, false, false, new NestedCaseClass, 0) }
 }


 Now that we have migrated to Flink 1.8.2, I need help to figure out how
 I can achieve state schema evolution for such classes.

 1. If I create Avro schemas for these classes now and implement Avro
 serialisation, will that work?
 2. Or should I register a Kryo serializer with a Protobuf serialiser on the
 environment?

 Please suggest what can be done here, or point me to an Avro
 serialisation example.

 Thanks




 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>>


Flink Weekly | Weekly Community Update - 2020/03/18

2020-03-17 Thread LakeShen
Hi everyone, this is the ninth issue of Flink Weekly, compiled by Shen Lei (LakeShen). It mainly
covers recent community development progress, answers to questions from the mailing lists, and
technical blog posts shared by the Chinese Flink community.

Community development progress
[Table API & SQL] Jingsong Li started a discussion on FLIP-115, which mainly adds FileSystem
connector support to Flink Table. FLIP-115 covers:
1. Supporting a FileSystem table factory in Flink Table, with csv/parquet/orc/json/avro
formats.

2. Supporting data output in streaming applications and in Flink on Hive.

For more information, please refer to:

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-115-Filesystem-connector-in-Table-td33625.html

[Runtime / Configuration] Andrey started a discussion on FLIP-116, unified memory configuration
for the JobManager. In FLIP-49 we introduced unified memory management and configuration for the
TaskManager, released in Flink 1.10. To keep the JobManager's memory model and configuration
aligned with it, and to cover user code's use of native non-direct memory, FLIP-116 describes
this in detail.

For more information, please refer to:

[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors

[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP+116%3A+Unified+Memory+Configuration+for+Job+Managers

[5]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
[Connectors / HBase] Flavio started a discussion on FLIP-117, HBase catalog; the FLIP mainly
discusses the implementation of HBaseCatalog.

For more information, see:

[6]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-117%3A+HBase+catalog

[7]https://issues.apache.org/jira/browse/FLINK-16575
Yu Li started a discussion about releasing Flink 1.10.1.

[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html

To let everyone know about the features planned for Flink 1.11, Zhijiang started a discussion
on the Flink 1.11 feature set; if you have any ideas or expectations, you can reply on the
thread below.

[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Features-of-Apache-Flink-1-11-td38724.html#a38793





User questions
shravan asked how to gracefully stop a job with stop-with-savepoint when the k8s cluster
suddenly goes down; Vijay answered.

[10]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stop-job-with-savepoint-during-graceful-shutdown-on-a-k8s-cluster-td33626.html
Alexander ran into some issues with container memory configuration on Mesos with Flink 1.10;
Yangze Guo gave a detailed answer.

[11]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-10-container-memory-configuration-with-Mesos-td33594.html

wanglei2 asked how to set the state backend in a Flink SQL job, and why a dimension table join
job does not run in the SQL Client. Jingsong Li and Zhenghua Gao each gave detailed answers.

[12]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-set-stateBackEnd-in-flink-sql-program-td33590.html

[13]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/dimention-table-join-not-work-under-sql-client-fink-1-10-0-td33616.html
Yuval asked some questions about how Flink restores state from an incremental checkpoint;
Andrey gave a detailed answer.

[14]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Restoring-state-from-an-incremental-RocksDB-checkpoint-td33630.html

Eyal ran into some logging configuration issues with Flink on YARN; community members answered.

[15]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Setting-app-Flink-logger-td33537.html
Flavio asked about Alink and Flink ML; have a look if you are interested.

[16]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Alink-and-Flink-ML-td2.html
LakeShen asked whether operator parallelism can be changed when restoring from checkpoint
state files. When a job is restored from a checkpoint's state files, the operator parallelism
can be adjusted, as long as the operators' maximum parallelism is not changed (see the
illustrative snippet after the link below).

[17]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cancel-the-flink-task-and-restore-from-checkpoint-can-I-change-the-flink-operator-s-parallelism-td33613.html
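
As an illustrative Java snippet of that rule (my own addition, not part of the original thread):
fix the max parallelism when the job is first deployed, and only change the regular parallelism
when restoring.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fix the max parallelism up front; it must stay the same across restores,
        // because it determines the number of key groups used to shard keyed state.
        env.setMaxParallelism(128);
        // The regular parallelism may be changed when restoring from a checkpoint
        // or savepoint, e.g. by resubmitting with `flink run -s <path> -p <n>`.
        env.setParallelism(4);
        // ... build the job topology here ...
        env.execute("rescalable job");
    }
}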
karl asked on the mailing list about state TTL for Flink session windows.

[18]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Session-Windows-State-TTL-td33349.html



Events / blog posts / other

Over 50% of development tasks in SQL: the evolution and optimization of real-time computing at Didi

[19]
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247486354=1=02a55f72b950cac757fafd79e10b39a3=fd3b85d0ca4c0cc68c558aff88c4f207dbd55239c931b339f5cc0de6681923a20542f035609b=1=1=_sharetime=1584422047859_shareid=3e9e3225194926b8fbe2f7e547e483de=1#wechat_redirect

How does Flink support AI scenarios such as feature engineering, online learning, and online prediction?

[20]
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247486319=1=d095fd58dd0a447f41b616279de1983e=fd3b852dca4c0c3be3f9a7cc0b750f57ab17bda7079d2da52d54c3f0ea9e84e42a848ed90167=1=1=_sharetime=1584422074948_shareid=3e9e3225194926b8fbe2f7e547e483de=1#wechat_redirect

One line of configuration for a 53% job performance improvement! A journey through Flink SQL performance

[21]
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247486278=1=9b90e3dff21d7e973b9e40777381d032=fd3b8504ca4c0c121efd09bf3823b4f856529ab2911f09035ddffbd08fcf8d6369b28b46642e=1=1=_sharetime=1584422086156_shareid=3e9e3225194926b8fbe2f7e547e483de=1#wechat_redirect

Real-time job optimization at Youzan: analyzing Flink checkpoint failures and practical experience

[22]
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247486257=1=22484a15929de46cf5e8a68aadf50875=fd3b8573ca4c0c650c34deb00c12e5130af471739364a95a81b9091690b09eb8094b49bb79bf=1=1=_sharetime=1584422097105_shareid=3e9e3225194926b8fbe2f7e547e483de=1#wechat_redirect


Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

2020-03-17 Thread Bashar Abdul-Jawad
StateAssignmentOperation.checkStateMappingCompleteness doesn't
check for UserDefinedOperatorIDs (specified using setUidHash), causing the
exception:

 java.lang.IllegalStateException: There is no operator for the state {}

to be thrown when a savepoint can't be mapped to an ExecutionJobVertex,
even when the operator hash is explicitly specified.

I believe this logic should be extended to also include
UserDefinedOperatorIDs as so:

for (ExecutionJobVertex executionJobVertex : tasks) {
  allOperatorIDs.addAll(executionJobVertex.getOperatorIDs());
  allOperatorIDs.addAll(executionJobVertex.getUserDefinedOperatorIDs());
}

I filed https://issues.apache.org/jira/browse/FLINK-16638, please let me
know if I am missing something here.


Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-17 Thread Thomas Weise
-->

On Mon, Mar 16, 2020 at 1:58 AM Andrey Zagrebin 
wrote:

> Thanks for the further feedback Thomas and Yangze.
>
> > A generic, dynamic configuration mechanism based on environment variables
> is essential and it is already supported via envsubst and an environment
> variable that can supply a configuration fragment
>
> True, we already have this. As I understand this was introduced for
> flexibility to template a custom flink-conf.yaml with env vars, put it into
> the FLINK_PROPERTIES and merge it with the default one.
> Could we achieve the same with the dynamic properties (-Drpc.port=1234),
> passed as image args to run it, instead of FLINK_PROPERTIES?
> They could be also parametrised with env vars. This would require
> jobmanager.sh to properly propagate them to
> the StandaloneSessionClusterEntrypoint though:
> https://github.com/docker-flink/docker-flink/pull/82#issuecomment-525285552
> cc @Till
> This would provide a unified configuration approach.
>
>
How would that look for the various use cases? The k8s operator would
need to generate the -Dabc .. -Dxyz entry point command instead of setting
the FLINK_PROPERTIES environment variable? Potentially that introduces
additional complexity for little gain. Do most deployment platforms that
support Docker containers handle the command line route well? Backward
compatibility may also be a concern.


> > On the flip side, attempting to support a fixed subset of configuration
> options is brittle and will probably lead to compatibility issues down the
> road
>
> I agree with it. The idea was to have just some shortcut scripted functions
> to set options in flink-conf.yaml for a custom Dockerfile or entry point
> script.
> TASK_MANAGER_NUMBER_OF_TASK_SLOTS could be set as a dynamic property of
> started JM.
> I am not sure how many users depend on it. Maybe we could remove it.
> It also looks like we already have a somewhat unclean state in
> docker-entrypoint.sh, where some ports are set to hardcoded values
> and then FLINK_PROPERTIES is applied, potentially duplicating options in
> the resulting flink-conf.yaml.
>

That is indeed possible and duplicate entries from FLINK_PROPERTIES
prevail. Unfortunately, the special cases you mention were already
established and the generic mechanism was added later for the k8s operators.


>
> I can see some potential usage of env vars as standard entry point args but
> for purposes related to something which cannot be achieved by passing entry
> point args, like changing flink-conf.yaml options. Nothing comes into my
> mind at the moment. It could be some setting specific to the running mode
> of the entry point. The mode itself can stay the first arg of the entry
> point.
>
> > I would second that it is desirable to support Java 11
>
> > Regarding supporting JAVA 11:
> > - Not sure if it is necessary to ship JAVA. Maybe we could just change
> > the base image from openjdk:8-jre to openjdk:11-jre in template docker
> > file[1]. Correct me if I understand incorrectly. Also, I agree to move
> > this out of the scope of this FLIP if it indeed takes much extra
> > effort.
>
> This is what I meant by bumping up the Java version in the docker hub Flink
> image:
> FROM openjdk:8-jre -> FROM openjdk:11-jre
> This can be polled separately in the user mailing list.
>

That sounds reasonable as long as we can still support both Java versions
(i.e. provide separate images for 8 and 11).


>
> > and in general use a base image that allows the (straightforward) use of
> more recent versions of other software (Python etc.)
>
> We can also poll whether to always include some version of Python in
> the docker hub image.
> A potential problem here is once it is there, it is some hassle to
> remove/change it in a custom extended Dockerfile.
>
> It would be also nice to avoid maintaining images for various combinations
> of installed Java/Scala/Python in docker hub.
>
> > Regarding building from local dist:
> > - Yes, I bring this up mostly for development purpose. Since k8s is
> > popular, I believe more and more developers would like to test their
> > work on k8s cluster. I'm not sure should all developers write a custom
> > docker file themselves in this scenario. Thus, I still prefer to
> > provide a script for devs.
> > - I agree to keep the scope of this FLIP mostly for those normal
> > users. But as far as I can see, supporting building from local dist
> > would not take much extra effort.
> > - The maven docker plugin sounds good. I'll take a look at it.
>
> I would see any scripts introduced in this FLIP also as potential building
> blocks for a custom dev Dockerfile.
> Maybe this will be all that we need for dev images, or we write a dev
> Dockerfile, highly parametrised for building a dev image.
> If scripts stay in apache/flink-docker, it is also somewhat inconvenient to
> use them in the main Flink repo but possible.
> If we move them to apache/flink then we will have to e.g. include them into
> the release to make them 

Re: Question about using Streaming File Sink

2020-03-17 Thread Yun Gao
From the error, it looks like GenericRecord may not be serializable; for now, you could use a custom data type to carry the records instead.
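
To make that concrete, here is an illustrative Java sketch (my own, not from the original thread) that writes a plain POJO instead of GenericRecord with the reflective Parquet/Avro writer; the User class and paths are placeholders, and the class/method names are from the Flink 1.10-era flink-parquet module, so please double-check them against the docs for your version.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkExample {

    /** A plain POJO used instead of Avro GenericRecord, so Flink can serialize it between operators. */
    public static class User {
        public String name;
        public Integer favoriteNumber;
        public String favoriteColor;

        public User() {}

        public User(String name, Integer favoriteNumber, String favoriteColor) {
            this.name = name;
            this.favoriteNumber = favoriteNumber;
            this.favoriteColor = favoriteColor;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(60 * 1000); // bulk formats roll their part files on checkpoints

        StreamingFileSink<User> sink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/parquet-out"),
                        ParquetAvroWriters.forReflectRecord(User.class))
                .build();

        env.fromElements(new User("alice", 42, "blue"), new User("bob", 7, "red"))
                .addSink(sink);

        env.execute("parquet avro bulk write");
    }
}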


--
From:58683632 <58683...@qq.com>
Send Time:2020 Mar. 17 (Tue.) 13:33
To:user-zh 
Subject:Question about using Streaming File Sink

I am using Streaming File Sink to bulk write in Parquet Avro format; the code is as follows:
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60 * 1000, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new FsStateBackend(new Path("file:///g:/checkpoint")));
Schema schema = new 
Schema.Parser().parse("{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
final DataStreamSource

Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-17 Thread tison
Sorry, I mixed up the log; it belongs to a previous failure.

Could you try to reproduce the problem with DEBUG level logs?

From the log we know that JM & RM had been elected as leader but the
listener didn't work. However, we don't know whether it is because the leader
didn't publish the leader info or because the listener didn't get notified.

Best,
tison.


On Wed, Mar 18, 2020 at 10:40 AM tison  wrote:

> Hi Abhinav,
>
> The problem is
>
> Curator: Background operation retry gave up
>
> So the ZK ensemble was too unstable to recover in time, so
> Curator stopped retrying and threw a fatal error.
>
> Best,
> tison.
>
>
> On Wed, Mar 18, 2020 at 10:22 AM Xintong Song  wrote:
>
>> I'm not familiar with ZK either.
>>
>> I've copied Yang Wang, who might be able to provide some suggestions.
>>
>> Alternatively, you can try to post your question to the Apache ZooKeeper
>> community, see if they have any clue.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Mar 18, 2020 at 8:12 AM Bajaj, Abhinav 
>> wrote:
>>
>>> Hi Xintong,
>>>
>>>
>>>
>>> I did check the Zk logs and didn’t notice anything interesting.
>>>
>>> I have limited expertise in zookeeper.
>>>
>>> Can you share an example of what I should be looking for in Zk?
>>>
>>>
>>>
>>> I was able to reproduce this issue again with Flink 1.7 by killing the
>>> zookeeper leader that disrupted the quorum.
>>>
>>> The sequence of logs in this case look quite similar to one we have been
>>> discussing.
>>>
>>>
>>>
>>> If the code hasn’t changed in this area till 1.10 then maybe the latest
>>> version also has the potential issue.
>>>
>>>
>>>
>>> It's not straightforward to bump up the Flink version in the
>>> infrastructure available to me.
>>>
>>> But I will think if there is a way around it.
>>>
>>>
>>>
>>> ~ Abhinav Bajaj
>>>
>>>
>>>
>>> *From: *Xintong Song 
>>> *Date: *Monday, March 16, 2020 at 8:00 PM
>>> *To: *"Bajaj, Abhinav" 
>>> *Cc: *"user@flink.apache.org" 
>>> *Subject: *Re: JobMaster does not register with ResourceManager in high
>>> availability setup
>>>
>>>
>>>
>>> Hi Abhinav,
>>>
>>>
>>>
>>> I think you are right. The log confirms that JobMaster has not tried to
>>> connect ResourceManager. Most likely the JobMaster requested for RM address
>>> but has never received it.
>>>
>>>
>>>
>>> I would suggest you check the ZK logs to see if the request from JM for
>>> the RM address has been received and properly responded to.
>>>
>>>
>>>
>>> If you can easily reproduce this problem, and you are able to build
>>> Flink from source, you can also try to insert more logs in Flink to further
>>> confirm whether the RM address is received. I don't think that's necessary
>>> though, since those codes have not been changed since Flink 1.7 till the
>>> latest 1.10, and I'm not aware of any reported issue that the JM may not
>>> try to connect RM once the address is received.
>>>
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 17, 2020 at 7:45 AM Bajaj, Abhinav 
>>> wrote:
>>>
>>> Hi Xintong,
>>>
>>>
>>>
>>> Apologies for delayed response. I was away for a week.
>>>
>>> I am attaching more jobmanager logs.
>>>
>>>
>>>
>>> To your point on the taskmanagers, the job is deployed with 20
>>> parallelism but it has 22 TMs to have 2 of them as spare to assist in quick
>>> failover.
>>>
>>> I did check the logs and all 22 of task executors from those TMs get
>>> registered by the time - 2020-02-27 06:35:47.050.
>>>
>>>
>>>
>>> You would notice that even after this time, the job fails with the error
>>> “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>> Could not allocate all requires slots within timeout of 30 ms. Slots
>>> required: 201, slots allocated: 0” at 2020-02-27 06:40:36.778.
>>>
>>>
>>>
>>> Thanks a ton for you help.
>>>
>>>
>>>
>>> ~ Abhinav Bajaj
>>>
>>>
>>>
>>> *From: *Xintong Song 
>>> *Date: *Thursday, March 5, 2020 at 6:30 PM
>>> *To: *"Bajaj, Abhinav" 
>>> *Cc: *"user@flink.apache.org" 
>>> *Subject: *Re: JobMaster does not register with ResourceManager in high
>>> availability setup
>>>
>>>
>>>
>>> Hi Abhinav,
>>>
>>>
>>>
>>> Thanks for the log. However, the attached log seems to be incomplete.
>>> The NoResourceAvailableException cannot be found in this log.
>>>
>>>
>>>
>>> Regarding connecting to ResourceManager, the log suggests that:
>>>
>>>- ZK was back to life and connected at 06:29:56.
>>>2020-02-27 06:29:56.539 [main-EventThread] level=INFO
>>> o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State
>>>change: CONNECTED
>>>- RM registered to ZK and was granted leadership at 06:30:01.
>>>2020-02-27 06:30:01.677 [flink-akka.actor.default-dispatcher-5]
>>>level=INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  
>>> -
>>>ResourceManager akka.tcp://flink@JOBMANAGER:6126/user/resourcemanager
>>>was granted leadership with fencing token 
>>> a2c453481ea4e0c7722cab1e4dd741db
>>>- JM requests RM leader address from ZK at 06:30:06.
>>>2020-02-27 

Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Jeff Zhang
I agree, this is really confusing for users. Do you mind creating a ticket
for that?

On Wed, Mar 18, 2020 at 8:36 AM Craig Foster  wrote:

> If I specify these options, it seems to work...but I thought I could
> have this dynamically determined when submitting jobs just using the
> "yarn" option:
>
> /usr/lib/flink/bin/start-scala-shell.sh yarn -s 4 -jm 1024m -tm 4096m
>
> I guess what isn't clear here to me is that if you use `yarn` alone
> there needs to be an existing yarn cluster already started.
>
>
> On Tue, Mar 17, 2020 at 4:22 PM Craig Foster 
> wrote:
> >
> > Yeah, I was wondering about that. I'm using
> > `/usr/lib/flink/bin/start-scala-shell.sh yarn`-- previously I'd use
> > `/usr/lib/flink/bin/start-scala-shell.sh yarn -n ${NUM}`
> >  but that deprecated option was removed.
> >
> >
> > On Tue, Mar 17, 2020 at 4:11 PM Jeff Zhang  wrote:
> > >
> > > It looks like you are running in standalone mode. What is your
> command to start the Scala shell?
> > >
> > > On Wed, Mar 18, 2020 at 5:23 AM Craig Foster  wrote:
> > >>
> > >> Hi:
> > >> When I upgraded from Flink 1.9.1 to Flink 1.10.0 I can't execute
> > >> programs at the Scala shell.
> > >>
> > >> It gives me an error that the REST address must be set. This looks
> > >> like it comes from HA but I don't have HA configured for Flink and it
> > >> was very hard to find this documented other than in the PR/JIRA in the
> > >> history so don't have much context. Can someone point me to how to
> > >> configure this properly? For reference, I put the example stacktrace
> > >> below.
> > >>
> > >> scala> val text = benv.fromElements("To be, or not to be,--that is the
> > >> question:--");
> > >> text: org.apache.flink.api.scala.DataSet[String] =
> > >> org.apache.flink.api.scala.DataSet@2396408a
> > >>
> > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")}.map {
> > >> (_, 1) }.groupBy(0).sum(1);
> > >> counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] =
> > >> org.apache.flink.api.scala.AggregateDataSet@38bce2ed
> > >>
> > >> scala> counts.print()
> > >> 20/03/17 21:15:34 INFO java.ExecutionEnvironment: The job has 0
> > >> registered types and 0 default Kryo serializers
> > >> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> > >> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> > >> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> > >> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> > >> java.lang.RuntimeException: Couldn't retrieve standalone cluster
> > >>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
> > >>   at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:944)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
> > >>   at
> org.apache.flink.api.java.ScalaShellEnvironment.execute(ScalaShellEnvironment.java:81)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
> > >>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> > >>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> > >>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
> > >>   ... 30 elided
> > >> Caused by: java.lang.NullPointerException: rest.address must be set
> > >>   at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> > >>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
> > >>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
> > >>   at
> org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:161)
> > >>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
> > >>   ... 38 more
> > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re: Flink SQL, How can i set parallelism in clause of group by ?

2020-03-17 Thread Benchao Li
Hi forideal,

Currently, there is no way to change an individual operator's parallelism for SQL. All
operators use the global parallelism as their parallelism, except for some
'global' operators which can only have a parallelism of 1.

BTW, does changing the parallelism of all operators meet your need?
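
For completeness, a small Java sketch of what changing the parallelism of all operators looks like; the `table.exec.resource.default-parallelism` key is a Blink-planner option that I am quoting from memory, so please verify it for your Flink version:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SqlParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Global job parallelism, used by all SQL operators (except the parallelism-1 "global" ones).
        env.setParallelism(6);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // With the Blink planner, the default parallelism of SQL operators can also be
        // set via configuration -- again for all operators, not for a single one.
        tEnv.getConfig().getConfiguration()
                .setInteger("table.exec.resource.default-parallelism", 6);

        // ... register tables and run tEnv.sqlQuery(...) here ...
    }
}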

On Wed, Mar 18, 2020 at 12:50 AM forideal  wrote:

> Hello everyone
>
>  I'm a Flink SQL user. Now I have a question: how can I set the parallelism
> of a GROUP BY clause?
>  For example
>  SELECT
> T.user_id,
> D.user_name
>  (SELECT
>  user_id,
>  MIN(processtime)
>  from my_table
> group by user_id,TUMBLE(processtime, INTERVAL '15' SECOND)) AS T
>  LEFT JOIN my_dim_table FOR SYSTEM_TIME AS OF T.proc_time AS D ON
> T.user_id = D.user_id
>
>  If the topic has 3 partitions, then the parallelism of the job
> is 3.
>  In this example, there are three operators:
>  One is the Source operator, parallelism 3
>  Two is the GroupWindowAggregate operator, parallelism 3
>  Three is the LookupJoin operator, parallelism 3
>
>   I want to change the parallelism of GroupWindowAggregate, but I
> can't.
>
> Best wishes
> forideal
>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Craig Foster
Yeah, I was wondering about that. I'm using
`/usr/lib/flink/bin/start-scala-shell.sh yarn`-- previously I'd use
`/usr/lib/flink/bin/start-scala-shell.sh yarn -n ${NUM}`
 but that deprecated option was removed.


On Tue, Mar 17, 2020 at 4:11 PM Jeff Zhang  wrote:
>
> It looks like you are running under standalone mode, what is your command to 
> start scala shell. ?
>
> On Wed, Mar 18, 2020 at 5:23 AM Craig Foster  wrote:
>>
>> Hi:
>> When I upgraded from Flink 1.9.1 to Flink 1.10.0 I can't execute
>> programs at the Scala shell.
>>
>> It gives me an error that the REST address must be set. This looks
>> like it comes from HA but I don't have HA configured for Flink and it
>> was very hard to find this documented other than in the PR/JIRA in the
>> history so don't have much context. Can someone point me to how to
>> configure this properly? For reference, I put the example stacktrace
>> below.
>>
>> scala> val text = benv.fromElements("To be, or not to be,--that is the
>> question:--");
>> text: org.apache.flink.api.scala.DataSet[String] =
>> org.apache.flink.api.scala.DataSet@2396408a
>>
>> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")}.map {
>> (_, 1) }.groupBy(0).sum(1);
>> counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] =
>> org.apache.flink.api.scala.AggregateDataSet@38bce2ed
>>
>> scala> counts.print()
>> 20/03/17 21:15:34 INFO java.ExecutionEnvironment: The job has 0
>> registered types and 0 default Kryo serializers
>> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
>> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
>> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
>> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
>> java.lang.RuntimeException: Couldn't retrieve standalone cluster
>>   at 
>> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
>>   at 
>> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
>>   at 
>> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:944)
>>   at 
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
>>   at 
>> org.apache.flink.api.java.ScalaShellEnvironment.execute(ScalaShellEnvironment.java:81)
>>   at 
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
>>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
>>   ... 30 elided
>> Caused by: java.lang.NullPointerException: rest.address must be set
>>   at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>>   at 
>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
>>   at 
>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
>>   at 
>> org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:161)
>>   at 
>> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
>>   ... 38 more
>
>
>
> --
> Best Regards
>
> Jeff Zhang


Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Jeff Zhang
It looks like you are running in standalone mode. What is
your command to start the Scala shell?

On Wed, Mar 18, 2020 at 5:23 AM Craig Foster  wrote:

> Hi:
> When I upgraded from Flink 1.9.1 to Flink 1.10.0 I can't execute
> programs at the Scala shell.
>
> It gives me an error that the REST address must be set. This looks
> like it comes from HA but I don't have HA configured for Flink and it
> was very hard to find this documented other than in the PR/JIRA in the
> history so don't have much context. Can someone point me to how to
> configure this properly? For reference, I put the example stacktrace
> below.
>
> scala> val text = benv.fromElements("To be, or not to be,--that is the
> question:--");
> text: org.apache.flink.api.scala.DataSet[String] =
> org.apache.flink.api.scala.DataSet@2396408a
>
> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")}.map {
> (_, 1) }.groupBy(0).sum(1);
> counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] =
> org.apache.flink.api.scala.AggregateDataSet@38bce2ed
>
> scala> counts.print()
> 20/03/17 21:15:34 INFO java.ExecutionEnvironment: The job has 0
> registered types and 0 default Kryo serializers
> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> java.lang.RuntimeException: Couldn't retrieve standalone cluster
>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
>   at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:944)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
>   at
> org.apache.flink.api.java.ScalaShellEnvironment.execute(ScalaShellEnvironment.java:81)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
>   ... 30 elided
> Caused by: java.lang.NullPointerException: rest.address must be set
>   at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
>   at
> org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:161)
>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
>   ... 38 more
>


-- 
Best Regards

Jeff Zhang


Re: Flink on Kubernetes Vs Flink Natively on Kubernetes

2020-03-17 Thread Pankaj Chand
Thank you, Yang and Xintong!

Best,

Pankaj

On Mon, Mar 16, 2020, 9:27 PM Yang Wang  wrote:

> Hi Pankaj,
>
> Just like Xintong has said, the biggest difference of Flink on Kubernetes
> and native
> integration is dynamic resource allocation, since the latter has an
> embedded K8s
> client and will communicate with K8s Api server directly to
> allocate/release JM/TM
> pods.
>
> Both for the two ways to run Flink on K8s, you do not need to reserve the
> whole
> cluster for Flink. Flink could run with other workloads(e.g. Spark,
> tensorflow, etc.).
> The K8s cluster could guarantee the isolation.
>
>
> Best,
> Yang
>
> On Mon, Mar 16, 2020 at 5:51 PM Pankaj Chand  wrote:
>
>> Hi Xintong,
>>
>> Thank you for the explanation!
>>
>> If I run Flink "natively" on Kubernetes, will I also be able to run Spark
>> on the same Kubernetes cluster, or will it make the Kubernetes cluster be
>> reserved for Flink only?
>>
>> Thank you!
>>
>> Pankaj
>>
>> On Mon, Mar 16, 2020 at 5:41 AM Xintong Song 
>> wrote:
>>
>>> Forgot to mention that "running Flink natively on Kubernetes" is newly
>>> introduced and is only available for Flink 1.10 and above.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Mon, Mar 16, 2020 at 5:40 PM Xintong Song 
>>> wrote:
>>>
 Hi Pankaj,

 "Running Flink on Kubernetes" refers to the old way that basically
 deploys a Flink standalone cluster on Kubernetes. We leverage scripts to
 run Flink Master and TaskManager processes inside Kubernetes container. In
 this way, Flink is not ware of whether it's running in containers or
 directly on physical machines, and will not interact with the Kubernetes
 Master. Flink Master reactively accept all registered TaskManagers, whose
 number is decided by the Kubernetes replica.

 "Running Flink natively on Kubernetes" refers deploy Flink as a
 Kubernetes Job. Flink Master will interact with Kubernetes Master, and
 actively requests for pods/containers, like on Yarn/Mesos.

 Thank you~

 Xintong Song



 On Mon, Mar 16, 2020 at 4:03 PM Pankaj Chand 
 wrote:

> Hi all,
>
> I want to run Flink, Spark and other processing engines on a single
> Kubernetes cluster.
>
> From the Flink documentation, I did not understand the difference
> between:
> (1) Running Flink on Kubernetes, Versus (2) Running Flink natively on
> Kubernetes.
>
> Could someone please explain the difference between the two, and when
> would you use which option?
>
> Thank you,
>
> Pankaj
>



Custom Exception Handling

2020-03-17 Thread Anil Alfons K
Hi Community,
I am reading data from Kafka. The FlinkKafkaConsumer reads data from it.
Then some application-specific logic in a process function. If I receive
any invalid data I throw a custom exception and it's handled in the process
function itself. This invalid data is taken out as side output. But the
problem is Flink tries to read the same invalid messages again and again
for a few times.

Can anyone let me know how can the error/exception handling be done without
the Flink job breaking?

The plan is to process all the events only once through the process
function without any retry.
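
For reference, a minimal Java sketch of the pattern described above: the custom exception is caught inside processElement and the bad record is diverted to a side output, so no exception escapes the operator and nothing is replayed. The record types and names are placeholders.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class InvalidRecordRouting {

    // Side output for records that fail validation (anonymous subclass keeps the type information).
    static final OutputTag<String> INVALID = new OutputTag<String>("invalid-records") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("1", "2", "oops", "4"); // stand-in for the Kafka source

        SingleOutputStreamOperator<Integer> valid = input.process(
                new ProcessFunction<String, Integer>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<Integer> out) {
                        try {
                            out.collect(Integer.parseInt(value)); // application-specific logic
                        } catch (Exception e) {
                            // Swallow the exception and divert the record; the job keeps running
                            // and the message is not re-read, because no exception escapes.
                            ctx.output(INVALID, value);
                        }
                    }
                });

        valid.print();
        valid.getSideOutput(INVALID).print();

        env.execute("side-output error handling");
    }
}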

Regards
Anil


Flink SQL, How can i set parallelism in clause of group by ?

2020-03-17 Thread forideal
Hello everyone


 I'm a Flink SQL user. Now I have a question: how can I set the parallelism
of a GROUP BY clause?
 For example:
 SELECT
T.user_id,
D.user_name
 (SELECT
 user_id,
 MIN(processtime)
 FROM my_table
GROUP BY user_id, TUMBLE(processtime, INTERVAL '15' SECOND)) AS T
 LEFT JOIN my_dim_table FOR SYSTEM_TIME AS OF T.proc_time AS D ON T.user_id =
D.user_id

 If the topic has 3 partitions, then the parallelism of the job is 3.
 In this example, there are three operators:
 One is the Source operator, parallelism 3
 Two is the GroupWindowAggregate operator, parallelism 3
 Three is the LookupJoin operator, parallelism 3


  I want to change the parallelism of GroupWindowAggregate, but I can't.


Best wishes
forideal

Re: RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread John Morrow
Hi Gordon,

That sounds good. My first thought was that if I have to break up the logic I'd 
end up with:

BroadcastFunction1 --> AsyncFunction --> BroadcastFunction2

...with BroadcastFunction1 & BroadcastFunction2 needing the same broadcast state, and 
that state could change while an item is being processed through the chain. But 
I could leave a marker to do the call like you suggested and have a placeholder 
for the result and that might do the trick.

Thanks again for the suggestion! Below is a pseudo-code example of how I think 
I'll be able to get it to work in case it's helpful for anyone else.

Cheers,
John.



Function:

processElement(item) { // BroadcastFunction

  if (broadcastState.checkInventoryLevel) {
      item.inventory = get_the_inventory_level(item.id)   // blocking external call
      if (item.inventory < X) {
        ctx.output("reorder-outputTag", item)
      }
  }
...
  item.status = "checked";
  collect(item);
}


 ---> broken down in to functions A, B & C


FunctionA:

processElement(item) { // BroadcastFunction

  if (broadcastState.checkInventoryLevel) {
      collect(Tuple2(item, True))
  } else {
      collect(Tuple2(item, False))
  }
}


FunctionB:

asyncInvoke(Tuple2(item, needsInventory)) { // AsyncFunction

  if (needsInventory) {
      item.inventory = get_the_inventory(item.id)   // non-blocking external call
  }
  collect(item);
}


FunctionC:

processElement(item) { // ProcessFunction (side outputs need ctx)

  if (item.inventory != null && item.inventory < X) {
ctx.output("reorder-outputTag", item)
}
  item.status = "checked";
  collect(item);
}
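
And a rough Java wiring sketch of the three stages above (my own illustration; Item, Rules, FunctionA, FunctionB and FunctionC are hypothetical names standing in for real implementations of the pseudo-code, so treat the exact shapes as assumptions):

// Wiring sketch only -- assumes FunctionA is a BroadcastProcessFunction<Item, Rules, Tuple2<Item, Boolean>>,
// FunctionB a RichAsyncFunction<Tuple2<Item, Boolean>, Item>, and FunctionC a ProcessFunction<Item, Item>.
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

public class Wiring {

    public static SingleOutputStreamOperator<Item> wire(
            DataStream<Item> items,
            DataStream<Rules> controlStream,
            MapStateDescriptor<String, Rules> ruleStateDescriptor,
            OutputTag<Item> reorderTag) {

        // 1) FunctionA (BroadcastProcessFunction): consults broadcast state only and
        //    marks whether the item needs the external inventory lookup.
        BroadcastStream<Rules> broadcast = controlStream.broadcast(ruleStateDescriptor);
        DataStream<Tuple2<Item, Boolean>> marked = items.connect(broadcast).process(new FunctionA());

        // 2) FunctionB (RichAsyncFunction): performs the external HTTP call asynchronously
        //    for the items that were marked in step 1.
        DataStream<Item> enriched = AsyncDataStream.unorderedWait(
                marked, new FunctionB(), 1000, TimeUnit.MILLISECONDS, 100);

        // 3) FunctionC (ProcessFunction): applies the threshold check and emits
        //    re-order requests to the side output; fetch them via getSideOutput(reorderTag).
        return enriched.process(new FunctionC(reorderTag));
    }
}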



From: Tzu-Li (Gordon) Tai 
Sent: Tuesday 17 March 2020 10:05
To: user@flink.apache.org 
Subject: Re: RichAsyncFunction + BroadcastProcessFunction

Hi John,

Have you considered letting the BroadcastProcessFunction output events that
indicate extra external HTTP requests needs to be performed, and have them
consumed by a downstream async IO operator to complete the HTTP request?
That could work depending on what exactly you need to do in your specific
case.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Help me understand this Exception

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

The exception stack you posted simply means that the next operator in the
chain failed to process the output watermark.
There should be another exception, which would explain why some operator
was closed / failed and eventually leading to the above exception.
That would provide more insight to exactly why your job is failing.

Cheers,
Gordon

On Tue, Mar 17, 2020 at 11:27 PM aj  wrote:

> Hi,
> I am running a streaming job with generating watermark like this :
>
> public static class SessionAssigner implements 
> AssignerWithPunctuatedWatermarks {
> @Override
> public long extractTimestamp(GenericRecord record, long 
> previousElementTimestamp) {
> long timestamp = (long) record.get("event_ts");
> LOGGER.info("timestamp", timestamp);
> return timestamp;
> }
>
> @Override
> public Watermark checkAndGetNextWatermark(GenericRecord record, long 
> extractedTimestamp) {
> // simply emit a watermark with every event
> LOGGER.info("extractedTimestamp ", extractedTimestamp);
> return new Watermark(extractedTimestamp);
> }
> }
>
> Please help me understand what this exception means:
>
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
> at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:216)
> at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(
> StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processElement(StreamOneInputProcessor.java:169)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:143)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(TimestampedCollector.java:51)
> at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 137)
> at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 116)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:50)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:32)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.emitWindowContents(WindowOperator.java:549)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.onEventTime(WindowOperator.java:457)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
> .advanceWatermark(InternalTimerServiceImpl.java:276)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
> .advanceWatermark(InternalTimeServiceManager.java:128)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .processWatermark(AbstractStreamOperator.java:784)
> at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:213)
> ... 10 more
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> 
>


Help me understand this Exception

2020-03-17 Thread aj
Hi,
I am running a streaming job with generating watermark like this :

public static class SessionAssigner implements
AssignerWithPunctuatedWatermarks {
@Override
public long extractTimestamp(GenericRecord record, long
previousElementTimestamp) {
long timestamp = (long) record.get("event_ts");
LOGGER.info("timestamp", timestamp);
return timestamp;
}

@Override
public Watermark checkAndGetNextWatermark(GenericRecord record,
long extractedTimestamp) {
// simply emit a watermark with every event
LOGGER.info("extractedTimestamp ", extractedTimestamp);
return new Watermark(extractedTimestamp);
}
}

Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at org.apache.flink.streaming.runtime.io.
StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
StreamOneInputProcessor.java:216)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve
.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve
.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve
.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processElement(StreamOneInputProcessor.java:169)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
at org.apache.flink.streaming.runtime.tasks.
OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
at org.apache.flink.streaming.api.operators.
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
727)
at org.apache.flink.streaming.api.operators.
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
705)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect
(TimestampedCollector.java:51)
at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
at org.apache.flink.streaming.runtime.operators.windowing.functions.
InternalIterableProcessWindowFunction.process(
InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.
InternalIterableProcessWindowFunction.process(
InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
.emitWindowContents(WindowOperator.java:549)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
.onEventTime(WindowOperator.java:457)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
.advanceWatermark(InternalTimerServiceImpl.java:276)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
.advanceWatermark(InternalTimeServiceManager.java:128)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.processWatermark(AbstractStreamOperator.java:784)
at org.apache.flink.streaming.runtime.io.
StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
StreamOneInputProcessor.java:213)
... 10 more

-- 
Thanks & Regards,
Anuj Jain






Re: Hadoop user jar for flink 1.9 plus

2020-03-17 Thread Chesnay Schepler
You can download flink-shaded-hadoop from the downloads page: 
https://flink.apache.org/downloads.html#additional-components


On 17/03/2020 15:56, Vishal Santoshi wrote:
We have been on Flink 1.8.x in production and were planning to go to
Flink 1.9 or above. We have always used the Hadoop uber jar from
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2-uber but
it seems they only go up to 1.8.3 and their distribution ends in 2019. How or
where do we get these jars for later versions of Flink?


Regards





Re: Communication between two queries

2020-03-17 Thread Mikael Gordani
No worries and great idea!
I will play around with it and see what I manage to do.
Cheers!
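
A rough Java sketch of the event-time splitting described in the quoted message below (illustrative only; switchTs must be derived from the watermark and aligned to a window boundary, as Piotr explains, and all names are placeholders):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Routes records with event time before switchTs to the main output (Query1) and
 * records at or after switchTs to a side output (Query2).
 */
public class EventTimeSplitter<T> extends ProcessFunction<T, T> {

    private final OutputTag<T> query2Output;
    private final long switchTs;

    public EventTimeSplitter(OutputTag<T> query2Output, long switchTs) {
        this.query2Output = query2Output;
        this.switchTs = switchTs;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long ts = ctx.timestamp(); // event time set by the timestamp/watermark assigner
        if (ts != null && ts >= switchTs) {
            ctx.output(query2Output, value); // new windows: handled by Query2
        } else {
            out.collect(value);              // old windows: still handled by Query1
        }
    }
}

// usage sketch:
//   OutputTag<Event> toQuery2 = new OutputTag<Event>("to-query2") {};
//   SingleOutputStreamOperator<Event> q1Input = events.process(new EventTimeSplitter<>(toQuery2, switchTs));
//   DataStream<Event> q2Input = q1Input.getSideOutput(toQuery2);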

On Tue, Mar 17, 2020 at 3:59 PM Piotr Nowojski  wrote:

> Oops, sorry, there was a misleading typo/auto-correction in my previous
> e-mail. The second sentence should have been:
>
> > First of all you would have to use event time semantic for consistent
> results
>
> Piotrek
>
> On 17 Mar 2020, at 14:43, Piotr Nowojski  wrote:
>
> Hi,
>
> Yes, you are looking in the right directions with the watermarks.
>
> First of all you would have to use event time semantic for constant
> results. With processing time everything would be simpler, but it would be
> more difficult to reason about the results (your choice). Secondly, you
> would have to hook up the logic of enabling query1/query2 to the event
> time/watermarks. Thirdly, you need to somehow to sync the input switching
> with the windows boundaries. On top of that, watermarks express lower bound
> of even time that you can expect. However, in order to guarantee
> consistency of the windows, you would like to control the upper bound. For
> example:
>
> 1. If you want to enable Query2, you would need to check what’s the
> largest/latest event time that was processed by the input splitter, lets
> say that’s TS1
> 2. That means, records with event time < TS1 have already been processed
> by Query1, starting some windows
> 3. The earliest point for which you could enable Query2, is thus TS1 + 1.
> 4. You would have to adjust Query2 start time, by start of the next time
> window, let’s say that would be TS2 = TS1 + 1 + start of next window
> 5. Input splitter now must keep sending records with event time < TS2 to
> Query1, but already should redirect records with event time >= TS2 to
> Query2.
> 6. Once watermark for the input splitter advances past TS2, that’s when it
> can finally stop sending records to Query1 and query1 logic could be
> considered “completed”.
>
> So Query1 would be responsible for all of the data before TS2, and Query2
> after TS2.
>
> Alternatively, your input splitter could also buffer some records, so that
> you could enable Query2 faster, by re-sending the buffered records. But in
> that case, both Query1 and Query2 would be responsible for some portion of
> the data.
>
> Piotrek
>
> On 17 Mar 2020, at 10:35, Mikael Gordani  wrote:
>
> Hi Piotr!
>
> Continuing with my scenario, since both of the queries will share the same
> sink, I've realized that some issues will appear when I switch queries.
> Especially with regards to stateful operators, e.g aggregation.
>
> Let me provide an example:
> So, let say that both of the queries ingest a sequence of integers, and it
> will perform the average of these integers over some time.
> E.g say that *query1* ingest the sequence *1,2,3,4 *
> The windows for *query1* will be *[1,2,3] [2,3,4] [3,4]*.
>
> If I'm later on "activating" *query2*, I need to have both of the queries
> allowing tuples for a while, in order to allow the aggregation to finish in
> *query1* before denying it input.
> But, there is a possibility that *query2* might receive the tuples *3,4*,
> which will result in the window: *[3][3,4][3,4]*
> Later on, the output of the respective queries will be:
> Query 1: 3, *4.5*, 3.5
> Query2 : 3, *3.5*, 3.5
>
> As one can see, the second output will be different.
> I'm thinking of using watermarks somehow to make sure that both queries
> have processed the same amount of data before writing to the sink, but I'm a
> bit unsure on how to do it.
> Do you have any suggestions or thoughts?
> Cheers,
>
> On Mon, Mar 16, 2020 at 8:43 AM Piotr Nowojski  wrote:
>
>> Hi,
>>
>> Let us know if something doesn’t work :)
>>
>> Piotrek
>>
>> On 16 Mar 2020, at 08:42, Mikael Gordani  wrote:
>>
>> Hi,
>> I'll try it out =)
>>
>> Cheers!
>>
>> On Mon, Mar 16, 2020 at 8:32 AM Piotr Nowojski  wrote:
>>
>>> Hi,
>>>
>>> In that case you could try to implement your `FilterFunction` as two
>>> input operator, with broadcast control input, that would be setting the
>>> `global_var`. Broadcast control input can be originating from some source,
>>> or from some operator.
>>>
>>> Piotrek
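
A hedged sketch of that idea, with illustrative Record/Boolean types and state
names: the control stream is broadcast and toggles which query the records are
forwarded to, instead of relying on a shared global variable.

MapStateDescriptor<String, Boolean> switchDesc =
    new MapStateDescriptor<>("query-switch", Types.STRING, Types.BOOLEAN);

DataStream<Record> query1Input = source
    .connect(controlStream.broadcast(switchDesc))
    .process(new BroadcastProcessFunction<Record, Boolean, Record>() {
        @Override
        public void processElement(Record r, ReadOnlyContext ctx, Collector<Record> out)
                throws Exception {
            Boolean toQuery1 = ctx.getBroadcastState(switchDesc).get("active");
            if (toQuery1 == null || toQuery1) { // default to query1 until told otherwise
                out.collect(r);
            }
            // a mirrored function (or a side output) would feed query2
        }

        @Override
        public void processBroadcastElement(Boolean value, Context ctx,
                                            Collector<Record> out) throws Exception {
            ctx.getBroadcastState(switchDesc).put("active", value);
        }
    });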
>>>
>>> On 13 Mar 2020, at 15:47, Mikael Gordani  wrote:
>>>
>>> Hi Piotr!
>>> Thanks for your response, I'll try to explain what I'm trying to achieve
>>> in more detail:
>>>
>>> Essentially, If I've two queries, in which has the same operators and
>>> runs in the same task, I would want to figure out some way of controlling
>>> the ingestion from *a source* to the respective queries in such a way
>>> that only one of the queries receive data, based on a condition.
>>> For more context, the second query (query2), is equipped with
>>> instrumented operators, which are standard operators extended with some
>>> extra functionality, in my case, they enrich the tuples with meta-data.
>>>
>>> Source --> *Filter1* ---> rest of query1
>>>|
>>>v
>>>*Filter2* ---> rest of query2
>>>
>>> By using *filters* prior to the queries, they allow records 

Re: Flink YARN app terminated before the client receives the result

2020-03-17 Thread tison
JIRA created as https://jira.apache.org/jira/browse/FLINK-16637

Best,
tison.


Till Rohrmann  wrote on Tue, Mar 17, 2020 at 5:57 PM:

>  @Tison could you create an issue to track the problem. Please also link
> the uploaded log file for further debugging.
>
> I think the reason why it worked in Flink 1.9 could have been that we had
> a async callback in the longer chain which broke the flow of execution and
> allowed to send the response. This is no longer the case. As an easy fix
> one could change thenAccept into thenAcceptAsync in the
> MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should
> think about allowing not only StatusHandler to close asynchronously. At the
> moment we say that all other handler shut down immediately (see
> AbstractHandler#closeHandlerAsync). But the problem with this change would
> be that all handler would become stateful because they would need to
> remember whether a request is currently ongoing or not.
>
> Cheers,
> Till
>
> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike 
> wrote:
>
>> Hi Tison & Till and all,
>>
>> I have uploaded the client, taskmanager and jobmanager log to Gist (
>> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I
>> can reproduce this bug every time when trying to cancel Flink 1.10 jobs on
>> YARN.
>>
>> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling a
>> job with a savepoint* sometimes threw exceptions to the client side due to
>> early shutdown of the server, even though the log showed that the savepoint
>> had completed successfully. With the newly introduced *stop* API, that bug
>> disappeared; however, the *cancel* API now seems to be buggy.
>>
>> Best,
>> Weike
>>
>> On Tue, Mar 17, 2020 at 10:17 AM tison  wrote:
>>
>> > edit: previously after the cancellation we have a longer call chain to
>> > #jobReachedGloballyTerminalState which does the archive job & JM
>> graceful
>> > showdown, which might take some time so that ...
>> >
>> > Best,
>> > tison.
>> >
>> >
> tison  wrote on Tue, Mar 17, 2020 at 10:13 AM:
>> >
>> >> Hi Weike & Till,
>> >>
>> >> I agree with Till and it is also the analysis from my side. However, it
>> >> seems even if we don't have FLINK-15116, it is still possible that we
>> >> complete the cancel future but the cluster got shutdown before it
>> properly
>> >> delivered the response.
>> >>
>> >> One strange thing is that this behavior is almost always reproducible; it
>> >> should be just one possible ordering, not the only one. Maybe previously we
>> >> first had to cancel the job, which had a long call chain, so that we
>> >> happened to have enough time to deliver the response.
>> >>
>> >> But the resolution looks like introducing some synchronization/finalization
>> >> logic that clears these outstanding futures on a best-effort basis before
>> >> the cluster (RestServer) shuts down.
>> >>
>> >> Best,
>> >> tison.
>> >>
>> >>
>> >> Till Rohrmann  于2020年3月17日周二 上午4:12写道:
>> >>
>> >>> Hi Weike,
>> >>>
>> >>> could you share the complete logs with us? Attachments are being
>> >>> filtered out by the Apache mail server but it works if you upload the
>> logs
>> >>> somewhere (e.g. https://gist.github.com/) and then share the link
>> with
>> >>> us. Ideally you run the cluster with DEBUG log settings.
>> >>>
>> >>> I assume that you are running Flink 1.10, right?
>> >>>
>> >>> My suspicion is that this behaviour has been introduced with
>> FLINK-15116
>> >>> [1]. It looks as if we complete the shutdown future in
>> >>> MiniDispatcher#cancelJob before we return the response to the
>> >>> RestClusterClient. My guess is that this triggers the shutdown of the
>> >>> RestServer which then is not able to serve the response to the
>> client. I'm
>> >>> pulling in Aljoscha and Tison who introduced this change. They might
>> be
>> >>> able to verify my theory and propose a solution for it.
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>> >>>
>> >>> Cheers,
>> >>> Till
>> >>>
>> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike 
>> >>> wrote:
>> >>>
>>  Hi Yangze and all,
>> 
>>  I have tried numerous times, and this behavior persists.
>> 
>>  Below is the tail log of taskmanager.log:
>> 
>>  2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>   org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  -
>> Free slot
>>  TaskSlot(index:0, state:ACTIVE, resource profile:
>>  ResourceProfile{cpuCores=1., taskHeapMemory=1.503gb
>>  (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>>  (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>  allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>  d0a674795be98bd2574d9ea3286801cb).
>>  2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>   org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>>  d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>  

Re: Communication between two queries

2020-03-17 Thread Piotr Nowojski
Ops, sorry there was a misleading typo/auto correction in my previous e-mail. 
Second sentence should have been:

> First of all you would have to use event time semantic for consistent results

Piotrek

> On 17 Mar 2020, at 14:43, Piotr Nowojski  wrote:
> 
> Hi,
> 
> Yes, you are looking in the right directions with the watermarks. 
> 
> First of all you would have to use event time semantic for constant results. 
> With processing time everything would be simpler, but it would be more 
> difficult to reason about the results (your choice). Secondly, you would have 
> to hook up the logic of enabling query1/query2 to the event time/watermarks. 
> Thirdly, you need to somehow to sync the input switching with the windows 
> boundaries. On top of that, watermarks express lower bound of even time that 
> you can expect. However, in order to guarantee consistency of the windows, 
> you would like to control the upper bound. For example:
> 
> 1. If you want to enable Query2, you would need to check what’s the 
> largest/latest event time that was processed by the input splitter, lets say 
> that’s TS1 
> 2. That means, records with event time < TS1 have already been processed by 
> Query1, starting some windows
> 3. The earliest point for which you could enable Query2, is thus TS1 + 1.
> 4. You would have to adjust Query2 start time, by start of the next time 
> window, let’s say that would be TS2 = TS1 + 1 + start of next window
> 5. Input splitter now must keep sending records with event time < TS2 to 
> Query1, but already should redirect records with event time >= TS2 to Query2.
> 6. Once watermark for the input splitter advances past TS2, that’s when it 
> can finally stop sending records to Query1 and query1 logic could be 
> considered “completed”.  
> 
> So Query1 would be responsible for all of the data before TS2, and Query2 
> after TS2.
> 
> Alternatively, your input splitter could also buffer some records, so that 
> you could enable Query2 faster, by re-sending the buffered records. But in 
> that case, both Query1 and Query2 would be responsible for some portion of 
> the data.
> 
> Piotrek
> 
>> On 17 Mar 2020, at 10:35, Mikael Gordani > > wrote:
>> 
>> Hi Piotr!
>> 
>> Continuing with my scenario, since both of the queries will share the same 
>> sink, I've realized that some issues will appear when I switch queries. 
>> Especially with regards to stateful operators, e.g aggregation.
>> 
>> Let me provide an example:
>> So, let say that both of the queries ingest a sequence of integers, and it 
>> will perform the average of these integers over some time.
>> E.g say that query1 ingest the sequence 1,2,3,4 
>> The windows for query1 will be [1,2,3] [2,3,4] [3,4]. 
>> 
>> If I'm later on "activating" query2, I need to have both of the queries 
>> allowing tuples for a while, in order to allow the aggregation to finish in 
>> query1 before denying it input.
>> But, there is a possibility that query2 might receive the tuples 3,4, which 
>> will result in the window: [3][3,4][3,4]
>> Later on, the output of the respective queries will be:
>> Query 1: 3, 4.5, 3.5
>> Query2 : 3, 3.5, 3.5
>> 
>> As one can see, the second output will be different. 
>> I'm thinking of using watermarks somehow to make sure that both queries have 
>> processed the same amount of data before writing to the sink, but I'm a bit 
>> unsure on how to do it.
>> Do you have any suggestions or thoughts?
>> Cheers,
>> 
>> On Mon, Mar 16, 2020 at 8:43 AM Piotr Nowojski  wrote:
>> Hi,
>> 
>> Let us know if something doesn’t work :)
>> 
>> Piotrek
>> 
>>> On 16 Mar 2020, at 08:42, Mikael Gordani >> > wrote:
>>> 
>>> Hi,
>>> I'll try it out =) 
>>> 
>>> Cheers!
>>> 
>>> On Mon, Mar 16, 2020 at 8:32 AM Piotr Nowojski  wrote:
>>> Hi,
>>> 
>>> In that case you could try to implement your `FilterFunction` as two input 
>>> operator, with broadcast control input, that would be setting the 
>>> `global_var`. Broadcast control input can be originating from some source, 
>>> or from some operator.
>>> 
>>> Piotrek
>>> 
 On 13 Mar 2020, at 15:47, Mikael Gordani >>> > wrote:
 
 Hi Piotr!
 Thanks for your response, I'll try to explain what I'm trying to achieve 
 in more detail:
 
 Essentially, If I've two queries, in which has the same operators and runs 
 in the same task, I would want to figure out some way of controlling the 
 ingestion from a source to the respective queries in such a way that only 
 one of the queries receive data, based on a condition. 
 For more context, the second query (query2), is equipped with instrumented 
 operators, which are standard operators extended with some extra 
 functionality, in my case, they enrich the tuples with meta-data.
 
 Source --> Filter1 ---> rest of query1
|
v

Hadoop user jar for flink 1.9 plus

2020-03-17 Thread Vishal Santoshi
We have been on Flink 1.8.x in production and were planning to go to Flink
1.9 or above. We have always used the Hadoop uber jar from
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2-uber
but it seems it only goes up to 1.8.3 and its distribution ended in 2019. How
or where do we get these jars for later versions of Flink?

Regards


Re: Apache Airflow - Question about checkpointing and re-run a job

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

I believe that the title of this email thread was a typo, and should be
"Apache Flink - Question about checkpointing and re-run a job."
I assume this because the contents of the previous conversations seem to be
purely about Flink.

Otherwise, as far as I know, there doesn't seem to be any publicly available
Airflow operators for Flink right now.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

As David already explained, they are similar in that you may output zero to
multiple records for both process and flatMap functions.

However, ProcessFunctions also expose to the user much more powerful
functionality, such as registering timers, outputting to side outputs, etc.

Cheers,
Gordon
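
For readers new to the APIs, a minimal sketch of the flatMap side of that
comparison: a FlatMapFunction may emit zero, one, or many records per input via
its Collector (class and field names here are illustrative).

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class SplitIntoWords implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String line, Collector<String> out) {
        // zero or more outputs per input record
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(word);
            }
        }
    }
}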




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi John,

Have you considered letting the BroadcastProcessFunction output events that
indicate that extra external HTTP requests need to be performed, and have them
consumed by a downstream async IO operator to complete the HTTP request?
That could work depending on what exactly you need to do in your specific
case.

Cheers,
Gordon
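
A hedged sketch of that suggestion, assuming hypothetical Event, Rule,
HttpRequest and HttpResult types and a non-blocking callHttpAsync() helper:
the BroadcastProcessFunction only decides, from broadcast state, whether a call
is needed and emits a request event; the downstream async I/O operator performs
the actual HTTP call.

MapStateDescriptor<String, String> rulesDesc =
    new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

DataStream<HttpRequest> requests = events
    .connect(ruleStream.broadcast(rulesDesc))
    .process(new BroadcastProcessFunction<Event, Rule, HttpRequest>() {
        @Override
        public void processElement(Event e, ReadOnlyContext ctx,
                                   Collector<HttpRequest> out) throws Exception {
            String rule = ctx.getBroadcastState(rulesDesc).get(e.getKey());
            if (rule != null) {
                // emit a request descriptor only when the broadcast state says so
                out.collect(new HttpRequest(e, rule));
            }
        }

        @Override
        public void processBroadcastElement(Rule r, Context ctx,
                                            Collector<HttpRequest> out) throws Exception {
            ctx.getBroadcastState(rulesDesc).put(r.getKey(), r.getValue());
        }
    });

// The HTTP call itself runs in an async I/O operator.
DataStream<HttpResult> results = AsyncDataStream.unorderedWait(
    requests,
    new RichAsyncFunction<HttpRequest, HttpResult>() {
        @Override
        public void asyncInvoke(HttpRequest req, ResultFuture<HttpResult> resultFuture) {
            callHttpAsync(req).thenAccept(resp ->          // hypothetical async client
                resultFuture.complete(
                    Collections.singletonList(new HttpResult(req, resp))));
        }
    },
    30, TimeUnit.SECONDS);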



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Communication between two queries

2020-03-17 Thread Piotr Nowojski
Hi,

Yes, you are looking in the right directions with the watermarks. 

First of all you would have to use event time semantic for constant results. 
With processing time everything would be simpler, but it would be more 
difficult to reason about the results (your choice). Secondly, you would have 
to hook up the logic of enabling query1/query2 to the event time/watermarks. 
Thirdly, you need to somehow to sync the input switching with the windows 
boundaries. On top of that, watermarks express lower bound of even time that 
you can expect. However, in order to guarantee consistency of the windows, you 
would like to control the upper bound. For example:

1. If you want to enable Query2, you would need to check what’s the 
largest/latest event time that was processed by the input splitter, lets say 
that’s TS1 
2. That means, records with event time < TS1 have already been processed by 
Query1, starting some windows
3. The earliest point for which you could enable Query2, is thus TS1 + 1.
4. You would have to adjust Query2 start time, by start of the next time 
window, let’s say that would be TS2 = TS1 + 1 + start of next window
5. Input splitter now must keep sending records with event time < TS2 to 
Query1, but already should redirect records with event time >= TS2 to Query2.
6. Once watermark for the input splitter advances past TS2, that’s when it can 
finally stop sending records to Query1 and query1 logic could be considered 
“completed”.  

So Query1 would be responsible for all of the data before TS2, and Query2 after 
TS2.

Alternatively, your input splitter could also buffer some records, so that you 
could enable Query2 faster, by re-sending the buffered records. But in that 
case, both Query1 and Query2 would be responsible for some portion of the data.

Piotrek
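
A rough sketch of the splitting logic in steps 1-6 above, assuming tumbling
windows of WINDOW_SIZE_MS milliseconds, assigned event-time timestamps, and an
out-of-band switchRequested() signal (all names are illustrative; a real
implementation would keep the two fields in checkpointed state).

final OutputTag<Reading> toQuery2 = new OutputTag<Reading>("to-query-2") {};

SingleOutputStreamOperator<Reading> query1Input = input
    .process(new ProcessFunction<Reading, Reading>() {
        private long maxTimestampSeen = Long.MIN_VALUE; // "TS1"
        private long switchTimestamp = Long.MAX_VALUE;  // "TS2", not yet decided

        @Override
        public void processElement(Reading r, Context ctx, Collector<Reading> out) {
            maxTimestampSeen = Math.max(maxTimestampSeen, ctx.timestamp());
            if (switchRequested() && switchTimestamp == Long.MAX_VALUE) {
                // align TS2 with the start of the next window after TS1 + 1
                long ts1PlusOne = maxTimestampSeen + 1;
                switchTimestamp =
                    ts1PlusOne - (ts1PlusOne % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
            }
            if (ctx.timestamp() < switchTimestamp) {
                out.collect(r);          // Query1 owns everything before TS2
            } else {
                ctx.output(toQuery2, r); // Query2 owns everything from TS2 on
            }
        }
    });

DataStream<Reading> query2Input = query1Input.getSideOutput(toQuery2);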

> On 17 Mar 2020, at 10:35, Mikael Gordani  wrote:
> 
> Hi Piotr!
> 
> Continuing with my scenario, since both of the queries will share the same 
> sink, I've realized that some issues will appear when I switch queries. 
> Especially with regards to stateful operators, e.g aggregation.
> 
> Let me provide an example:
> So, let say that both of the queries ingest a sequence of integers, and it 
> will perform the average of these integers over some time.
> E.g say that query1 ingest the sequence 1,2,3,4 
> The windows for query1 will be [1,2,3] [2,3,4] [3,4]. 
> 
> If I'm later on "activating" query2, I need to have both of the queries 
> allowing tuples for a while, in order to allow the aggregation to finish in 
> query1 before denying it input.
> But, there is a possibility that query2 might receive the tuples 3,4, which 
> will result in the window: [3][3,4][3,4]
> Later on, the output of the respective queries will be:
> Query 1: 3, 4.5, 3.5
> Query2 : 3, 3.5, 3.5
> 
> As one can see, the second output will be different. 
> I'm thinking of using watermarks somehow to make sure that both queries have 
> processed the same amount of data before writing to the sink, but I'm a bit 
> unsure on how to do it.
> Do you have any suggestions or thoughts?
> Cheers,
> 
> On Mon, Mar 16, 2020 at 8:43 AM Piotr Nowojski  wrote:
> Hi,
> 
> Let us know if something doesn’t work :)
> 
> Piotrek
> 
>> On 16 Mar 2020, at 08:42, Mikael Gordani > > wrote:
>> 
>> Hi,
>> I'll try it out =) 
>> 
>> Cheers!
>> 
>> On Mon, Mar 16, 2020 at 8:32 AM Piotr Nowojski  wrote:
>> Hi,
>> 
>> In that case you could try to implement your `FilterFunction` as two input 
>> operator, with broadcast control input, that would be setting the 
>> `global_var`. Broadcast control input can be originating from some source, 
>> or from some operator.
>> 
>> Piotrek
>> 
>>> On 13 Mar 2020, at 15:47, Mikael Gordani >> > wrote:
>>> 
>>> Hi Piotr!
>>> Thanks for your response, I'll try to explain what I'm trying to achieve in 
>>> more detail:
>>> 
>>> Essentially, If I've two queries, in which has the same operators and runs 
>>> in the same task, I would want to figure out some way of controlling the 
>>> ingestion from a source to the respective queries in such a way that only 
>>> one of the queries receive data, based on a condition. 
>>> For more context, the second query (query2), is equipped with instrumented 
>>> operators, which are standard operators extended with some extra 
>>> functionality, in my case, they enrich the tuples with meta-data.
>>> 
>>> Source --> Filter1 ---> rest of query1
>>>|
>>>v
>>>Filter2 ---> rest of query2
>>> 
>>> By using filters prior to the queries, they allow records to pass depending 
>>> on a condition, let's say a global boolean variable (which is initially set 
>>> to false).
>>> If it's set to true, Filter1 will accept every record and Filter2 will 
>>> disregard every record.
>>> If it's set to false, Filter2 will accept every record and Filter1 will 
>>> disregard every record.
>>> So the filter 

Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread kant kodali
Got it! and thanks a lot for that. So there is no difference between
flatmap and process then?

On Tue, Mar 17, 2020 at 5:29 AM David Anderson  wrote:

> Map applies a MapFunction (or a RichMapFunction) to a DataStream and does
> a one-to-one transformation of the stream elements.
>
> Process applies a ProcessFunction, which can produce zero, one, or many
> events in response to each event. And when used on a keyed stream, a
> KeyedProcessFunction can use Timers to defer actions until later, based
> either on watermarks or the time-of-day clock. A ProcessFunction can also
> have side outputs.
>
> Both RichMapFunctions and KeyedProcessFunctions can use keyed state.
>
> Process is strictly more powerful -- there's nothing you can do with map
> that you couldn't do with process instead. The same is true for flatmap,
> which is similar to map, but with a Collector that can be used to
> emit zero, one, or many events in response to each event, just like a
> process function.
>
> David
>
>
> On Tue, Mar 17, 2020 at 11:50 AM kant kodali  wrote:
>
>> what is the difference between map vs process on a datastream? they look
>> very similar.
>>
>> Thanks!
>>
>>


Re: Issues with Watermark generation after join

2020-03-17 Thread Dominik Wosiński
Hey sure,
the original Temporal Table SQL is:

SELECT e.*, f.level AS level
FROM enablers AS e,
LATERAL TABLE (Detectors(e.timestamp)) AS f
WHERE e.id = f.id

And the previous SQL query to join A is something like:

SELECT *
FROM A te, B s
WHERE s.id = te.id AND s.level = te.level AND s.timestamp = te.timestamp


Also, if I replace the SQL to Join A with BroadcastProcessFunction this
works like a charm, everything is calculated correctly. Even if I don't
change the parallelism.

I have noticed one more weird behavior, after the temporal table Join I
have a windowing function to process the data. Now I have two options, in
TTF I can select the rowtime with type Timestamp and assign it to field in
output class, this automatically passes the Timestamp over so I don't need
to assign it again. But I could also select just a Long field that is not
marked as rowtime (even if they actually have the same value but this field
was not marked with *.rowtime* on declaration) and then I will need to
assign the timestamps and watermarks again, since Flink doesn't know what the
timestamp is. Now, the former solution works like a charm, but for the
latter one there is actually no output visible from the windowing function.
My expectation is that both solutions should work exactly the same and pass
the timestamps in the same manner, but apparently they don't.

Best Regards,
Dom.
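
For the second variant, a minimal sketch of re-assigning timestamps and
watermarks after leaving the Table API, assuming a POJO Result with a long
event-time field in epoch milliseconds (names are illustrative):

DataStream<Result> stream = tableEnv
    .toAppendStream(resultTable, Result.class)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Result>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Result r) {
                return r.getEventTime(); // assumed epoch-millis field
            }
        });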

>


Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread David Anderson
Map applies a MapFunction (or a RichMapFunction) to a DataStream and does a
one-to-one transformation of the stream elements.

Process applies a ProcessFunction, which can produce zero, one, or many
events in response to each event. And when used on a keyed stream, a
KeyedProcessFunction can use Timers to defer actions until later, based
either on watermarks or the time-of-day clock. A ProcessFunction can also
have side outputs.

Both RichMapFunctions and KeyedProcessFunctions can use keyed state.

Process is strictly more powerful -- there's nothing you can do with map
that you couldn't do with process instead. The same is true for flatmap,
which is similar to map, but with a Collector that can be used to
emit zero, one, or many events in response to each event, just like a
process function.

David
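
To make the difference concrete, a small illustrative KeyedProcessFunction that
uses a timer and a side output, neither of which is available to a plain map or
flatMap (the Event/Alert types and thresholds are assumptions):

final OutputTag<Event> tooLarge = new OutputTag<Event>("too-large") {};

KeyedProcessFunction<String, Event, Alert> fn =
    new KeyedProcessFunction<String, Event, Alert>() {
        private transient ValueState<Long> lastTimer;

        @Override
        public void open(Configuration parameters) {
            lastTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastTimer", Long.class));
        }

        @Override
        public void processElement(Event e, Context ctx, Collector<Alert> out)
                throws Exception {
            if (e.getSizeBytes() > 1_000_000) {
                ctx.output(tooLarge, e); // side output
                return;
            }
            long timer = ctx.timerService().currentProcessingTime() + 10_000;
            ctx.timerService().registerProcessingTimeTimer(timer); // deferred action
            lastTimer.update(timer);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out)
                throws Exception {
            if (lastTimer.value() != null && lastTimer.value() == timestamp) {
                out.collect(new Alert(ctx.getCurrentKey(), timestamp));
            }
        }
    };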


On Tue, Mar 17, 2020 at 11:50 AM kant kodali  wrote:

> what is the difference between map vs process on a datastream? they look
> very similar.
>
> Thanks!
>
>


Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-17 Thread LakeShen
Thank you, I will do that.

jinhai wang  wrote on Tue, Mar 17, 2020 at 5:58 PM:

> Hi LakeShen
>
> You also must assign IDs to all operators of an application. Otherwise,
> you may not be able to recover from checkpoint
>
> Doc:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state
>
>
> Best Regards
>
> jinhai...@gmail.com
>
> On Mar 17, 2020, at 5:51 PM, Till Rohrmann  wrote:
>
> Let us know if you should run into any problems. The state processor API
> is still young and could benefit from as much feedback as possible.
>
> Cheers,
> Till
>
> On Tue, Mar 17, 2020 at 2:57 AM LakeShen 
> wrote:
>
>> Wow, this feature looks really good. I will take a look at it.
>> Thank you for replying, Till.
>>
>> Best Regards,
>> LakeShen
>>
>> Till Rohrmann  wrote on Tue, Mar 17, 2020 at 4:17 AM:
>>
>>> If you want to change the max parallelism then you need to take a
>>> savepoint and use Flink's state processor API [1] to rewrite the max
>>> parallelism by creating a new savepoint from the old one.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sat, Mar 14, 2020 at 4:07 AM LakeShen 
>>> wrote:
>>>
 Hi Eleanore , if you resume from savepoint , you can't change the flink
 operator's max parallelism .

 Eleanore Jin  wrote on Sat, Mar 14, 2020 at 12:51 AM:

 > Hi Piotr,
 > Does this also apply to savepoint? (meaning the max parallelism
 should not
 > change for job resume from savepoint?)
 >
 > Thanks a lot!
 > Eleanore
 >
 > On Fri, Mar 13, 2020 at 6:33 AM Piotr Nowojski 
 > wrote:
 >
 > > Hi,
 > >
 > > Yes, you can change the parallelism. One thing that you can not
 change is
 > > “max parallelism”.
 > >
 > > Piotrek
 > >
 > > > On 13 Mar 2020, at 04:34, Sivaprasanna >>> >
 > > wrote:
 > > >
 > > > I think you can modify the operator’s parallelism. It is only if
 you
 > > have set maxParallelism, and while restoring from a checkpoint, you
 > > shouldn’t modify the maxParallelism. Otherwise, I believe the state
 will
 > be
 > > lost.
 > > >
 > > > -
 > > > Sivaprasanna
 > > >
 > > > On Fri, 13 Mar 2020 at 9:01 AM, LakeShen <
 shenleifight...@gmail.com
 > > > wrote:
 > > > Hi community,
 > > >   My question is: if I cancel the Flink job and retain the
 > > > checkpoint dir, then restore from that checkpoint dir, can I change the
 > > > Flink operators' parallelism? My thought is that I can't change the
 > > > parallelism, but I am not sure.
 > > >  Thanks for your reply.
 > > >
 > > > Best wishes,
 > > > LakeShen
 > >
 > >
 >

>>>
>


Re: Apache Airflow - Question about checkpointing and re-run a job

2020-03-17 Thread kant kodali
Does Airflow have a Flink Operator? I am not seeing one. Can you please point
me to it?

On Mon, Nov 18, 2019 at 3:10 AM M Singh  wrote:

> Thanks Congxian for your answer and reference.  Mans
>
> On Sunday, November 17, 2019, 08:59:16 PM EST, Congxian Qiu <
> qcx978132...@gmail.com> wrote:
>
>
> Hi
> Yes, checkpoint data locates under jobid dir. you can try to restore from
> the retained checkpoint[1]
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> Best,
> Congxian
>
>
> M Singh  wrote on Mon, Nov 18, 2019 at 2:54 AM:
>
> Folks - Please let me know if you have any advice on this question.  Thanks
>
> On Saturday, November 16, 2019, 02:39:18 PM EST, M Singh <
> mans2si...@yahoo.com> wrote:
>
>
> Hi:
>
> I have a Flink job and sometimes I need to cancel and re run it.  From
> what I understand the checkpoints for a job are saved under the job id
> directory at the checkpoint location. If I run the same job again, it will
> get a new job id and the checkpoint saved from the previous run job (which
> is saved under the previous job's id dir) will not be used for this new
> run. Is that a correct understanding ?  If I need to re-run the job from
> the previous checkpoint - is there any way to do that automatically without
> using a savepoint ?
>
> Also, I believe the internal job restarts do not change the job id so in
> those cases where the job restarts will pick the state from the saved
> checkpoint.  Is my understanding correct ?
>
> Thanks
>
> Mans
>
>
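
For reference, resuming from a retained checkpoint (as in Congxian's answer
above) requires externalized checkpoints to be enabled; a minimal sketch, with
the job later resumed via `flink run -s <checkpointPath>` as described in the
linked documentation:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint interval in ms, illustrative
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);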


RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread John Morrow
Hi Flink Users,

I have a BroadcastProcessFunction and in the processElement method I sometimes 
need to do some http requests, depending on the broadcast state.

Because I'm doing http requests, I'd prefer the function to be async, like 
RichAsyncFunction.asyncInvoke(), but RichAsyncFunction doesn't support 
broadcast data.

Is there any way to combine the functionality of a RichAsyncFunction + a 
BroadcastProcessFunction?

Thanks!
John.


what is the difference between map vs process on a datastream?

2020-03-17 Thread kant kodali
what is the difference between map vs process on a datastream? they look
very similar.

Thanks!


Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-17 Thread Becket Qin
Actually it might be better to create another ticket, FLINK-8093 was mainly
complaining about the JMX bean collision when there are multiple tasks
running in the same TM.

Jiangjie (Becket) Qin

On Tue, Mar 17, 2020 at 6:33 PM Becket Qin  wrote:

> Hi Till,
>
> It looks FLINK-8093  reports
> the same issue, although the reported information is not exactly correct,
> as this should not cause the producer to fail. I'll take care of the ticket.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 17, 2020 at 6:00 PM Till Rohrmann 
> wrote:
>
>> @Becket do we already have a JIRA ticket to track this effort?
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 16, 2020 at 4:07 AM Becket Qin  wrote:
>>
>>> Hi Sidney,
>>>
>>> The WARN logging you saw was from the AbstractPartitionDiscoverer which
>>> is created by FlinkKafkaConsumer itself. It has an internal consumer which
>>> shares the client.id of the actual consumer fetching data. This is a
>>> bug that we should fix.
>>>
>>> As Rong said, this won't affect the normal operation of the consumer. It
>>> is just an AppInfo MBean for reporting some information. There might be
>>> some slight impact on the accuracy of the consumer metrics, but should be
>>> almost ignorable because the partition discoverer is quite inactive
>>> compared with the actual consumer.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
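
For reference, the config key in question is "client.id"; a sketch of setting a
distinct value per job (broker address, topic and group are placeholders). Note
that, per the analysis above, the internal partition discoverer reuses the same
properties, so the WARN may still appear; it is harmless either way.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-group");
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-app-" + UUID.randomUUID());

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);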
>>>
>>> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong  wrote:
>>>
 We also had seen this issue before running Flink apps in a shared
 cluster environment.

 Basically, Kafka is trying to register a JMX MBean[1] for application
 monitoring.
 This is only a WARN suggesting that you are registering more than one
 MBean with the same client id "consumer-1", it should not affect your
 normal application behavior.

 This is most likely occurring if you have more than one Kafka consumer
 within the same JVM, are you using a session cluster[2]? can you share more
 on your application configuration including parallelism and slot configs?
 Also based on the log, you are not configuring the "client.id"
 correctly. Which config key are you using? Could you also share your full
 Kafka properties map?


 --
 Rong

 [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
 [2]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session

 On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner <
 sidney.fei...@startapp.com> wrote:

> Hey,
> I've been using Flink for a while now without any problems when
> running apps with a FlinkKafkaConsumer.
> All my apps have the same overall logic (consume from kafka ->
> transform event -> write to file) and the only way they differ from each
> other is the topic they read (remaining kafka config remains identical) 
> and
> the way they transform the event.
> But suddenly, I've been starting to get the following error:
>
>
> 2020-03-15 12:13:56,911 WARN
>  org.apache.kafka.common.utils.AppInfoParser   - Error
> registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException:
> kafka.consumer:type=app-info,id=consumer-1
>at
> com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>
>at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>
>at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>
>at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>
>at
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>
>at
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>
>at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
>
>at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
>
>at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
>
>at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>
>at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>
>at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>
>at
> 

Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-17 Thread Becket Qin
Hi Till,

It looks FLINK-8093  reports
the same issue, although the reported information is not exactly correct,
as this should not cause the producer to fail. I'll take care of the ticket.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 17, 2020 at 6:00 PM Till Rohrmann  wrote:

> @Becket do we already have a JIRA ticket to track this effort?
>
> Cheers,
> Till
>
> On Mon, Mar 16, 2020 at 4:07 AM Becket Qin  wrote:
>
>> Hi Sidney,
>>
>> The WARN logging you saw was from the AbstractPartitionDiscoverer which
>> is created by FlinkKafkaConsumer itself. It has an internal consumer which
>> shares the client.id of the actual consumer fetching data. This is a bug
>> that we should fix.
>>
>> As Rong said, this won't affect the normal operation of the consumer. It
>> is just an AppInfo MBean for reporting some information. There might be
>> some slight impact on the accuracy of the consumer metrics, but should be
>> almost ignorable because the partition discoverer is quite inactive
>> compared with the actual consumer.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong  wrote:
>>
>>> We also had seen this issue before running Flink apps in a shared
>>> cluster environment.
>>>
>>> Basically, Kafka is trying to register a JMX MBean[1] for application
>>> monitoring.
>>> This is only a WARN suggesting that you are registering more than one
>>> MBean with the same client id "consumer-1", it should not affect your
>>> normal application behavior.
>>>
>>> This is most likely occurring if you have more than one Kafka consumer
>>> within the same JVM, are you using a session cluster[2]? can you share more
>>> on your application configuration including parallelism and slot configs?
>>> Also based on the log, you are not configuring the "client.id"
>>> correctly. Which config key are you using? Could you also share your full
>>> Kafka properties map?
>>>
>>>
>>> --
>>> Rong
>>>
>>> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session
>>>
>>> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner <
>>> sidney.fei...@startapp.com> wrote:
>>>
 Hey,
 I've been using Flink for a while now without any problems when running
 apps with a FlinkKafkaConsumer.
 All my apps have the same overall logic (consume from kafka ->
 transform event -> write to file) and the only way they differ from each
 other is the topic they read (remaining kafka config remains identical) and
 the way they transform the event.
 But suddenly, I've been starting to get the following error:


 2020-03-15 12:13:56,911 WARN
  org.apache.kafka.common.utils.AppInfoParser   - Error
 registering AppInfo mbean
 javax.management.InstanceAlreadyExistsException:
 kafka.consumer:type=app-info,id=consumer-1
at
 com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at
 com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)

at
 com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)

at
 com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)

at
 com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)

at
 com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)

at
 org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)

at
 org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)

at
 org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)

at
 org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)

at
 org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)

at
 org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)

at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)

at
 org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at
 org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)

at
 

Re: AfterMatchSkipStrategy for timed out patterns

2020-03-17 Thread Till Rohrmann
Hi Dominik,

at the moment the AfterMatchSkipStrategy is only applied to fully matching
sequences. In case of timeouts, the AfterMatchSkipStrategy will be ignored
because technically it is not a match.

Cheers,
Till
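
So timed-out partial matches have to be filtered by hand. A hedged sketch,
reusing the names from Dominik's pattern above (AccelVector, the "beginning"
and EventPatternName stages); the Result type and its factory methods are
hypothetical:

final OutputTag<Result> timedOutTag = new OutputTag<Result>("timed-out") {};

class ThresholdMatcher extends PatternProcessFunction<AccelVector, Result>
        implements TimedOutPartialMatchHandler<AccelVector> {

    @Override
    public void processMatch(Map<String, List<AccelVector>> match, Context ctx,
                             Collector<Result> out) {
        out.collect(Result.fromComplete(match));
    }

    @Override
    public void processTimedOutMatch(Map<String, List<AccelVector>> match, Context ctx) {
        // Keep only the partial match that started below the threshold; the shorter
        // suffixes (which a skip strategy would normally suppress) are dropped here.
        if (match.containsKey("beginning") && match.containsKey(EventPatternName)) {
            ctx.output(timedOutTag, Result.fromTimedOut(match));
        }
    }
}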

On Mon, Mar 16, 2020 at 5:39 PM Dominik Wosiński  wrote:

>
> Hey all,
>
> I was wondering whether for CEP the *AfterMatchSkipStrategy *is applied
> during matching or if simply the results are removed after the match. The
> question is the result of the experiments I was doing with CEP. Say I have
> the readings from some sensor and I want to detect events over some
> threshold. So I have something like below:
>
> Pattern.begin[AccelVector]("beginning", 
> AfterMatchSkipStrategy.skipPastLastEvent())
>   .where(_.data() < Threshold)
>   .optional
>   .followedBy(EventPatternName)
>   .where(event => event.data() >= Threshold)
>   .oneOrMore
>   .greedy
>   .consecutive()
>   .followedBy("end")
>   .where(_.data() < Threshold)
>   .oneOrMore
>   .within(Time.minutes(1))
>
>
> The thing is that sometimes sensors may stop sending data or the data is
> lost so I would like to emit events that have technically timed out. I have
> created a PatternProcessFunction that simply gets events that have timed
> out and check for *EventPatternName* part.
>
> It works fine, but I have noticed weird behavior that the events that get
> passed to the *processTimedOutMatch *are repeated as if there was no
> *AfterMatchSkipStrategy.*
>
> So, for example say the Threshold=200, and I have the following events for
> one of the sensors:
> Event1 (timestamp= 1, data = 10)
> Event2 (timestamp= 2, data = 250)
> Event3 (timestamp= 3, data = 300)
> Event4 (timestamp= 4, data = 350)
> Event5 (timestamp= 5, data = 400)
> Event6 (timestamp= 6, data = 450)
>
> After that, this sensor stops sending data but others are sending data so
> the watermark is progressing - this obviously causes timeout of the
> pattern. And the issue I have is the fact that  *processTimedOutMatch* gets
> called multiple times, first for the whole pattern Event1 to Event6 and
> each call just skips one event so next, I have Event2 to Event 6, Event3 to
> Event6 up to just Event6.
>
> My understanding is that *AfterMatchSkipStrategy *should wipe out those
> partial matches or does it work differently for timed out matches?
>
> Thanks in advance,
> Best Regards,
> Dom.
>


Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-17 Thread Till Rohrmann
I agree with Piotr that we need some type of checkpoint which supports
rescaling. Otherwise, the reactive mode and auto-scaling will only work if
the system has taken a savepoint which by definition should only be done by
the user.

Cheers,
Till

On Mon, Mar 16, 2020 at 8:39 AM Piotr Nowojski  wrote:

> Hi Seth,
>
> > Currently, all rescaling operations technically work with checkpoints.
> That is purely by chance that the implementation supports that, and the
> line is because the community is not committed to maintaining that
> functionality
>
> Are you sure that’s the case? Support for rescaling from checkpoint is as
> far as I know, something that we want/need to have:
> - if your cluster has just lost a node due to some hardware failure,
> without downscaling support your job will not be able to recover
> - future planned live rescaling efforts
>
> Also this [1] seems to contradict your statement?
>
> Lack of support for rescaling for unaligned checkpoints will be hopefully
> a temporarily limitation of the first version and it’s on our roadmap to
> solve this in the future.
>
> Piotrek
>
> [1]
> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html#rescaling-stateful-stream-processing-jobs
>
> On 13 Mar 2020, at 17:44, Seth Wiesman  wrote:
>
> Hi Aaron,
>
> Currently, all rescaling operations technically work with checkpoints.
> That is purely by chance that the implementation supports that, and the
> line is because the community is not committed to maintaining that
> functionality. As we add cases, such as unaligned checkpoints, which
> actually prevent rescaling the documentation will be updated accordingly.
> FLIP-47 has more to do with consolidating terminology and how actions are
> triggered and are not particularly relevant to the discussion of rescaling
> jobs.
>
> On Fri, Mar 13, 2020 at 11:39 AM Aaron Levin 
> wrote:
>
>> Hi Piotr,
>>
>> Thanks for your response! I understand that checkpoints and savepoints
>> may be diverging (for unaligned checkpoints) but parts also seem to be
>> converging per FLIP-47[0]. Specifically, in FLIP-47 they state that
>> rescaling is "Supported but not in all cases" for checkpoints. What I'm
>> hoping to find is guidance or documentation on when rescaling is supported
>> for checkpoints, and, more importantly, if the cases where it's not
>> supported will result in hard or silent failures.
>>
>> The context here is that we rely on the exactly-once semantics for our
>> Flink jobs in some important systems. In some cases when a job is in a bad
>> state it may not be able to take a checkpoint, but changing the job's
>> parallelism may resolve the issue. Therefore it's important for us to know
>> if deploying from a checkpoint, on purpose or by operator error, will break
>> the semantic guarantees of our job.
>>
>> Hard failure in the cases where you cannot change parallelism would be
>> the desired outcome imo.
>>
>> Thank you!
>>
>> [0]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-47%3A+Checkpoints+vs.+Savepoints
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Fri, Mar 13, 2020 at 9:08 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Generally speaking changes of parallelism is supported between
>>> checkpoints and savepoints. Other changes to the job’s topology, like
>>> adding/changing/removing operators, changing types in the job graph are
>>> only officially supported via savepoints.
>>>
>>> But in reality, as for now, there is no difference between checkpoints
>>> and savepoints, but that’s subject to change, so it’s better not to rely on
>>> this behaviour. For example with unaligned checkpoints [1] (hopefully in
>>> 1.11), there will be a difference between those two concepts.
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>> 
>>>
>>> On 12 Mar 2020, at 12:16, Aaron Levin  wrote:
>>>
>>> Hi,
>>>
>>> What's the expected behaviour of:
>>>
>>> * changing an operator's parallelism
>>> * deploying this change from an incremental (RocksDB) checkpoint instead
>>> of a savepoint
>>>
>>> The flink docs[0][1] are a little unclear on what the expected behaviour
>>> is here. I understand that the key-space is being changed because
>>> parallelism is changed. I've seen instances where this happens and a job
>>> does not fail. But how does it treat potentially missing state for a given
>>> key?
>>>
>>> I know I can test this, but I'm curious what the _expected_ behaviour
>>> is? I.e. what behaviour can I rely on, which won't change between versions
>>> or releases? Do we expect the job to fail? Do we expect missing keys to
>>> just be considered empty?
>>>
>>> Thanks!
>>>
>>> [0]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
>>> [1]
>>> 

Re: Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-17 Thread Till Rohrmann
Thanks for reporting this issue Brian. I'm not a Table API expert but I
know that there is some work on the type system ongoing. I've pulled Timo
and Jingsong into the conversation who might be able to tell you what
exactly changed and whether the timestamp issue might be caused by the
changes.

Cheers,
Till
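
One common source of such mismatches after the 1.10 type system rework is the
default conversion class of TIMESTAMP columns; purely as an illustration (not
necessarily the fix for the Pravega issue), an explicit bridging conversion can
be declared like this:

// TIMESTAMP(3) is backed by java.time.LocalDateTime by default in the new type
// system; bridge it explicitly if the connector still produces java.sql.Timestamp.
DataType legacyTimestamp = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);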

On Mon, Mar 16, 2020 at 5:48 AM  wrote:

> Hi community,
>
>
>
> The Pravega connector provides both Batch and Streaming
> Table API implementations. We use the descriptor API to build the Table source.
> When we plan to upgrade to Flink 1.10, we found the unit tests are not
> passing with our existing Batch Table API. There is a type conversion error
> in the Timestamp with our descriptor Table API. The detail is in the issue
> here: https://github.com/pravega/flink-connectors/issues/341 Hope someone
> from Flink community can help us with some suggestions on this issue.
> Thanks.
>
>
>
> Best Regards,
>
> Brian
>
>
>


Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-17 Thread Till Rohrmann
@Becket do we already have a JIRA ticket to track this effort?

Cheers,
Till

On Mon, Mar 16, 2020 at 4:07 AM Becket Qin  wrote:

> Hi Sidney,
>
> The WARN logging you saw was from the AbstractPartitionDiscoverer which is
> created by FlinkKafkaConsumer itself. It has an internal consumer which
> shares the client.id of the actual consumer fetching data. This is a bug
> that we should fix.
>
> As Rong said, this won't affect the normal operation of the consumer. It
> is just an AppInfo MBean for reporting some information. There might be
> some slight impact on the accuracy of the consumer metrics, but should be
> almost ignorable because the partition discoverer is quite inactive
> compared with the actual consumer.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong  wrote:
>
>> We also had seen this issue before running Flink apps in a shared cluster
>> environment.
>>
>> Basically, Kafka is trying to register a JMX MBean[1] for application
>> monitoring.
>> This is only a WARN suggesting that you are registering more than one
>> MBean with the same client id "consumer-1", it should not affect your
>> normal application behavior.
>>
>> This is most likely occurring if you have more than one Kafka consumer
>> within the same JVM, are you using a session cluster[2]? can you share more
>> on your application configuration including parallelism and slot configs?
>> Also based on the log, you are not configuring the "client.id"
>> correctly. Which config key are you using? Could you also share your full
>> Kafka properties map?
>>
>>
>> --
>> Rong
>>
>> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session
>>
>> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner 
>> wrote:
>>
>>> Hey,
>>> I've been using Flink for a while now without any problems when running
>>> apps with a FlinkKafkaConsumer.
>>> All my apps have the same overall logic (consume from kafka -> transform
>>> event -> write to file) and the only way they differ from each other is the
>>> topic they read (remaining kafka config remains identical) and the way they
>>> transform the event.
>>> But suddenly, I've been starting to get the following error:
>>>
>>>
>>> 2020-03-15 12:13:56,911 WARN
>>>  org.apache.kafka.common.utils.AppInfoParser   - Error
>>> registering AppInfo mbean
>>> javax.management.InstanceAlreadyExistsException:
>>> kafka.consumer:type=app-info,id=consumer-1
>>>at
>>> com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>>at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>>
>>>at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>>
>>>at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>>
>>>at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>>
>>>at
>>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>>
>>>at
>>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>>>
>>>at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
>>>
>>>at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
>>>
>>>at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
>>>
>>>at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>>
>>>at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>>
>>>at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>>>
>>>at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>
>>>at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>
>>>at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
>>>
>>>at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
>>>
>>>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> I've tried setting the "client.id" on my consumer to a random UUID,
>>> making sure I don't have any duplicates but that didn't help either.
>>> Any idea what could be causing this?
>>>
>>> Thanks 
>>>
>>> *Sidney Feiner* 

Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-17 Thread jinhai wang
Hi LakeShen

You also must assign IDs to all operators of an application. Otherwise, you may
not be able to recover from a checkpoint.

Doc: 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#matching-operator-state
 


Best Regards

jinhai...@gmail.com
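
A minimal sketch of what that looks like; the MySource/MySink/
MyKeyedProcessFunction classes and the ID strings are placeholders. Setting the
max parallelism explicitly up front is also worth doing, since it cannot be
changed later when restoring from a checkpoint.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(128); // fixed for the lifetime of the job's state

env.addSource(new MySource()).uid("my-source")            // stable ID per stateful operator
   .keyBy(r -> r.getKey())
   .process(new MyKeyedProcessFunction()).uid("my-process")
   .addSink(new MySink()).uid("my-sink");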

> On Mar 17, 2020, at 5:51 PM, Till Rohrmann  wrote:
> 
> Let us know if you should run into any problems. The state processor API is 
> still young and could benefit from as much feedback as possible.
> 
> Cheers,
> Till
> 
> On Tue, Mar 17, 2020 at 2:57 AM LakeShen  > wrote:
> Wow, this feature looks really good. I will take a look at it.
> Thank you for replying, Till.
> 
> Best Regards,
> LakeShen
> 
> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Mar 17, 2020 at 4:17 AM:
> If you want to change the max parallelism then you need to take a savepoint 
> and use Flink's state processor API [1] to rewrite the max parallelism by 
> creating a new savepoint from the old one.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>  
> 
> 
> Cheers,
> Till
> 
> On Sat, Mar 14, 2020 at 4:07 AM LakeShen  > wrote:
> Hi Eleanore , if you resume from savepoint , you can't change the flink
> operator's max parallelism .
> 
> Eleanore Jin <eleanore@gmail.com> wrote on Sat, Mar 14, 2020 at 12:51 AM:
> 
> > Hi Piotr,
> > Does this also apply to savepoint? (meaning the max parallelism should not
> > change for job resume from savepoint?)
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Fri, Mar 13, 2020 at 6:33 AM Piotr Nowojski  > >
> > wrote:
> >
> > > Hi,
> > >
> > > Yes, you can change the parallelism. One thing that you can not change is
> > > “max parallelism”.
> > >
> > > Piotrek
> > >
> > > > On 13 Mar 2020, at 04:34, Sivaprasanna  > > > >
> > > wrote:
> > > >
> > > > I think you can modify the operator’s parallelism. It is only if you
> > > have set maxParallelism, and while restoring from a checkpoint, you
> > > shouldn’t modify the maxParallelism. Otherwise, I believe the state will
> > be
> > > lost.
> > > >
> > > > -
> > > > Sivaprasanna
> > > >
> > > > On Fri, 13 Mar 2020 at 9:01 AM, LakeShen  > > > 
> > > >> 
> > > wrote:
> > > > Hi community,
> > >   My question is: if I cancel the Flink job and retain the
> > > checkpoint dir, then restore from that checkpoint dir, can I change the
> > > Flink operators' parallelism? My thought is that I can't change the
> > > parallelism, but I am not sure.
> > >  Thanks for your reply.
> > > >
> > > > Best wishes,
> > > > LakeShen
> > >
> > >
> >



Re: Flink YARN app terminated before the client receives the result

2020-03-17 Thread Till Rohrmann
 @Tison could you create an issue to track the problem. Please also link
the uploaded log file for further debugging.

I think the reason why it worked in Flink 1.9 could have been that we had a
async callback in the longer chain which broke the flow of execution and
allowed to send the response. This is no longer the case. As an easy fix
one could change thenAccept into thenAcceptAsync in the
MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should
think about allowing not only StatusHandler to close asynchronously. At the
moment we say that all other handler shut down immediately (see
AbstractHandler#closeHandlerAsync). But the problem with this change would
be that all handler would become stateful because they would need to
remember whether a request is currently ongoing or not.

Cheers,
Till
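
For readers following along, a generic JDK-level illustration of the thenAccept
vs. thenAcceptAsync difference (not the actual MiniDispatcher code; sendToClient
and executor are placeholders):

CompletableFuture<String> response = new CompletableFuture<>();

// May run the callback synchronously on whichever thread completes the future,
// so that code path is blocked until the callback finishes.
response.thenAccept(r -> sendToClient(r));

// Hands the callback to the given executor, decoupling it from the completing thread.
response.thenAcceptAsync(r -> sendToClient(r), executor);

response.complete("cancel-acknowledged"); // triggers the callbacks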

On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike  wrote:

> Hi Tison & Till and all,
>
> I have uploaded the client, taskmanager and jobmanager log to Gist (
> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I
> can reproduce this bug every time when trying to cancel Flink 1.10 jobs on
> YARN.
>
> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling a
> job with a savepoint* sometimes threw exceptions to the client side due to
> early shutdown of the server, even though the log showed that the savepoint
> had completed successfully. With the newly introduced *stop* API, that bug
> disappeared; however, the *cancel* API now seems to be buggy.
>
> Best,
> Weike
>
> On Tue, Mar 17, 2020 at 10:17 AM tison  wrote:
>
> > edit: previously after the cancellation we have a longer call chain to
> > #jobReachedGloballyTerminalState which does the archive job & JM graceful
> > showdown, which might take some time so that ...
> >
> > Best,
> > tison.
> >
> >
> > tison  wrote on Tue, Mar 17, 2020 at 10:13 AM:
> >
> >> Hi Weike & Till,
> >>
> >> I agree with Till and it is also the analysis from my side. However, it
> >> seems even if we don't have FLINK-15116, it is still possible that we
> >> complete the cancel future but the cluster got shutdown before it
> properly
> >> delivered the response.
> >>
> >> There is one strange thing: this behavior is almost always reproducible,
> >> although it should be just one possible ordering, not a guaranteed one.
> >> Maybe previously we first had to cancel the job, which has a long call
> >> chain, so it happened that we had enough time to deliver the response.
> >>
> >> But the resolution probably looks like introducing some
> >> synchronization/finalization logic that clears these outstanding futures
> >> with best effort before the cluster (RestServer) shuts down.
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Till Rohrmann wrote on Tue, Mar 17, 2020 at 4:12 AM:
> >>
> >>> Hi Weike,
> >>>
> >>> could you share the complete logs with us? Attachments are being
> >>> filtered out by the Apache mail server but it works if you upload the
> logs
> >>> somewhere (e.g. https://gist.github.com/) and then share the link with
> >>> us. Ideally you run the cluster with DEBUG log settings.
> >>>
> >>> I assume that you are running Flink 1.10, right?
> >>>
> >>> My suspicion is that this behaviour has been introduced with
> FLINK-15116
> >>> [1]. It looks as if we complete the shutdown future in
> >>> MiniDispatcher#cancelJob before we return the response to the
> >>> RestClusterClient. My guess is that this triggers the shutdown of the
> >>> RestServer which then is not able to serve the response to the client.
> I'm
> >>> pulling in Aljoscha and Tison who introduced this change. They might be
> >>> able to verify my theory and propose a solution for it.
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike 
> >>> wrote:
> >>>
>  Hi Yangze and all,
> 
>  I have tried numerous times, and this behavior persists.
> 
>  Below is the tail log of taskmanager.log:
> 
>  2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>   org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free
> slot
>  TaskSlot(index:0, state:ACTIVE, resource profile:
>  ResourceProfile{cpuCores=1., taskHeapMemory=1.503gb
>  (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>  (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>  allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>  d0a674795be98bd2574d9ea3286801cb).
>  2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>   org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>  d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>  2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>   org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
> JobManager
>  connection for job d0a674795be98bd2574d9ea3286801cb.
>  2020-03-13 12:06:14.250 

Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-17 Thread Till Rohrmann
Let us know if you should run into any problems. The state processor API is
still young and could benefit from as much feedback as possible.

Cheers,
Till
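For reference, a rough sketch of the State Processor API approach described in the quoted reply below. Everything specific here is made up (the savepoint paths, the operator uid "my-operator-uid", and a single keyed ValueState called "count"); the reader and bootstrap functions have to match the state your job actually declares:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class RewriteMaxParallelism {

    // Plain POJO carrying one key and its state value.
    public static class CountEntry {
        public String key;
        public Long count;
    }

    // Reads the "count" ValueState of every key from the old savepoint.
    static class CountReader extends KeyedStateReaderFunction<String, CountEntry> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<CountEntry> out) throws Exception {
            CountEntry entry = new CountEntry();
            entry.key = key;
            entry.count = count.value();
            out.collect(entry);
        }
    }

    // Writes the same ValueState into the new savepoint.
    static class CountWriter extends KeyedStateBootstrapFunction<String, CountEntry> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(CountEntry entry, Context ctx) throws Exception {
            count.update(entry.count);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MemoryStateBackend backend = new MemoryStateBackend();

        // Read the keyed state of one operator out of the old savepoint.
        ExistingSavepoint old = Savepoint.load(env, "hdfs:///savepoints/old", backend);
        DataSet<CountEntry> entries = old.readKeyedState("my-operator-uid", new CountReader());

        // Bootstrap the same state into a new savepoint with a larger max parallelism.
        BootstrapTransformation<CountEntry> transformation = OperatorTransformation
                .bootstrapWith(entries)
                .keyBy(new KeySelector<CountEntry, String>() {
                    @Override
                    public String getKey(CountEntry entry) {
                        return entry.key;
                    }
                })
                .transform(new CountWriter());

        Savepoint.create(backend, 512) // 512 = the new max parallelism
                .withOperator("my-operator-uid", transformation)
                .write("hdfs:///savepoints/new");

        env.execute("rewrite max parallelism");
    }
}

The job would then be resumed from hdfs:///savepoints/new as from any other savepoint.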

On Tue, Mar 17, 2020 at 2:57 AM LakeShen  wrote:

> Wow, this feature looks so good, I will take a look at it.
> Thank you for your reply, Till.
>
> Best Regards,
> LakeShen
>
> Till Rohrmann wrote on Tue, Mar 17, 2020 at 4:17 AM:
>
>> If you want to change the max parallelism then you need to take a
>> savepoint and use Flink's state processor API [1] to rewrite the max
>> parallelism by creating a new savepoint from the old one.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> Cheers,
>> Till
>>
>> On Sat, Mar 14, 2020 at 4:07 AM LakeShen 
>> wrote:
>>
>>> Hi Eleanore, if you resume from a savepoint, you can't change the Flink
>>> operator's max parallelism.
>>>
>>> Eleanore Jin wrote on Sat, Mar 14, 2020 at 12:51 AM:
>>>
>>> > Hi Piotr,
>>> > Does this also apply to savepoint? (meaning the max parallelism should
>>> not
>>> > change for job resume from savepoint?)
>>> >
>>> > Thanks a lot!
>>> > Eleanore
>>> >
>>> > On Fri, Mar 13, 2020 at 6:33 AM Piotr Nowojski 
>>> > wrote:
>>> >
>>> > > Hi,
>>> > >
>>> > > Yes, you can change the parallelism. One thing that you can not
>>> change is
>>> > > “max parallelism”.
>>> > >
>>> > > Piotrek
>>> > >
>>> > > > On 13 Mar 2020, at 04:34, Sivaprasanna 
>>> > > wrote:
>>> > > >
>>> > > > I think you can modify the operator’s parallelism. It is only if
>>> you
>>> > > have set maxParallelism, and while restoring from a checkpoint, you
>>> > > shouldn’t modify the maxParallelism. Otherwise, I believe the state
>>> will
>>> > be
>>> > > lost.
>>> > > >
>>> > > > -
>>> > > > Sivaprasanna
>>> > > >
>>> > > > On Fri, 13 Mar 2020 at 9:01 AM, LakeShen <
>>> shenleifight...@gmail.com
>>> > > > wrote:
>>> > > > Hi community,
>>> > > >   I have a question is that I cancel the flink task and retain
>>> the
>>> > > checkpoint dir, then restore from the checkpoint dir ,can I change
>>> the
>>> > > flink operator's parallelism,in my thoughts, I think I can't change
>>> the
>>> > > flink operator's parallelism,but I am not sure.
>>> > > >  Thanks to your reply.
>>> > > >
>>> > > > Best wishes,
>>> > > > LakeShen
>>> > >
>>> > >
>>> >
>>>
>>


Re: Very large _metadata file

2020-03-17 Thread Till Rohrmann
Did I understand you correctly that you use the union state to synchronize
the per partition state across all operators in order to obtain a global
overview? If this is the case, then this will only work in case of a
failover. Only then, all operators are being restarted with the union of
all operators state. If the job would never fail, then there would never be
an exchange of state.

If you really need a global view over your data, then you need to create an
operator with a parallelism of 1 which records all the different
timestamps.

Another idea could be to use the broadcast state pattern [1]. You could
have an operator which extracts the java.time.Instant and outputs them as a
side output and simply forwards the records on the main output. Then you
could use the side output as the broadcast input and the main output as the
normal input into the broadcast operator. The problem with this approach
might be that you don't get order guarantees between the side and the main
output.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
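For what it's worth, a rough sketch of that broadcast-state idea, assuming the side output carries java.time.Instant values; the stream types, the state name and the "earliest" key are made up for illustration:

import java.time.Instant;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class StartupTimeBroadcast {

    // One slot of broadcast state holding the earliest timestamp seen so far.
    static final MapStateDescriptor<String, Instant> STARTUP_TIME =
            new MapStateDescriptor<>(
                    "startup-time", Types.STRING, TypeInformation.of(Instant.class));

    public static DataStream<String> withStartupTime(
            DataStream<String> mainStream, DataStream<Instant> timestamps) {

        BroadcastStream<Instant> broadcast = timestamps.broadcast(STARTUP_TIME);

        return mainStream
                .connect(broadcast)
                .process(new BroadcastProcessFunction<String, Instant, String>() {

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Read-only view of the broadcast state on the main input.
                        Instant startup = ctx.getBroadcastState(STARTUP_TIME).get("earliest");
                        out.collect(startup == null ? value : value + " @ " + startup);
                    }

                    @Override
                    public void processBroadcastElement(Instant value, Context ctx, Collector<String> out)
                            throws Exception {
                        // Every parallel instance receives the same broadcast elements,
                        // so they all converge on the same "earliest" timestamp.
                        Instant current = ctx.getBroadcastState(STARTUP_TIME).get("earliest");
                        if (current == null || value.isBefore(current)) {
                            ctx.getBroadcastState(STARTUP_TIME).put("earliest", value);
                        }
                    }
                });
    }
}

As mentioned above, there are no ordering guarantees between the broadcast input and the main input, so main-stream elements may be processed before the timestamp arrives.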

Cheers,
Till

On Tue, Mar 17, 2020 at 2:29 AM Jacob Sevart  wrote:

> Thanks! That would do it. I've disabled the operator for now.
>
> The purpose was to know the age of the job's state, so that we could
> consider its output in terms of how much context it knows. Regular state
> seemed insufficient because partitions might see their first traffic at
> different times.
>
> How would you go about implementing something like that?
>
> On Mon, Mar 16, 2020 at 1:54 PM Till Rohrmann 
> wrote:
>
>> Hi Jacob,
>>
>> I think you are running into some deficiencies of Flink's union state
>> here. The problem is that for every entry in your list state, Flink stores
>> a separate offset (a long value). The reason for this behaviour is that we
>> use the same state implementation for the union state as well as for the
>> split state. For the latter, the offset information is required to split
>> the state in case of changing the parallelism of your job.
>>
>> My recommendation would be to try to get rid of union state all together.
>> The union state has primarily been introduced to checkpoint some source
>> implementations and might become deprecated due to performance problems
>> once these sources can be checkpointed differently.
>>
>> Cheers,
>> Till
>>
>> On Sat, Mar 14, 2020 at 3:23 AM Jacob Sevart  wrote:
>>
>>> Oh, I should clarify that's 43MB per partition, so with 48 partitions it
>>> explains my 2GB.
>>>
>>> On Fri, Mar 13, 2020 at 7:21 PM Jacob Sevart  wrote:
>>>
 Running *Checkpoints.loadCheckpointMetadata *under a debugger, I found
 something:
 subtaskState.managedOperatorState[0].stateNameToPartitionOffsets("startup-times").offsets.value
 weighs 43MB (5.3 million longs).

 "startup-times" is an operator state of mine (union list of
 java.time.Instant). I see a way to end up fewer items in the list, but I'm
 not sure how the actual size is related to the number of offsets. Can you
 elaborate on that?

 Incidentally, 42.5MB is the number I got out of
 https://issues.apache.org/jira/browse/FLINK-14618
 .
 So I think my two problems are closely related.

 Jacob

 On Mon, Mar 9, 2020 at 6:36 AM Congxian Qiu 
 wrote:

> Hi
>
> As Gordon said, the metadata will contain the ByteStreamStateHandle;
> when writing out the ByteStreamStateHandle, Flink will write out the handle name
> -- which is a path (as you saw). The ByteStreamStateHandle is created
> when the state size is smaller than `state.backend.fs.memory-threshold` (default
> is 1024).
>
> If you want to verify this, you can ref the unit test
> `CheckpointMetadataLoadingTest#testLoadAndValidateSavepoint` and load the
> metadata, you can find out that there are many `ByteStreamStateHandle`, 
> and
> their names are the strings you saw in the metadata.
>
> Best,
> Congxian
>
>
> Jacob Sevart wrote on Fri, Mar 6, 2020 at 3:57 AM:
>
>> Thanks, I will monitor that thread.
>>
>> I'm having a hard time following the serialization code, but if you
>> know anything about the layout, tell me if this makes sense. What I see 
>> in
>> the hex editor is, first, many HDFS paths. Then gigabytes of unreadable
>> data. Then finally another HDFS path at the end.
>>
>> If it is putting state in there, under normal circumstances, does it
>> make sense that it would be interleaved with metadata? I would expect all
>> the metadata to come first, and then state.
>>
>> Jacob
>>
>>
>>
>> Jacob
>>
>> On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas 
>> 

Re: How do I get the outPoolUsage value inside my own stream operator?

2020-03-17 Thread Felipe Gutierrez
Hi,

just for the record. I am collecting the values of outPoolUsage using
the piece of code below inside my stream operator

OperatorMetricGroup operatorMetricGroup = (OperatorMetricGroup)
this.getMetricGroup();
TaskMetricGroup taskMetricGroup = operatorMetricGroup.parent();
MetricGroup metricGroup = taskMetricGroup.getGroup("buffers");
Gauge<Float> gauge = (Gauge<Float>) metricGroup.getMetric("outPoolUsage");
if (gauge != null && gauge.getValue() != null) {
   float outPoolUsage = gauge.getValue().floatValue();
   this.outPoolUsageHistogram.update((long) (outPoolUsage * 100));
}


--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Mon, Mar 16, 2020 at 5:17 PM Felipe Gutierrez
 wrote:
>
> Hi community,
>
> I have built my own operator (not a UDF) and I want to collect the
> metrics of "outPoolUsage" inside it. How do I do it assuming that I
> have to do some modifications in the source code?
>
> I know that the Gauge comes from
> flink-runtime/org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.java.
> Inside of my operator MyAbstractUdfStreamOperator I can get the
> "MetricGroup metricGroup = this.getMetricGroup()".
> Then I implemented the "Gauge<Float> gauge = (Gauge<Float>)
> metricGroup.getMetric("outPoolUsage");" but it returns null all the
> time. Even when I click on the Backpressure UI Interface.
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com


Re: Communication between two queries

2020-03-17 Thread Mikael Gordani
Hi Piotr!

Continuing with my scenario: since both of the queries will share the same
sink, I've realized that some issues will appear when I switch queries,
especially with regard to stateful operators, e.g. aggregation.

Let me provide an example:
So, let's say that both of the queries ingest a sequence of integers and
compute the average of these integers over some window of time.
E.g. say that *query1* ingests the sequence *1,2,3,4*.
The windows for *query1* will be *[1,2,3] [2,3,4] [3,4]*.

If I'm later on "activating" *query2*, I need to have both of the queries
allowing tuples for a while, in order to allow the aggregation to finish in
*query1* before denying it input.
But, there is a possibility that *query2* might receive the tuples *3,4*,
which will result in the window: *[3][3,4][3,4]*
Later on, the output of the respective queries will be:
Query 1: 3, *4.5*, 3.5
Query2 : 3, *3.5*, 3.5

As one can see, the second output will be different.
I'm thinking of using watermarks somehow to make sure that both queries have
processed the same amount of data before writing to the sink, but I'm a bit
unsure how to do it.
Do you have any suggestions or thoughts?
Cheers,

On Mon, 16 Mar 2020 at 08:43, Piotr Nowojski wrote:

> Hi,
>
> Let us know if something doesn’t work :)
>
> Piotrek
>
> On 16 Mar 2020, at 08:42, Mikael Gordani  wrote:
>
> Hi,
> I'll try it out =)
>
> Cheers!
>
> On Mon, 16 Mar 2020 at 08:32, Piotr Nowojski wrote:
>
>> Hi,
>>
>> In that case you could try to implement your `FilterFunction` as two
>> input operator, with broadcast control input, that would be setting the
>> `global_var`. Broadcast control input can be originating from some source,
>> or from some operator.
>>
>> Piotrek
>>
>> On 13 Mar 2020, at 15:47, Mikael Gordani  wrote:
>>
>> Hi Piotr!
>> Thanks for your response, I'll try to explain what I'm trying to achieve
>> in more detail:
>>
>> Essentially, If I've two queries, in which has the same operators and
>> runs in the same task, I would want to figure out some way of controlling
>> the ingestion from *a source* to the respective queries in such a way
>> that only one of the queries receive data, based on a condition.
>> For more context, the second query (query2), is equipped with
>> instrumented operators, which are standard operators extended with some
>> extra functionality, in my case, they enrich the tuples with meta-data.
>>
>> Source --> *Filter1* ---> rest of query1
>>|
>>v
>>*Filter2* ---> rest of query2
>>
>> By using *filters* prior to the queries, they allow records to pass
>> depending on a condition*, *let's say a global boolean variable (which
>> is initially set to false).
>> If it's set to *true, Filter1 will accept every record and Filter2 will
>> disregard every record.*
>> If it's set to
>> *false, Filter2 will accept every record and Filter1 will disregard every
>> record.*
>>
>> *So the filter operators looks something like this: *
>>
>> boolean global_var = false;
>>
>> private static class filter1 implements FilterFunction<Tuple> {
>> @Override
>> public boolean filter(Tuple t) throws Exception {
>> return !global_var;
>> }
>> }
>>
>> private static class filter2 implements FilterFunction<Tuple> {
>> @Override
>> public boolean filter(Tuple t) throws Exception {
>> return global_var;
>> }
>> }
>>
>>
>> Then later on, in the respective queries, there is some processing logic
>> which changes the value of the global variable, thus enabling and
>> disabling the flow of data from the source to the respective queries.
>> The problem lies in this global variable being problematic in distributed
>> deployments, in which I'm having a hard time figuring out how to solve.
>> Is it a bit more clear? =)
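For what it's worth, a rough sketch of the broadcast-control alternative Piotrek suggested above, which replaces the global variable with broadcast state so it also works in distributed deployments (the class, the stream element types and the state names are made up):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ToggledFilter extends BroadcastProcessFunction<String, Boolean, String> {

    public static final MapStateDescriptor<String, Boolean> TOGGLE =
            new MapStateDescriptor<>("toggle", Types.STRING, Types.BOOLEAN);

    private final boolean passWhenTrue;

    public ToggledFilter(boolean passWhenTrue) {
        this.passWhenTrue = passWhenTrue;
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Forward the record only when the broadcast flag matches this branch.
        Boolean flag = ctx.getBroadcastState(TOGGLE).get("flag");
        boolean enabled = flag != null && flag;
        if (enabled == passWhenTrue) {
            out.collect(value);
        }
    }

    @Override
    public void processBroadcastElement(Boolean value, Context ctx, Collector<String> out) throws Exception {
        // Every parallel instance sees the same control element, which replaces the global variable.
        ctx.getBroadcastState(TOGGLE).put("flag", value);
    }
}

Each query path would then be built as source.connect(controlStream.broadcast(ToggledFilter.TOGGLE)).process(new ToggledFilter(true)) for one branch and new ToggledFilter(false) for the other.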
>>
>>
>>
>
> --
> Kind regards,
> Mikael Gordani
>
>
>

-- 
Kind regards,
Mikael Gordani


Re:Re: Garbled output when using Flink SQL to insert data into Hive

2020-03-17 Thread 吕先生
Hi,


Running select * from temp_h1 works fine in both Hive and Flink. In Flink SQL, querying temp_h2 gives garbled results. (I have
uploaded the temp_h2 file from HDFS again as an attachment.)











At 2020-03-17 17:05:21, "Jingsong Li" wrote:
>Hi,
>
>- SinkConversionToRow is Flink's internal data structure conversion; it should be unrelated to result correctness. What matters is the sink.
>- There seems to be only one attachment (the log); I did not see the garbled file.
>- In Flink, try "select * from temp_h1" and check the result?
>- In Hive, try "select * from temp_h1" and check the result?
>- In Hive, try "select * from temp_h2" and check the result?
>
>Best,
>Jingsong Lee
>
>On Tue, Mar 17, 2020 at 4:25 PM 吕先生  wrote:
>
>> Hi everyone,
>>
>> Please help me look at this problem: I use Flink SQL for batch computation on Hive (the goal is to replace Spark SQL batch jobs). Concretely, I read data from Hive,
>> insert it back into a Hive table, and when I then select the data, the output is garbled.
>>
>> Software versions: hadoop2.9.1 and hadoop2.8.5, hive-2.3.3 and hive-2.3.4, flink1.10.0, zeppelin0.9.0, Flink
>> SQL gateway 0.1
>>
>> I switched between several Hadoop and Hive versions (all downloaded from the official sites) and tested several Flink SQL runtime environments (Flink SQL CLI, Zeppelin, Flink SQL
>> gateway); none of them solved the problem. Flink runs on YARN. Below is the SQL script used for the test:
>>
>> // in Hive
>> CREATE TABLE IF NOT EXISTS temp_h1(
>>   id VARCHAR(50),
>>   lac VARCHAR(50),
>>   ci VARCHAR(50),
>>   flux_m VARCHAR(50),
>>   nums VARCHAR(50),
>>   sno  VARCHAR(50),
>>   cdate VARCHAR(50)
>> )
>> row format delimited FIELDS TERMINATED BY ','
>> stored as textfile
>> LOCATION '/tmp/hive/temp_h1';
>>
>> CREATE TABLE IF NOT EXISTS temp_h2(
>>   id VARCHAR(50),
>>   lac VARCHAR(50),
>>   ci VARCHAR(50),
>>   flux_m VARCHAR(50),
>>   nums VARCHAR(50),
>>   sno  VARCHAR(50),
>>   cdate VARCHAR(50)
>> )
>> row format delimited FIELDS TERMINATED BY ','
>> stored as textfile
>> LOCATION '/tmp/hive/temp_h2';
>>
>> // test data (t.txt)
>> 101,中国,100.02,123.001,12,30,20200316
>> 102,美国,100.02,123.001,12,30,20200316
>> 103,武汉,100.02,123.001,12,30,20200316
>> 104,北京,100.02,123.001,12,30,20200316
>> 105,俄罗斯,100.02,123.001,12,30,20200316
>> 106,海南,100.02,123.001,12,30,20200316
>> 107,香格里拉酒店,100.02,123.001,12,30,20200316
>>
>> // load the data
>> load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1;
>>
>> // in Flink SQL
>> insert into temp_h2 select * from temp_h1;
>> select * from temp_h2; // garbled output, and the data is incomplete
>>
>> The garbled temp_h2 file on HDFS is attached as cp-0-task-0-file-0.
>> Flink SQL reported no errors while running; the YARN log is attached as Executor.log.
>>
>> Note, the Flink web UI shows: CsvTableSource(read fields: a, b) ->
>> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read
>> fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed
>> . I also don't understand why SinkConversionToRow is used here.
>>
>> You could also test with the code above and verify whether the garbled output shows up in your environments. Thanks!
>>
>>
>>
>>
>>
>>
>>
>
>
>-- 
>Best, Jingsong Lee


Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-17 Thread Yang Wang
Hi Andrey,

Thanks for your explanation.

> About the logging
What I mean is that we cannot forward the stdout/stderr to local files and
docker stdout
at the same time by using log4j. For the jobmanager.log/taskmanager.log, it
works
quite well since we only need to add a console appender in
the log4j.properties.

I am just curious how to forward the stdout/stderr to local files and
docker stdout
at the same time by using log4j :)


Best,
Yang

Andrey Zagrebin wrote on Mon, Mar 16, 2020 at 4:58 PM:

> Thanks for the further feedback Thomas and Yangze.
>
> > A generic, dynamic configuration mechanism based on environment
> variables is essential and it is already supported via envsubst and an
> environment variable that can supply a configuration fragment
>
> True, we already have this. As I understand this was introduced for
> flexibility to template a custom flink-conf.yaml with env vars, put it into
> the FLINK_PROPERTIES and merge it with the default one.
> Could we achieve the same with the dynamic properties (-Drpc.port=1234),
> passed as image args to run it, instead of FLINK_PROPERTIES?
> They could be also parametrised with env vars. This would require
> jobmanager.sh to properly propagate them to
> the StandaloneSessionClusterEntrypoint though:
> https://github.com/docker-flink/docker-flink/pull/82#issuecomment-525285552
> cc @Till
> This would provide a unified configuration approach.
>
> > On the flip side, attempting to support a fixed subset of configuration
> options is brittle and will probably lead to compatibility issues down the
> road
>
> I agree with it. The idea was to have just some shortcut scripted
> functions to set options in flink-conf.yaml for a custom Dockerfile or
> entry point script.
> TASK_MANAGER_NUMBER_OF_TASK_SLOTS could be set as a dynamic property of
> started JM.
> I am not sure how many users depend on it. Maybe we could remove it.
> It also looks we already have somewhat unclean state in
> the docker-entrypoint.sh where some ports are set the hardcoded values
> and then FLINK_PROPERTIES are applied potentially duplicating options in
> the result flink-conf.yaml.
>
> I can see some potential usage of env vars as standard entry point args
> but for purposes related to something which cannot be achieved by passing
> entry point args, like changing flink-conf.yaml options. Nothing comes into
> my mind at the moment. It could be some setting specific to the running
> mode of the entry point. The mode itself can stay the first arg of the
> entry point.
>
> > I would second that it is desirable to support Java 11
>
> > Regarding supporting JAVA 11:
> > - Not sure if it is necessary to ship JAVA. Maybe we could just change
> > the base image from openjdk:8-jre to openjdk:11-jre in template docker
> > file[1]. Correct me if I understand incorrectly. Also, I agree to move
> > this out of the scope of this FLIP if it indeed takes much extra
> > effort.
>
> This is what I meant by bumping up the Java version in the docker hub
> Flink image:
> FROM openjdk:8-jre -> FROM openjdk:11-jre
> This can be polled dependently in user mailing list.
>
> > and in general use a base image that allows the (straightforward) use of
> more recent versions of other software (Python etc.)
>
> This can be polled whether to always include some version of python into
> the docker hub image.
> A potential problem here is once it is there, it is some hassle to
> remove/change it in a custom extended Dockerfile.
>
> It would be also nice to avoid maintaining images for various combinations
> of installed Java/Scala/Python in docker hub.
>
> > Regarding building from local dist:
> > - Yes, I bring this up mostly for development purpose. Since k8s is
> > popular, I believe more and more developers would like to test their
> > work on k8s cluster. I'm not sure should all developers write a custom
> > docker file themselves in this scenario. Thus, I still prefer to
> > provide a script for devs.
> > - I agree to keep the scope of this FLIP mostly for those normal
> > users. But as far as I can see, supporting building from local dist
> > would not take much extra effort.
> > - The maven docker plugin sounds good. I'll take a look at it.
>
> I would see any scripts introduced in this FLIP also as potential building
> blocks for a custom dev Dockerfile.
> Maybe, this will be all what we need for dev images or we write a dev
> Dockerfile, highly parametrised for building a dev image.
> If scripts stay in apache/flink-docker, it is also somewhat inconvenient
> to use them in the main Flink repo but possible.
> If we move them to apache/flink then we will have to e.g. include them
> into the release to make them easily available in apache/flink-docker and
> maintain them in main repo, although they are only docker specific.
> All in all, I would say, once we implement them, we can revisit this topic.
>
> Best,
> Andrey
>
> On Wed, Mar 11, 2020 at 8:58 AM Yangze Guo  wrote:
>
>> Thanks for the reply, Andrey.
>>
>> Regarding 

Re: Garbled output when using Flink SQL to insert data into Hive

2020-03-17 Thread Jingsong Li
Hi,

- SinkConversionToRow is Flink's internal data structure conversion; it should be unrelated to result correctness. What matters is the sink.
- There seems to be only one attachment (the log); I did not see the garbled file.
- In Flink, try "select * from temp_h1" and check the result?
- In Hive, try "select * from temp_h1" and check the result?
- In Hive, try "select * from temp_h2" and check the result?

Best,
Jingsong Lee

On Tue, Mar 17, 2020 at 4:25 PM 吕先生  wrote:

> Hi everyone,
>
> Please help me look at this problem: I use Flink SQL for batch computation on Hive (the goal is to replace Spark SQL batch jobs). Concretely, I read data from Hive,
> insert it back into a Hive table, and when I then select the data, the output is garbled.
>
> Software versions: hadoop2.9.1 and hadoop2.8.5, hive-2.3.3 and hive-2.3.4, flink1.10.0, zeppelin0.9.0, Flink
> SQL gateway 0.1
>
> I switched between several Hadoop and Hive versions (all downloaded from the official sites) and tested several Flink SQL runtime environments (Flink SQL CLI, Zeppelin, Flink SQL
> gateway); none of them solved the problem. Flink runs on YARN. Below is the SQL script used for the test:
>
> // in Hive
> CREATE TABLE IF NOT EXISTS temp_h1(
>   id VARCHAR(50),
>   lac VARCHAR(50),
>   ci VARCHAR(50),
>   flux_m VARCHAR(50),
>   nums VARCHAR(50),
>   sno  VARCHAR(50),
>   cdate VARCHAR(50)
> )
> row format delimited FIELDS TERMINATED BY ','
> stored as textfile
> LOCATION '/tmp/hive/temp_h1';
>
> CREATE TABLE IF NOT EXISTS temp_h2(
>   id VARCHAR(50),
>   lac VARCHAR(50),
>   ci VARCHAR(50),
>   flux_m VARCHAR(50),
>   nums VARCHAR(50),
>   sno  VARCHAR(50),
>   cdate VARCHAR(50)
> )
> row format delimited FIELDS TERMINATED BY ','
> stored as textfile
> LOCATION '/tmp/hive/temp_h2';
>
> // test data (t.txt)
> 101,中国,100.02,123.001,12,30,20200316
> 102,美国,100.02,123.001,12,30,20200316
> 103,武汉,100.02,123.001,12,30,20200316
> 104,北京,100.02,123.001,12,30,20200316
> 105,俄罗斯,100.02,123.001,12,30,20200316
> 106,海南,100.02,123.001,12,30,20200316
> 107,香格里拉酒店,100.02,123.001,12,30,20200316
>
> // load the data
> load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1;
>
> // in Flink SQL
> insert into temp_h2 select * from temp_h1;
> select * from temp_h2; // garbled output, and the data is incomplete
>
> The garbled temp_h2 file on HDFS is attached as cp-0-task-0-file-0.
> Flink SQL reported no errors while running; the YARN log is attached as Executor.log.
>
> Note, the Flink web UI shows: CsvTableSource(read fields: a, b) ->
> SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read
> fields: a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed
> . I also don't understand why SinkConversionToRow is used here.
>
> You could also test with the code above and verify whether the garbled output shows up in your environments. Thanks!
>
>
>
>
>
>
>


-- 
Best, Jingsong Lee


Garbled output when using Flink SQL to insert data into Hive

2020-03-17 Thread 吕先生
Hi everyone,
Please help me look at this problem: I use Flink SQL for batch computation on Hive (the goal is to replace Spark SQL batch jobs). Concretely, I read data from Hive, insert it
back into a Hive table, and when I then select the data, the output is garbled.
Software versions: hadoop2.9.1 and hadoop2.8.5, hive-2.3.3 and hive-2.3.4, flink1.10.0, zeppelin0.9.0, Flink
SQL gateway 0.1


I switched between several Hadoop and Hive versions (all downloaded from the official sites) and tested several Flink SQL runtime environments (Flink SQL CLI, Zeppelin, Flink SQL
gateway); none of them solved the problem. Flink runs on YARN. Below is the SQL script used for the test:


// in Hive
CREATE TABLE IF NOT EXISTS temp_h1(
  id VARCHAR(50),
  lac VARCHAR(50),
  ci VARCHAR(50),
  flux_m VARCHAR(50),
  nums VARCHAR(50),
  sno  VARCHAR(50),
  cdate VARCHAR(50)
)
row format delimited FIELDS TERMINATED BY ',' 
stored as textfile 
LOCATION '/tmp/hive/temp_h1';


CREATE TABLE IF NOT EXISTS temp_h2(
  id VARCHAR(50),
  lac VARCHAR(50),
  ci VARCHAR(50),
  flux_m VARCHAR(50),
  nums VARCHAR(50),
  sno  VARCHAR(50),
  cdate VARCHAR(50)
)
row format delimited FIELDS TERMINATED BY ',' 
stored as textfile 
LOCATION '/tmp/hive/temp_h2';


// test data (t.txt)
101,中国,100.02,123.001,12,30,20200316
102,美国,100.02,123.001,12,30,20200316
103,武汉,100.02,123.001,12,30,20200316
104,北京,100.02,123.001,12,30,20200316
105,俄罗斯,100.02,123.001,12,30,20200316
106,海南,100.02,123.001,12,30,20200316
107,香格里拉酒店,100.02,123.001,12,30,20200316


// load the data
load data local inpath '/home/hadoop/temp/t.txt' into table temp_h1;


// in Flink SQL
insert into temp_h2 select * from temp_h1;
select * from temp_h2; // garbled output, and the data is incomplete


The garbled temp_h2 file on HDFS is attached as cp-0-task-0-file-0.
Flink SQL reported no errors while running; the YARN log is attached as Executor.log.


Note, the Flink web UI shows: CsvTableSource(read fields: a, b) ->
SourceConversion(table=[hive.test.temp_1, source: [CsvTableSource(read fields:
a, b)]], fields=[a, b]) -> SinkConversionToRow -> Sink: Unnamed
. I also don't understand why SinkConversionToRow is used here.


You could also test with the code above and verify whether the garbled output shows up in your environments. Thanks!



2020-03-17 16:14:23,830 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- 

2020-03-17 16:14:23,832 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Starting YARN TaskExecutor runner (Version: 1.10.0, Rev:aa4eb8f, 
Date:07.02.2020 @ 19:18:19 CET)
2020-03-17 16:14:23,832 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  OS current user: hadoop
2020-03-17 16:14:24,358 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Current Hadoop/Kerberos user: hadoop
2020-03-17 16:14:24,358 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 
1.8/25.152-b16
2020-03-17 16:14:24,358 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Maximum heap size: 491 MiBytes
2020-03-17 16:14:24,359 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  JAVA_HOME: /opt/beh/core/jdk
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Hadoop version: 2.7.5
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  JVM Options:
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Xmx536870902
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Xms536870902
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -XX:MaxDirectMemorySize=268435458
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -XX:MaxMetaspaceSize=100663296
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- 
-Dlog.file=/opt/beh/logs/yarn/userlogs/application_1581043496207_0054/container_e06_1581043496207_0054_01_06/taskmanager.log
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -Dlog4j.configuration=file:./log4j.properties
2020-03-17 16:14:24,360 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
-  Program Arguments:
2020-03-17 16:14:24,361 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -D
2020-03-17 16:14:24,361 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- taskmanager.memory.framework.off-heap.size=134217728b
2020-03-17 16:14:24,361 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -D
2020-03-17 16:14:24,361 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- taskmanager.memory.network.max=134217730b
2020-03-17 16:14:24,361 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -D
2020-03-17 16:14:24,361 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- taskmanager.memory.network.min=134217730b
2020-03-17 16:14:24,361 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
- -D
2020-03-17 16:14:24,361 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  
 

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 Thread Andrey Zagrebin
Hi Lake,

When the Flink doc mentions a state entry in RocksDB, we mean one key/value
pair stored by user code over any keyed state API
(keyed context in keyed operators obtained e.g. from keyBy()
transformation).
In case of map or list, the doc means map key/value and list element.

- value/aggregating/folding/reducing state: key -> value
- map state: key -> map key -> value
- list state: key -> list -> element in some position
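For illustration, a small sketch of how such a TTL config is attached to a state descriptor; with a MapState every map key/value pair counts as one entry towards that counter (the state name and types are made up):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlSketch {
    public static MapStateDescriptor<String, Long> descriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
                // query the current time again after every 1000 processed entries
                .cleanupInRocksdbCompactFilter(1000)
                .build();

        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<>("per-key-counts", Types.STRING, Types.LONG);
        descriptor.enableTimeToLive(ttlConfig); // each map key/value pair is one "entry"
        return descriptor;
    }
}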

Best,
Andrey

On Tue, Mar 17, 2020 at 11:04 AM Yun Tang  wrote:

> Hi Lake
>
> Flink leverage RocksDB's background compaction mechanism to filter
> out-of-TTL entries (by comparing with current timestamp provided from
> RocksDB's time_provider) to not let them stay in newly compacted data.
>
> This would iterator over data entries with FlinkCompactionFilter::FilterV2
> [1], and the parameter 'queryTimeAfterNumEntries' in Flink indicates the
> threshold 'query_time_after_num_entries_' in FrocksDB  [2]. Once RocksDB
> iterator more than several entries .e.g 1000, it would call time_provider
> to update current timestamp to let the process of cleaning up more eagerly
> and accurately.
>
> [1]
> https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.cc#L107
> [2]
> https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.h#L140
>
> Best
> Yun Tang
>
> --
> *From:* LakeShen 
> *Sent:* Tuesday, March 17, 2020 15:30
> *To:* dev ; user-zh ;
> user 
> *Subject:* Question about RocksDBStateBackend Compaction Filter state
> cleanup
>
> Hi community ,
>
> I see the flink RocksDBStateBackend state cleanup,now the code like this :
>
> StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.seconds(1))
> .cleanupInRocksdbCompactFilter(1000)
> .build();
>
>
>
> The default background cleanup for RocksDB backend queries the current
> timestamp each time 1000 entries have been processed.
>
>
> What's the meaning of  1000 entries? 1000 different key ?
>
> Thanks to your reply.
>
> Best regards,
> LakeShen
>


Re: Recommended batchSize for VectorizedRowBatch when reading ORC files

2020-03-17 Thread Jingsong Li
Hi,

1.10 does not convert into Row; it only provides a row view. Previously the data was converted into Row, and this difference has a big impact on performance.

Best,
Jingsong Lee
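For illustration, a small sketch against the plain ORC reader API showing where the batchSize discussed below comes in; the file path is made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

public class OrcBatchSizeSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(new Path("/tmp/data.orc"), OrcFile.readerOptions(conf));
        TypeDescription schema = reader.getSchema();

        // batchSize only controls how many rows are materialized per nextBatch() call;
        // it does not have to match the ORC row group size (10,000 rows by default).
        VectorizedRowBatch batch = schema.createRowBatch(1000);

        RecordReader rows = reader.rows();
        while (rows.nextBatch(batch)) {
            System.out.println("read " + batch.size + " rows");
        }
        rows.close();
    }
}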

On Tue, Mar 17, 2020 at 3:31 PM jun su  wrote:

> hi Jingsong Li,
>
> Thanks for the reply, I understand what you mean.
> I came across this question while reading the ORC-related code of flink-1.10. I noticed that the flink-1.10 release notes mention
> vectorized reading of ORC. But comparing with older versions of the code, the VectorizedRowBatch approach has always been used;
> flink-1.10 only adds adaptation for different Hive versions. I also see that the related code comes from your pull request. Is that correct?
>
> Jingsong Li wrote on Tue, Mar 17, 2020 at 12:04 PM:
>
> > Hi,
> >
> > 10,000 rows is too large; it would take too much memory, and a very large batchSize is also bad for the cache.
> > batchSize does not have to equal the row group size; with row groups this large, a batchSize that is big enough is fine.
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Mar 17, 2020 at 11:52 AM jun su  wrote:
> >
> > > hi all:
> > >  When reading ORC files in a vectorized way, we need to configure the batchSize of the VectorizedRowBatch, which sets how many rows are read at a time.
> > > As I understand it, based on the ORC index the smallest unit for reading an ORC file should be the row group (10,000 rows by default), and the lower layer uses the filter conditions to narrow down to the relevant row
> > groups.
> > > So with the batchSize of 1000 mentioned before, one row group needs 10 reads, and each row group is stored column by column,
> > > so non-contiguous reads are bound to happen. Doesn't that prevent maximum optimization? Should batchSize be set equal to the row group size to maximize reading efficiency?
> > > I am not sure whether my understanding is correct.
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>
>
> --
> Best,
> Jun Su
>


-- 
Best, Jingsong Lee


Re: Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 Thread Yun Tang
Hi Lake

Flink leverages RocksDB's background compaction mechanism to filter out-of-TTL
entries (by comparing with the current timestamp provided by RocksDB's
time_provider) so that they do not stay in the newly compacted data.

This iterates over data entries with FlinkCompactionFilter::FilterV2 [1],
and the parameter 'queryTimeAfterNumEntries' in Flink corresponds to the threshold
'query_time_after_num_entries_' in FrocksDB [2]. Once RocksDB has iterated over more
than that many entries, e.g. 1000, it calls time_provider to update the current
timestamp so that the cleanup process is more eager and accurate.

[1] 
https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.cc#L107
[2] 
https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.h#L140

Best
Yun Tang


From: LakeShen 
Sent: Tuesday, March 17, 2020 15:30
To: dev ; user-zh ; user 

Subject: Question about RocksDBStateBackend Compaction Filter state cleanup

Hi community ,

I see the flink RocksDBStateBackend state cleanup,now the code like this :


StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build();


The default background cleanup for RocksDB backend queries the current 
timestamp each time 1000 entries have been processed.

What's the meaning of  1000 entries? 1000 different key ?

Thanks to your reply.

Best regards,
LakeShen


Re: Flink YARN app terminated before the client receives the result

2020-03-17 Thread DONG, Weike
Hi Tison & Till and all,

I have uploaded the client, taskmanager and jobmanager log to Gist (
https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I
can reproduce this bug every time when trying to cancel Flink 1.10 jobs on
YARN.

Besides, in earlier Flink versions like 1.9, the REST API for *cancelling a
job with a savepoint* sometimes threw exceptions to the client side due to
the early shutdown of the server, even though the log shows the savepoint was
successfully completed. When using the newly introduced *stop* API that bug
disappeared; however, the *cancel* API seems to be buggy now.

Best,
Weike

On Tue, Mar 17, 2020 at 10:17 AM tison  wrote:

> edit: previously, after the cancellation we had a longer call chain to
> #jobReachedGloballyTerminalState, which does the job archiving & graceful JM
> shutdown, which might take some time so that ...
>
> Best,
> tison.
>
>
> tison wrote on Tue, Mar 17, 2020 at 10:13 AM:
>
>> Hi Weike & Till,
>>
>> I agree with Till and it is also the analysis from my side. However, it
>> seems even if we don't have FLINK-15116, it is still possible that we
>> complete the cancel future but the cluster got shutdown before it properly
>> delivered the response.
>>
>> There is one strange thing: this behavior is almost always reproducible,
>> although it should be just one possible ordering, not a guaranteed one.
>> Maybe previously we first had to cancel the job, which has a long call
>> chain, so it happened that we had enough time to deliver the response.
>>
>> But the resolution probably looks like introducing some
>> synchronization/finalization logic that clears these outstanding futures
>> with best effort before the cluster (RestServer) shuts down.
>>
>> Best,
>> tison.
>>
>>
>> Till Rohrmann wrote on Tue, Mar 17, 2020 at 4:12 AM:
>>
>>> Hi Weike,
>>>
>>> could you share the complete logs with us? Attachments are being
>>> filtered out by the Apache mail server but it works if you upload the logs
>>> somewhere (e.g. https://gist.github.com/) and then share the link with
>>> us. Ideally you run the cluster with DEBUG log settings.
>>>
>>> I assume that you are running Flink 1.10, right?
>>>
>>> My suspicion is that this behaviour has been introduced with FLINK-15116
>>> [1]. It looks as if we complete the shutdown future in
>>> MiniDispatcher#cancelJob before we return the response to the
>>> RestClusterClient. My guess is that this triggers the shutdown of the
>>> RestServer which then is not able to serve the response to the client. I'm
>>> pulling in Aljoscha and Tison who introduced this change. They might be
>>> able to verify my theory and propose a solution for it.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike 
>>> wrote:
>>>
 Hi Yangze and all,

 I have tried numerous times, and this behavior persists.

 Below is the tail log of taskmanager.log:

 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
 TaskSlot(index:0, state:ACTIVE, resource profile:
 ResourceProfile{cpuCores=1., taskHeapMemory=1.503gb
 (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
 (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
 allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
 d0a674795be98bd2574d9ea3286801cb).
 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
 d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
 connection for job d0a674795be98bd2574d9ea3286801cb.
 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
 connection for job d0a674795be98bd2574d9ea3286801cb.
 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
 to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
 SIGTERM. Shutting down as requested.
 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
 SIGTERM. Shutting down as requested.
 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
 cache
 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
 hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
 FileChannelManager removed spill file directory

Re: Recommended batchSize for VectorizedRowBatch when reading ORC files

2020-03-17 Thread jun su
hi Jingsong Li,

Thanks for the reply, I understand what you mean.
I came across this question while reading the ORC-related code of flink-1.10. I noticed that the flink-1.10 release notes mention
vectorized reading of ORC. But comparing with older versions of the code, the VectorizedRowBatch approach has always been used;
flink-1.10 only adds adaptation for different Hive versions. I also see that the related code comes from your pull request. Is that correct?

Jingsong Li wrote on Tue, Mar 17, 2020 at 12:04 PM:

> Hi,
>
> 10,000 rows is too large; it would take too much memory, and a very large batchSize is also bad for the cache.
> batchSize does not have to equal the row group size; with row groups this large, a batchSize that is big enough is fine.
>
> Best,
> Jingsong Lee
>
> On Tue, Mar 17, 2020 at 11:52 AM jun su  wrote:
>
> > hi all:
> >  When reading ORC files in a vectorized way, we need to configure the batchSize of the VectorizedRowBatch, which sets how many rows are read at a time.
> > As I understand it, based on the ORC index the smallest unit for reading an ORC file should be the row group (10,000 rows by default), and the lower layer uses the filter conditions to narrow down to the relevant row
> groups.
> > So with the batchSize of 1000 mentioned before, one row group needs 10 reads, and each row group is stored column by column,
> > so non-contiguous reads are bound to happen. Doesn't that prevent maximum optimization? Should batchSize be set equal to the row group size to maximize reading efficiency?
> > I am not sure whether my understanding is correct.
> >
>
>
> --
> Best, Jingsong Lee
>


-- 
Best,
Jun Su


Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 Thread LakeShen
Hi community ,

I see the flink RocksDBStateBackend state cleanup,now the code like this :

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build();



> The default background cleanup for RocksDB backend queries the current
> timestamp each time 1000 entries have been processed.


What's the meaning of 1000 entries? 1000 different keys?

Thanks for your reply.

Best regards,
LakeShen

