Flink job fails and TaskManager memory is not released

2022-01-20 Thread Liu Join
Environment: Flink 1.13.5, standalone mode, TaskManager memory 4 GB, two slots

The job handles very little data, less than 10 MB. While the job runs, TaskManager memory keeps growing until an error forces a restart, but because the TaskManager memory is not released after the restart, the job ultimately fails for good and the TaskManager service dies as well.

Sent from Mail for Windows



Re: Re: FlinkKafkaProducer issue

2022-01-20 Thread 潘明文
Hi,
   OK, I'll test it. The Flink version we are on is 1.12.4 and the Kafka connector is flink-connector-kafka_2.11.

On 2022-01-21 14:36:06, "selves_nan" wrote:
>Hi, you can specify the transactional id yourself. If it still fails after you set it, could you share the Flink and kafka-connector versions you are using? I don't see the related API in the version I'm currently on.
>
>
>selves_nan
>selves_...@163.com
>
>
>On 2022-01-21 13:00, 潘明文 wrote:
>Hi,
>How do I obtain the "producer's transactional id"?
>
>On 2022-01-21 10:41:37, "selves_nan" wrote:
>Hi, I think the prop is missing some of the configuration entries for a Kafka transactional producer. You could try adding the settings below.
>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "the producer's transactional id");
>// enable idempotence
>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>
>
>selves_nan
>selves_...@163.com
>
>
>On 2022-01-20 14:39, 潘明文 wrote:
>hi,
>When I create a FlinkKafkaProducer, the following error sometimes occurs at runtime and I don't know what causes it.
>
>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>
>
>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
>producer attempted to use a producer id which is not currently assigned to its 
>transactional id.
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>at 
>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>at 
>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>at java.lang.Thread.run(Thread.java:748)
>Suppressed: java.lang.NullPointerException
>


Re: FlinkKafkaProducer issue

2022-01-20 Thread selves_nan
Hi, you can specify the transactional id yourself. If it still fails after you set it, could you share the Flink and kafka-connector versions you are using? I don't see the related API in the version I'm currently on.


selves_nan
selves_...@163.com


On 2022-01-21 13:00, 潘明文 wrote:
Hi,
How do I obtain the "producer's transactional id"?
On 2022-01-21 10:41:37, "selves_nan" wrote:
Hi, I think the prop is missing some of the configuration entries for a Kafka transactional producer. You could try adding the settings below.
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "the producer's transactional id");
// enable idempotence
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);


selves_nan
selves_...@163.com


On 2022-01-20 14:39, 潘明文 wrote:
hi,
When I create a FlinkKafkaProducer, the following error sometimes occurs at runtime and I don't know what causes it.

FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);


org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.NullPointerException



Re: Question - Filesystem connector for lookup table

2022-01-20 Thread Martijn Visser
Hi Jason,

The best option would indeed be to make the dimension data available in
something like a database which you can access via JDBC, HBase or Hive.
Those do support lookups.
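
As an illustration of that pattern, here is a minimal sketch of a JDBC-backed
dimension table and a lookup join in the Table API. All table names, columns
and the connection URL are made-up placeholders, and flink-connector-jdbc plus
a MySQL driver are assumed to be on the classpath:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcLookupSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Dimension table served by MySQL through the JDBC connector; lookups query the database.
        tEnv.executeSql(
                "CREATE TABLE dim_product ("
                        + "  product_id BIGINT,"
                        + "  product_name STRING"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:mysql://dim-db:3306/dims',"
                        + "  'table-name' = 'product'"
                        + ")");

        // Probe-side stream; a lookup join needs a processing-time attribute on this side.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id BIGINT,"
                        + "  product_id BIGINT,"
                        + "  proc_time AS PROCTIME()"
                        + ") WITH ("
                        + "  'connector' = 'datagen',"
                        + "  'rows-per-second' = '1'"
                        + ")");

        // Lookup join: each order row triggers a point lookup against the dimension table.
        tEnv.executeSql(
                "SELECT o.order_id, d.product_name "
                        + "FROM orders AS o "
                        + "JOIN dim_product FOR SYSTEM_TIME AS OF o.proc_time AS d "
                        + "ON o.product_id = d.product_id")
                .print();
    }
}

HBase and Hive lookup tables follow the same FOR SYSTEM_TIME AS OF join
pattern; only the connector options in the WITH clause differ.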

Best regards,

Martijn

On Thu, 20 Jan 2022 at 22:11, Jason Yi <93t...@gmail.com> wrote:

> Thanks for the quick response.
>
> Is there any best or suggested practice for the use case of when we have
> data sets in a filesystem that we want to use in Flink as reference data
> (like dimension data)?
>
>- Would making dimension data a Hive table or loading it into a table
>in RDBMS (like MySQL) be the best option for the use case?
>- Or should we consider having a stage area where output of Flink
>would be stored, and then consider having another application (like Spark)
>to join Flink's output to dimension data?
>
> Jason.
>
> On Thu, Jan 20, 2022 at 12:23 PM Martijn Visser 
> wrote:
>
>> Hi Jason,
>>
>> It's not (properly) supported and we should update the documentation.
>>
>> There is no out of the box possibility to use a file from filesystem as a
>> lookup table as far as I know.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, Jan 20, 2022 at 18:44, Jason Yi <93t...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have data sets in s3 and want to use them as lookup tables in Flink. I
>>> defined tables with the filesystem connector and joined the tables to a
>>> table, defined with the Kinesis connector, in my Flink application. I
>>> expected its output to be written to s3, but no data was written to a sink
>>> table.
>>>
>>> According to the Flink doc (
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors),
>>> filesystem is available for a lookup source. I wonder if this is true.
>>>
>>> If the filesystem connector is not available for lookup tables, is there
>>> any alternative way to use data from s3 as a lookup table in Flink?
>>>
>>> Flink version: 1.14.0 (on EMR 6.5)
>>> Kinesis source table: a watermark was defined.
>>> Lookup data: CSV data in s3.
>>> Sink table: Hudi connector
>>>
>>> Please let me know if I'm missing anything.
>>>
>>> Thanks in advance.
>>> Jason.
>>>
>> --
>>
>> Martijn Visser | Product Manager
>>
>> mart...@ververica.com
>>
>> 
>>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>>


Re: Flink job submitted to the cluster fails with Java heap space error after running for a while

2022-01-20 Thread Liu Join

I have already replaced the 5 s time window with a countWindowAll of 100 elements. Concretely, an aggregate function concatenates the data in the window into a single SQL statement of the form: replace into table (a1,a2,a3,a4,..) values(…)
That still has not solved the problem.
I cannot provide a heap dump for the moment.
The TaskManager memory allocation is as follows:
task heap: 2.76 GB, network: 343 MB, JVM Metaspace: 256 MB

I am running two jobs in total and both show this problem. A simple data-sync program I wrote earlier, which copied 500 tables from one MySQL database to another MySQL database, did not fail, so I don't know in which direction to look for a solution.
While monitoring the job earlier I saw that the MySQL source failed first, which then brought the whole job down. With checkpointing enabled, the MySQL source and the operators before the window run as one parallel chain, and the checkpoint size of that chain stays at 136 MB from the start of the job to the end, while the checkpoints of the other operators are below 1 MB. Could that be part of the cause?
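
For reference, the approach described above (a countWindowAll of 100 elements
whose aggregate function builds one batched statement) might look roughly like
the sketch below. The column names, the target table and the assumption that
rows arrive as already-escaped value tuples such as "(1,'a','b',2)" are
placeholders for illustration, not the original code:

import org.apache.flink.api.common.functions.AggregateFunction;

public class BatchSqlAggregate implements AggregateFunction<String, StringBuilder, String> {

    @Override
    public StringBuilder createAccumulator() {
        return new StringBuilder("replace into target_table (a1,a2,a3,a4) values ");
    }

    @Override
    public StringBuilder add(String valueTuple, StringBuilder acc) {
        // Separate tuples with commas once the first one has been appended.
        if (acc.charAt(acc.length() - 1) == ')') {
            acc.append(',');
        }
        return acc.append(valueTuple);
    }

    @Override
    public String getResult(StringBuilder acc) {
        return acc.toString();
    }

    @Override
    public StringBuilder merge(StringBuilder a, StringBuilder b) {
        // Not used by count windows; kept trivial for completeness.
        return a.append(',').append(b);
    }
}

// Usage: one statement per 100 input records, which is then handed to the MySQL sink.
// DataStream<String> statements = rows.countWindowAll(100).aggregate(new BatchSqlAggregate());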
Sent from Mail for Windows

From: Caizhi Weng
Sent: January 21, 2022, 10:52
To: flink Chinese user mailing list
Subject: Re: Flink job submitted to the cluster fails with Java heap space error after running for a while

Hi!

The 5 s window that concatenates SQL statements looks suspicious; how exactly is it implemented? It would also help to dump the task manager's heap and see what is taking up most of the heap memory.

Liu Join wrote on Thu, Jan 20, 2022 at 13:28:

> Environment:
>
> Flink 1.13.5, standalone cluster, JobManager memory 2 GB, TaskManager memory 4 GB; the cluster consists of one JobManager and two TaskManagers, each with 2 slots.
>
>
> The job reads data from 20,000 tables, one record per table per minute, and outputs the latest record of each table every 10 minutes. The code uses map, filter, watermarks, a 10-minute sliding window, and reduce to obtain the latest record. Because the sink is MySQL on modest hardware, the latest records are concatenated into batched insert statements before being written to MySQL. A 5 s window was added for concatenating the SQL statements.
>
> Error message:
> java.lang.OutOfMemoryError: Java heap space
>
> Symptoms:
>
> The whole TaskManager memory fills up. After the job fails and restarts, the TaskManager memory is still full, so the job fails again and eventually dies for good. Since the memory is never released, after a while the TaskManager process dies as well.
> Sent from Mail for Windows
>
>



Re: Re: FlinkKafkaProducer issue

2022-01-20 Thread 潘明文
Hi,
How do I obtain the "producer's transactional id"?
On 2022-01-21 10:41:37, "selves_nan" wrote:
>Hi, I think the prop is missing some of the configuration entries for a Kafka transactional producer. You could try adding the settings below.
>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "the producer's transactional id");
>// enable idempotence
>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>
>
>selves_nan
>selves_...@163.com
>
>
>On 2022-01-20 14:39, 潘明文 wrote:
>hi,
>When I create a FlinkKafkaProducer, the following error sometimes occurs at runtime and I don't know what causes it.
>
>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>
>
>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
>producer attempted to use a producer id which is not currently assigned to its 
>transactional id.
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>at 
>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>at 
>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>at java.lang.Thread.run(Thread.java:748)
>Suppressed: java.lang.NullPointerException
>


Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and falls back to default /opt/flink/conf/log4j-console.properties

2022-01-20 Thread Yang Wang
Changing the order of exec command makes sense to me. Would you please
create a ticket for this?

The /opt/flink/conf is cleaned up because we are mounting the conf files
from K8s ConfigMap.



Best,
Yang

Tamir Sagi wrote on Tue, Jan 18, 2022 at 17:48:

> Hey Yang,
>
> Thank you for confirming it.
>
> IMO, a better approach is to change the order "log_setting" , "ARGS" and 
> "FLINK_ENV_JAVA_OPTS"
> in exec command.
> In that way we prioritize user defined properties.
>
> From:
>
> exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
> -classpath "`manglePathList
> "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN}
> "${ARGS[@]}"
>
> To
>
> exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList
> "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN}
> "${ARGS[@]}" "${FLINK_ENV_JAVA_OPTS}"
>
> Unless there are system configurations which are not supposed to be overridden
> by the user (and then having dedicated env variables is better). Does it make
> sense to you?
>
>
> In addition, any idea why /opt/flink/conf gets cleaned (Only
> flink-conf.xml is there).
>
>
> Best,
> Tamir
>
>
> --
> *From:* Yang Wang 
> *Sent:* Tuesday, January 18, 2022 6:02 AM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored
> and falls back to default /opt/flink/conf/log4j-console.properties
>
>
> *EXTERNAL EMAIL*
>
>
> I think you are right. Before 1.13.0, if the log configuration file does
> not exist, the logging properties would not be added to the start command.
> That is why it could work in 1.12.2.
>
> However, from 1.13.0, we are not using
> "kubernetes.container-start-command-template" to generate the JM/TM start
> command, but the jobmanager.sh/taskmanager.sh. We do not
> have the same logic in the "flink-console.sh".
>
> Maybe we could introduce an environment for log configuration file name in
> the "flink-console.sh". The default value could be
> "log4j-console.properties" and it could be configured by users.
> If this makes sense to you, could you please create a ticket?
>
>
> Best,
> Yang
>
Tamir Sagi wrote on Mon, Jan 17, 2022 at 22:53:
>
> Hey Yang,
>
> thanks for answering,
>
> TL;DR
>
> Assuming I have not missed anything , the way TM and JM are created is
> different between these 2 versions,
> but it does look like flink-console.sh gets called eventually with the
> same exec command.
>
> in 1.12.2 if org.apache.flink.kubernetes.kubeclient.parameters#hasLog4j
> returns false then logging args are not added to startCommand.
>
>
>1. why does the config dir gets cleaned once the cluster starts? Even
>when I pushed log4j-console.properties to the expected location
>(/opt/flink/conf) , the directory includes only flink-conf.yaml.
>2. I think by running exec command "...${FLINK_ENV_JAVA_OPTS}
>"${log_setting[@]}" "${ARGS[@]}" some properties might be ignored.
>IMO, it should first look for properties in java.opts provided by the
>user in flink-conf and falls back to default in case it's not present.
>
>
> Taking about Native kubernetes mode
>
> I checked the bash script in flink-dist module, it looks like in both
> 1.14.2 and 1.12.2. flink-console.sh is similar. (in 1.14.2 there are more
> cases for the input argument)
>
> logging variable is the same
>
> https://github.com/apache/flink/blob/release-1.14.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L101
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L89
>
> Exec command is the same
>
> https://github.com/apache/flink/blob/release-1.14.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L114
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L99
>
> As for creating TM/JM, in *1.14.2* there is a usage of 2 bash scripts
>
>- kubernetes-jobmanager.sh
>- kubernetes-taskmanager.sh
>
> They get called while decorating the pod, referenced in startCommand.
>
> for instance, JobManager.
>
> https://github.com/apache/flink/blob/release-1.14.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecorator.java#L58-L59
>
> kubernetes-jobmanager.sh gets called once the container starts which calls
> flink-console.sh internally and pass the
> deploymentName(kubernetes-application in our case) and args.
>
> In *1.12.2* the decorator set /docker-entrypoint.sh
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L67
>
> and set the start command
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java#L224
>
>
> 

Re: FileSource Usage

2022-01-20 Thread Guowei Ma
Hi, Meghajit

Thanks Meghajit for sharing your use case.
One workaround you could try is to name your files in a timestamp style. More
details can be found here [1].
Another small concern is that Flink is a distributed system, which means that
we cannot assume any order even if we list the files in the order they were
created.

[1]
https://stackoverflow.com/questions/49045725/gsutil-gcloud-storage-file-listing-sorted-date-descending
Best,
Guowei


On Thu, Jan 20, 2022 at 11:11 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hi Guowei,
>
> Thanks for your answer. Regarding your question,
> *> Currently there is no such public interface ,which you could extend to
> implement your own strategy. Would you like to share the specific problem
> you currently meet?*
>
> The GCS bucket that we are trying to read from is periodically populated
> with parquet files by another service. This can be daily or even hourly.
> For an already pre-populated bucket, we would like to read the files
> created from, say, day *T* till day *T+10*.  Order matters here and hence
> we would like to read the oldest files first, and then the new ones.  Would
> you know how I can enforce a reading order here ?
>
> Thanks,
> Meghajit
>
>
>
>
> On Thu, Jan 20, 2022 at 2:29 PM Guowei Ma  wrote:
>
>> Hi, Meghajit
>>
>> 1. From the implementation [1] the order of split depends on the
>> implementation of the FileSystem.
>>
>> 2. From the implementation [2] the order of the file also depends on the
>> implementation of the FileSystem.
>>
>> 3. Currently there is no such public interface ,which you could extend to
>> implement your own strategy. Would you like to share the specific problem
>> you currently meet?
>>
>> 3. `FileSource` supports checkpoints. I think the watermark is a general
>> mechanism so you could read the related documentation[3].
>>
>> [1]
>> https://github.com/apache/flink/blob/355b165859aebaae29b6425023d352246caa0613/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java#L141
>>
>> [2]
>> https://github.com/apache/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java#L102
>>
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/
>> Best,
>> Guowei
>>
>>
>> On Wed, Jan 19, 2022 at 6:06 PM Meghajit Mazumdar <
>> meghajit.mazum...@gojek.com> wrote:
>>
>>> Hello,
>>>
>>> We are using FileSource
>>> 
>>> to process Parquet Files and had a few doubts around it. Would really
>>> appreciate if somebody can help answer them:
>>>
>>> 1. For a given file, does FileSource read the contents inside it in
>>> order ? In other words, what is the order in which the file splits are
>>> generated from the contents of the file ?
>>>
>>> 2. We want to provide a GCS Bucket URL to the FileSource so that it can
>>> read parquet files from there. The bucket has multiple parquet files.
>>> Wanted to know, what is the order in which the files will be picked and
>>> processed by this FileSource ? Can we provide an order strategy ourselves,
>>> say, process according to creation time ?
>>>
>>> 3. Is it possible/good practice to apply checkpointing and watermarking
>>> for a bounded source like FileSource ?
>>>
>>> --
>>> *Regards,*
>>> *Meghajit*
>>>
>>
>
> --
> *Regards,*
> *Meghajit*
>


Re: Flink job submitted to the cluster fails with Java heap space error after running for a while

2022-01-20 Thread Caizhi Weng
Hi!

The 5 s window that concatenates SQL statements looks suspicious; how exactly is it implemented? It would also help to dump the task manager's heap and see what is taking up most of the heap memory.
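
(For reference, one way to capture such a dump automatically is to pass JVM
options to the task managers in flink-conf.yaml, for example
env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/taskmanager.hprof
— the dump path is a placeholder, and this simply mirrors the
env.java.opts.jobmanager GC-log example quoted elsewhere in this digest.)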

Liu Join wrote on Thu, Jan 20, 2022 at 13:28:

> Environment:
>
> Flink 1.13.5, standalone cluster, JobManager memory 2 GB, TaskManager memory 4 GB; the cluster consists of one JobManager and two TaskManagers, each with 2 slots.
>
>
> The job reads data from 20,000 tables, one record per table per minute, and outputs the latest record of each table every 10 minutes. The code uses map, filter, watermarks, a 10-minute sliding window, and reduce to obtain the latest record. Because the sink is MySQL on modest hardware, the latest records are concatenated into batched insert statements before being written to MySQL. A 5 s window was added for concatenating the SQL statements.
>
> Error message:
> java.lang.OutOfMemoryError: Java heap space
>
> Symptoms:
>
> The whole TaskManager memory fills up. After the job fails and restarts, the TaskManager memory is still full, so the job fails again and eventually dies for good. Since the memory is never released, after a while the TaskManager process dies as well.
> Sent from Mail for Windows
>
>


Re: FlinkKafkaProducer issue

2022-01-20 Thread selves_nan
Hi, I think the prop is missing some of the configuration entries for a Kafka transactional producer. You could try adding the settings below.
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "the producer's transactional id");
// enable idempotence
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
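
For illustration, a rough sketch of how these properties might be wired into
the snippet from the original mail. The broker address, transactional id value
and topic are placeholders, and the transaction.timeout.ms line is an extra
assumption not mentioned above (FlinkKafkaProducer defaults it to 1 hour, which
exceeds the broker's default transaction.max.timeout.ms of 15 minutes and is
another common source of EXACTLY_ONCE errors):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");   // placeholder
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-job-sink-tx");       // any stable, job-unique string
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Keep the producer transaction timeout within the broker's transaction.max.timeout.ms.
prop.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 900000);

FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
        "WRITE_TOPIC",                                                     // placeholder topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        prop,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);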


selves_nan
selves_...@163.com


On 2022-01-20 14:39, 潘明文 wrote:
hi,
When I create a FlinkKafkaProducer, the following error sometimes occurs at runtime and I don't know what causes it.

FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);


org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.NullPointerException



Re: Tuning akka.ask.timeout

2022-01-20 Thread Guowei Ma
Hi, Paul

Could you share some more information, such as the Flink version you are using
and the memory sizes of the TM and JM?
Also, when does the timeout happen: at the start of the job, or while the job
is running?

Best,
Guowei


On Thu, Jan 20, 2022 at 4:45 PM Paul Lam  wrote:

> Hi,
>
> I’m tuning a Flink job with 1000+ parallelism, which frequently fails with
> Akka TimeOutException (it was fine with 200 parallelism).
>
> I see some posts recommend increasing `akka.ask.timeout` to 120s. I’m not
> familiar with Akka but it looks like a very long time compared to the
> default 10s and as a response timeout.
>
> So I’m wondering what’s the reasonable range for this option? And why
> would the Actor fail to respond in time (the message was dropped due to
> pressure)?
>
> Any input would be appreciated! Thanks a lot.
>
> Best,
> Paul Lam
>
>


Re: Unhandled exception in flink 1.14.2

2022-01-20 Thread John Smith
As per another recent thread, this is still an issue.

On Wed, 19 Jan 2022 at 06:36, Chesnay Schepler  wrote:

> This is a serialization bug in Flink, see
> https://issues.apache.org/jira/browse/FLINK-24550.
> It will be fixed in the upcoming 1.14.3 release.
>
> On 19/01/2022 09:01, Caizhi Weng wrote:
>
> Hi!
>
> To print out gc logs of job manager you can add this configuration to
> flink-conf.yaml
>
> env.java.opts.jobmanager: -Xloggc:/tmp/jobmanager-gc.log
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps
>
> This will print gc logs to /tmp/jobmanager-gc.log.
>
> I'm not familiar with the garbage collection metrics page. If the unit of
> time is ms then gc does not seem to be heavy. However I would still
> recommend to print out gc logs for a double check.
>
> John Smith wrote on Wed, Jan 19, 2022 at 06:43:
>
>> I think I may know what is causing the issue... So I have 3 job managers.
>>
>> 1- I Navigated to a non leader UI and submitted a new job...
>> 2- The UI timed out with grey lines
>> 3- Some Internal Server error messages appeared.
>> 4- Going back to the leader UI checking the running jobs, the job seems
>> to have been submitted and running.
>> 5- Going back to the job manager UI that failed, now shows ok.
>>
>> And the logs are as follows... And below are the GC metrics from the UI.
>>
>> 2022-01-18 22:33:24,574 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Job 2297fdac52fa7191afee9ec4ff11c805 is submitted.
>> 2022-01-18 22:33:24,574 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Submitting Job with JobId=2297fdac52fa7191afee9ec4ff11c805.
>> 2022-01-18 22:34:00,618 ERROR org.apache.flink.runtime.rest.handler.job.
>> JobDetailsHandler [] - Unhandled exception.
>> java.util.concurrent.CancellationException: null
>> at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:
>> 2276) ~[?:1.8.0_312]
>> at org.apache.flink.runtime.rest.handler.legacy.
>> DefaultExecutionGraphCache.getExecutionGraphInternal(
>> DefaultExecutionGraphCache.java:98) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.legacy.
>> DefaultExecutionGraphCache.getExecutionGraphInfo(
>> DefaultExecutionGraphCache.java:67) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.job.
>> AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler
>> .java:81) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.AbstractRestHandler
>> .respondToRequest(AbstractRestHandler.java:83) ~[flink-dist_2.12-1.14.2
>> .jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(
>> AbstractHandler.java:195) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>> .lambda$channelRead0$0(LeaderRetrievalHandler.java:83) ~[flink-dist_2.12-
>> 1.14.2.jar:1.14.2]
>> at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
>> at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer
>> .java:45) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>> .channelRead0(LeaderRetrievalHandler.java:80) [flink-dist_2.12-1.14.2
>> .jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>> .channelRead0(LeaderRetrievalHandler.java:49) [flink-dist_2.12-1.14.2
>> .jar:1.14.2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:
>> 99) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:379) [flink-dist_2.12-1.14.2.jar:1.14.
>> 2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:365) [flink-dist_2.12-1.14.2.jar:1.14.
>> 2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.fireChannelRead(
>> AbstractChannelHandlerContext.java:357) [flink-dist_2.12-1.14.2.jar:1.14.
>> 2]
>> at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(
>> RouterHandler.java:115) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.router.RouterHandler
>> .channelRead0(RouterHandler.java:94) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.runtime.rest.handler.router.RouterHandler
>> .channelRead0(RouterHandler.java:55) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:
>> 99) [flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:379) [flink-dist_2.12-1.14.2.jar:1.14.
>> 2]
>> at 

Re: Flink 1.14.3: Can not access job information from a jobmanager UI

2022-01-20 Thread John Smith
I had the same issue; in my thread it was mentioned that it was supposed to
be fixed in 1.14.3.

On Thu, 20 Jan 2022 at 07:40, Martin  wrote:

> Thanks for the quick response. I assumed that's already known, but was not
> able to find the issue. Thanks :)
>
Chesnay Schepler wrote on 20.01.2022 at 13:36 (GMT +01:00):
>
> This is a bug in Flink for which I have filed a ticket:
> https://issues.apache.org/jira/browse/FLINK-25732
>
> As is you can only request the job overview from the leading jobmanager.
>
> On 20/01/2022 13:15, Martin wrote:
>
>
>
>


Re: Question - Filesystem connector for lookup table

2022-01-20 Thread Jason Yi
Thanks for the quick response.

Is there any best or suggested practice for the use case of when we have
data sets in a filesystem that we want to use in Flink as reference data
(like dimension data)?

   - Would making dimension data a Hive table or loading it into a table in
   RDBMS (like MySQL) be the best option for the use case?
   - Or should we consider having a stage area where output of Flink would
   be stored, and then consider having another application (like Spark) to
   join Flink's output to dimension data?

Jason.

On Thu, Jan 20, 2022 at 12:23 PM Martijn Visser 
wrote:

> Hi Jason,
>
> It's not (properly) supported and we should update the documentation.
>
> There is no out of the box possibility to use a file from filesystem as a
> lookup table as far as I know.
>
> Best regards,
>
> Martijn
>
> On Thu, Jan 20, 2022 at 18:44, Jason Yi <93t...@gmail.com> wrote:
>
>> Hello,
>>
>> I have data sets in s3 and want to use them as lookup tables in Flink. I
>> defined tables with the filesystem connector and joined the tables to a
>> table, defined with the Kinesis connector, in my Flink application. I
>> expected its output to be written to s3, but no data was written to a sink
>> table.
>>
>> According to the Flink doc (
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors),
>> filesystem is available for a lookup source. I wonder if this is true.
>>
>> If the filesystem connector is not available for lookup tables, is there
>> any alternative way to use data from s3 as a lookup table in Flink?
>>
>> Flink version: 1.14.0 (on EMR 6.5)
>> Kinesis source table: a watermark was defined.
>> Lookup data: CSV data in s3.
>> Sink table: Hudi connector
>>
>> Please let me know if I'm missing anything.
>>
>> Thanks in advance.
>> Jason.
>>
> --
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
> 
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>


Re: Flink native k8s integration vs. operator

2022-01-20 Thread Alexis Sarda-Espinosa
Hi Robert,

I agree with you, I mean that's why I was writing a K8s operator, but the 
restriction wasn't decided by me, it was imposed on me. I guess my thinking was 
rather that an operator wouldn't necessarily supersede standalone+reactive, at 
least not in my case, but that certainly doesn't mean an operator is a bad 
idea, it's just something that other users might want to keep in mind.

Regards,
Alexis.


From: Robert Metzger 
Sent: Thursday, January 20, 2022 7:06 PM
To: Alexis Sarda-Espinosa 
Cc: dev ; user 
Subject: Re: Flink native k8s integration vs. operator

Hi Alexis,

The usage of Custom Resource Definitions (CRDs). The main reason given to me 
was that such resources are global (for a given cluster) and that is not 
desired. I know that ultimately a CR based on a CRD can be scoped to a specific 
namespace, but customer is king…

I don't think this restriction applies to many organizations. K8s operators are 
the de facto standard for deploying all kinds of software. There are quite many 
projects that used to just have a Helm chart, that are now switching over to 
provide operators, because they provide a much better experience.
If you have more specifics on this concern that is relevant for the Flink 
community, I'd like to hear that.


Kubernetes Service Accounts (SAs) with roles to create deployments/pods. This 
one is more understandable, particularly after the whole log4j debacle. Roles 
that manage solely deployment.scale subresources would be acceptable though.

This requirement is not strictly needed to deploy Flink on K8s. Only with the 
native K8s integration of Flink, you need to give the Flink JVM a role that 
allows creating other pods.


Best,
Robert

On Tue, Jan 18, 2022 at 5:18 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:

Hi everyone,



Since I see this is getting some traction, I’d like to add a couple things. I 
had been developing a Kubernetes controller for Flink as a Proof of Concept at 
my company; I called it Flork because it was to be a Flink Orchestrator for 
Kubernetes. In the end, we will most likely not use this controller due to 
security concerns that were communicated to me. These concerns stem from the 
fact that our product would be used by customers in their own Kubernetes 
clusters, and many customers don’t want:



- The usage of Custom Resource Definitions (CRDs). The main reason given to me 
was that such resources are global (for a given cluster) and that is not 
desired. I know that ultimately a CR based on a CRD can be scoped to a specific 
namespace, but customer is king…



- Kubernetes Service Accounts (SAs) with roles to create deployments/pods. This 
one is more understandable, particularly after the whole log4j debacle. Roles 
that manage solely deployment.scale subresources would be acceptable though.



I mention these in case they prove to be relevant for others in the current 
context. For us, it means we have to stick with something like standalone 
Kubernetes + reactive/adaptive.



Nevertheless, the PoC I had was already functional and, while I would have to 
request permission to contribute the code to the community, it might be useful 
for these efforts. However, I’d first ask if there is actually interest in this 
code, considering these are some of the “features” it currently has:



* The CRD relies on the Pod Template support included in Flink itself. As such, 
some of the fields in the CRD are “vanilla” pod specs, and the schema reflects 
that because it embeds a flattened version of the schema from [1]. I’d also 
have a basic Helm chart ready.



* The code is written in a mixture of Java and Kotlin, and is built with 
Gradle. I made heavy use of Kotlin Coroutines to implement some of the core 
logic in a non-blocking way.



* The code already supports High Availability by leveraging Kubernetes leases 
and the corresponding helpers in Fabric8’s client.



* The main deployment logic is delegated to Flink’s own flink-kubernetes module 
[2]. Nevertheless, my build shadows all the fabric8 classes and service 
definitions embedded in said module, so that the rest of the code can use other 
kubernetes-client versions independently.



* The controller handles savepoint creation for redeployments after CR changes, 
e.g. upgrades. This would also work after controller fail-over with/without HA.



* The code supports some extension for custom container images: classes defined 
in META-INF/services/ files are called as decorators for Flink’s conf file 
and/or the pod specs defined in the CR, and they could be copied to the image 
on top of an official base version.



* A deployment mode without CRD could be supported --- I have some code that 
can run on top of the core controller and allows “embedding” a CR in a Config 
Map key. The translation between the CM and the core controller code is then 
done transparently.



* I have a module that integrates the code with 

Re: Question - Filesystem connector for lookup table

2022-01-20 Thread Martijn Visser
Hi Jason,

It's not (properly) supported and we should update the documentation.

There is no out of the box possibility to use a file from filesystem as a
lookup table as far as I know.

Best regards,

Martijn

On Thu, Jan 20, 2022 at 18:44, Jason Yi <93t...@gmail.com> wrote:

> Hello,
>
> I have data sets in s3 and want to use them as lookup tables in Flink. I
> defined tables with the filesystem connector and joined the tables to a
> table, defined with the Kinesis connector, in my Flink application. I
> expected its output to be written to s3, but no data was written to a sink
> table.
>
> According to the Flink doc (
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors),
> filesystem is available for a lookup source. I wonder if this is true.
>
> If the filesystem connector is not available for lookup tables, is there
> any alternative way to use data from s3 as a lookup table in Flink?
>
> Flink version: 1.14.0 (on EMR 6.5)
> Kinesis source table: a watermark was defined.
> Lookup data: CSV data in s3.
> Sink table: Hudi connector
>
> Please let me know if I'm missing anything.
>
> Thanks in advance.
> Jason.
>
-- 

Martijn Visser | Product Manager

mart...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


Re: Issue with Flink UI for Flink 1.14.0

2022-01-20 Thread Peter Westermann
Just tried this again with Flink 1.14.3 since 
https://issues.apache.org/jira/browse/FLINK-24550 is listed as fixed. I am 
running into similar errors when calling the /v1/jobs/overview endpoint 
(without any running jobs):
{"errors":["Internal server error.",""]}



Peter Westermann
Team Lead – Realtime Analytics
peter.westerm...@genesys.com


From: Dawid Wysakowicz 
Date: Thursday, October 14, 2021 at 10:00 AM
To: Peter Westermann , user@flink.apache.org 

Subject: Re: Issue with Flink UI for Flink 1.14.0

I am afraid it is a bug in flink 1.14. I created a ticket for it 
FLINK-24550[1]. I believe we should pick it up soonish. Thanks for reporting 
the issue!

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24550
On 13/10/2021 20:32, Peter Westermann wrote:
Hello,

I just started testing Flink 1.14.0 and noticed some weird behavior. This is 
for a Flink cluster with zookeeper for HA and two job managers (one leader, one 
backup). The UI on the leader works fine. The UI on the other job manager does 
not load any job-specific data. Same applies to the REST interface. If I 
requests job data from /v1/jobs/{jobId}, I get the expected response on the 
leader but on the other job manager, I only get an exception stack trace:

{"errors":["Internal server error.",""]}


Peter Westermann
Team Lead – Realtime Analytics
peter.westerm...@genesys.com



AIFlow flink java job plugin

2022-01-20 Thread deepthi Sridharan
Hello,

[Apologies if this group does not answer questions related to AIFlow
project and happy to learn if there are other email handles I need to send
my questions to]

I am new to AIFlow and exploring some demo projects for a simple workflow I
want to try with two flink jobs, a batch (bounded processor) and a stream
job (unbounded). The processors are written in java.

I wrote a simple workflow that uses ai_flow.action_on_job_status API to
chain the stream job to run after the batch job is finished. What I found
however was that the stream job gets submitted immediately after
successfully submitting the batch job and not after the batch job finishes.
From a cursory look at the code in flink_job_plugin.py, it is not obvious
where and whether the job_id generated from the submission gets used to track
the job status at all.

Core parts of my workflow:

af.init_ai_flow_context()
with af.job_config('flink_batch'):
    af.user_define_operation(
        processor=FlinkJavaProcessor(
            entry_class="com.linkedin.flink.example.TableApiExample",
            main_jar_file=jar_filename,
            args=["true"]))
with af.job_config('flink_stream'):
    af.user_define_operation(
        processor=FlinkJavaProcessor(
            entry_class="com.linkedin.flink.example.TableApiExample",
            main_jar_file=jar_filename,
            args=["false"]))

af.action_on_job_status('flink_stream', 'flink_batch')


Is this the right way to go about working with the Flink Java processor? I
could not find much documentation on this and would appreciate any inputs
on the right APIs to use.


-- 
Regards,
Deepthi


Re: Flink native k8s integration vs. operator

2022-01-20 Thread Robert Metzger
Hi Alexis,

The usage of Custom Resource Definitions (CRDs). The main reason given to
> me was that such resources are global (for a given cluster) and that is not
> desired. I know that ultimately a CR based on a CRD can be scoped to a
> specific namespace, but customer is king…


I don't think this restriction applies to many organizations. K8s operators
are the de facto standard for deploying all kinds of software. There are
quite many projects that used to just have a Helm chart, that are now
switching over to provide operators, because they provide a much better
experience.
If you have more specifics on this concern that is relevant for the Flink
community, I'd like to hear that.


Kubernetes Service Accounts (SAs) with roles to create deployments/pods.
> This one is more understandable, particularly after the whole log4j
> debacle. Roles that manage solely deployment.scale subresources would be
> acceptable though.


This requirement is not strictly needed to deploy Flink on K8s. Only with
the native K8s integration of Flink, you need to give the Flink JVM a role
that allows creating other pods.


Best,
Robert

On Tue, Jan 18, 2022 at 5:18 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hi everyone,
>
>
>
> Since I see this is getting some traction, I’d like to add a couple
> things. I had been developing a Kubernetes controller for Flink as a Proof
> of Concept at my company; I called it Flork because it was to be a Flink
> Orchestrator for Kubernetes. In the end, we will most likely not use this
> controller due to security concerns that were communicated to me. These
> concerns stem from the fact that our product would be used by customers in
> their own Kubernetes clusters, and many customers don’t want:
>
>
>
> - The usage of Custom Resource Definitions (CRDs). The main reason given
> to me was that such resources are global (for a given cluster) and that is
> not desired. I know that ultimately a CR based on a CRD can be scoped to a
> specific namespace, but customer is king…
>
>
>
> - Kubernetes Service Accounts (SAs) with roles to create deployments/pods.
> This one is more understandable, particularly after the whole log4j
> debacle. Roles that manage solely deployment.scale subresources would be
> acceptable though.
>
>
>
> I mention these in case they prove to be relevant for others in the
> current context. For us, it means we have to stick with something like
> standalone Kubernetes + reactive/adaptive.
>
>
>
> Nevertheless, the PoC I had was already functional and, while I would have
> to request permission to contribute the code to the community, it might be
> useful for these efforts. However, I’d first ask if there is actually
> interest in this code, considering these are some of the “features” it
> currently has:
>
>
>
> * The CRD relies on the Pod Template support included in Flink itself. As
> such, some of the fields in the CRD are “vanilla” pod specs, and the schema
> reflects that because it embeds a flattened version of the schema from [1].
> I’d also have a basic Helm chart ready.
>
>
>
> * The code is written in a mixture of Java and Kotlin, and is built with
> Gradle. I made heavy use of Kotlin Coroutines to implement some of the core
> logic in a non-blocking way.
>
>
>
> * The code already supports High Availability by leveraging Kubernetes
> leases and the corresponding helpers in Fabric8’s client.
>
>
>
> * The main deployment logic is delegated to Flink’s own flink-kubernetes
> module [2]. Nevertheless, my build shadows all the fabric8 classes and
> service definitions embedded in said module, so that the rest of the code
> can use other kubernetes-client versions independently.
>
>
>
> * The controller handles savepoint creation for redeployments after CR
> changes, e.g. upgrades. This would also work after controller fail-over
> with/without HA.
>
>
>
> * The code supports some extension for custom container images: classes
> defined in META-INF/services/ files are called as decorators for Flink’s
> conf file and/or the pod specs defined in the CR, and they could be copied
> to the image on top of an official base version.
>
>
>
> * A deployment mode without CRD could be supported --- I have some code
> that can run on top of the core controller and allows “embedding” a CR in a
> Config Map key. The translation between the CM and the core controller code
> is then done transparently.
>
>
>
> * I have a module that integrates the code with Inversion of Control
> containers such as Spring. I only used javax annotations (soon to be
> jakarta), so it’s not tied to Spring.
>
>
>
> Something I haven’t considered at all in my code is ingress for Flink’s UI.
>
>
>
> Let me know what you think.
>
>
>
> [1]
> https://github.com/kubernetes/kubernetes/blob/master/api/openapi-spec/swagger.json
>
> [2] https://github.com/apache/flink/tree/master/flink-kubernetes
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Gyula Fóra 
> *Sent:* Monday, January 17, 2022 

Question - Filesystem connector for lookup table

2022-01-20 Thread Jason Yi
Hello,

I have data sets in s3 and want to use them as lookup tables in Flink. I
defined tables with the filesystem connector and joined the tables to a
table, defined with the Kinesis connector, in my Flink application. I
expected its output to be written to s3, but no data was written to a sink
table.

According to the Flink doc (
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#supported-connectors),
filesystem is available for a lookup source. I wonder if this is true.

If the filesystem connector is not available for lookup tables, is there
any alternative way to use data from s3 as a lookup table in Flink?

Flink version: 1.14.0 (on EMR 6.5)
Kinesis source table: a watermark was defined.
Lookup data: CSV data in s3.
Sink table: Hudi connector

Please let me know if I'm missing anything.

Thanks in advance.
Jason.


Re: Unaligned Tumbling Windows

2022-01-20 Thread Aeden Jameson
On Thu, Jan 20, 2022 at 2:46 AM yidan zhao  wrote:

> self-define the window assigners.
>

Thanks, I'll check that out. If you have links to especially good examples
and explanations, that would be great. Otherwise, I presume the Flink
codebase itself is the place to start.
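
For anyone else digging into this, here is a minimal sketch of a custom
assigner that staggers tumbling event-time windows by a per-element offset. The
class name and the hashCode-based offset scheme are inventions for this sketch,
not code from the Flink codebase:

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class StaggeredTumblingWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;

    private StaggeredTumblingWindows(long size) {
        this.size = size;
    }

    public static StaggeredTumblingWindows of(long sizeMillis) {
        return new StaggeredTumblingWindows(sizeMillis);
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // For per-key staggering, derive the offset from the same field(s) used in keyBy;
        // element.hashCode() is only a stand-in for that key here.
        long offset = Math.floorMod((long) element.hashCode(), size);
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

Applied as keyedStream.window(StaggeredTumblingWindows.of(60_000)), different
keys then close their windows at different offsets within each minute instead
of all at once.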

-- 
Cheers,
Aeden


Statefun and the DataStream API

2022-01-20 Thread Barry Higgins
Hi,

We have been looking at using stateful functions to deploy a remote
python model as a stateful and interacting with it from Flink via
Kafka.

Everything has worked well until we ran into some in-house deployment
issues around the various environments.

This coupled with the use case (where we may be happy to put the state
load on the calling Flink app) means that we are looking again at
calling our model via the DataSteam API.

The documentation at
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.1/docs/sdk/flink-datastream/
does link to an example but that is a dead link and that I’m guessing
would point back to:
https://github.com/apache/flink-statefun/blob/release-2.2/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java

I am struggling to follow the creation of a valid
StatefulFunctionDataStreamBuilder in relation to a remote function
particularly this line:

.withFunctionProvider(GREET, unused -> new MyFunction())

where the calling code seems to have access to the remote function.

Would anyone be able to give me a steer please?

Thanks,

Barry
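
In case it helps anyone searching the archives, one possible direction,
sketched from memory of the statefun-flink-datastream example and therefore to
be verified against the SDK version in use (the method names, endpoint URL and
tuning values below are assumptions, not confirmed API), is to register the
remote endpoint with a request-reply builder instead of a function provider:

import java.net.URI;
import java.time.Duration;

import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
import org.apache.flink.statefun.sdk.FunctionType;

import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;

FunctionType REMOTE_GREET = new FunctionType("example", "remote-greeter");

StatefulFunctionEgressStreams out =
        StatefulFunctionDataStreamBuilder.builder("example")
                .withDataStreamAsIngress(ingress)       // DataStream<RoutableMessage> built elsewhere
                .withRequestReplyRemoteFunction(
                        requestReplyFunctionBuilder(
                                REMOTE_GREET, URI.create("http://python-worker:8000/statefun"))
                                .withMaxRequestDuration(Duration.ofSeconds(15))
                                .withMaxNumBatchRequests(500))
                .withEgressId(GREETINGS)                // EgressIdentifier declared elsewhere
                .build(env);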


Re: Unaligned Tumbling Windows

2022-01-20 Thread Aeden Jameson
Hi Caizhi,
Thanks for responding.

>So you'd like to flatten the traffic by materializing the results of
different parallelisms at different times?
Yes.

> What's your use case for streaming windows?
In short, summarizing many-many-many millions of sessions every minute
involving mostly stateless, but a few stateful metrics. Meaning the metric
depends on some trait of the sessions that has been seen. However, state
management is fairly minimal.

> Could you please elaborate more on the reason you'd like to flatten the
traffic?
There are hints of performance issues and effects on the Kafka cluster, which
I anticipate only get worse as similar, but different apps are developed
for different parts of the org. The issues are mostly hidden by
over-provisioning at the moment.

> different times is the session window [1]. Does that meet your needs?
I played around with this a little. However it's not realistic to use this
window because the memory requirements would be enormous in our case. I did
an experiment with using the session window in combination with the
continuous and purging trigger. However that resulted in the session
timeout window not being evaluated because well the events had been purged.
And we do need that timeout as well. So in short I believe I'm looking for
a session-aware unaligned tumbling window.



On Sun, Jan 16, 2022 at 9:10 PM Caizhi Weng  wrote:

> Hi!
>
> So you'd like to flatten the traffic by materializing the results of
> different parallelisms at different times?
>
> As far as I know this is not possible. Could you please elaborate more on
> the reason you'd like to flatten the traffic? Is it causing any problems
> with your job? What's your use case for streaming windows?
>
> Also the only type of window capable of emitting results for different
> parallelisms at different times is the session window [1]. Does that meet
> your needs?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#session-windows
>
> Aeden Jameson wrote on Sat, Jan 15, 2022 at 02:16:
>
>> When using tumbling windows the windows materialize all at once which
>> results in burst-y traffic. How does one go about unaligned tumbling
>> windows? Does this require going down the road of custom window, assigner
>> and triggers?
>>
>> --
>> Cheers,
>> Aeden
>>
>>
>>

-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful


Re: FileSource Usage

2022-01-20 Thread Meghajit Mazumdar
Hi Guowei,

Thanks for your answer. Regarding your question,
*> Currently there is no such public interface ,which you could extend to
implement your own strategy. Would you like to share the specific problem
you currently meet?*

The GCS bucket that we are trying to read from is periodically populated
with parquet files by another service. This can be daily or even hourly.
For an already pre-populated bucket, we would like to read the files
created from, say, day *T* till day *T+10*.  Order matters here and hence
we would like to read the oldest files first, and then the new ones.  Would
you know how I can enforce a reading order here ?

Thanks,
Meghajit




On Thu, Jan 20, 2022 at 2:29 PM Guowei Ma  wrote:

> Hi, Meghajit
>
> 1. From the implementation [1] the order of split depends on the
> implementation of the FileSystem.
>
> 2. From the implementation [2] the order of the file also depends on the
> implementation of the FileSystem.
>
> 3. Currently there is no such public interface ,which you could extend to
> implement your own strategy. Would you like to share the specific problem
> you currently meet?
>
> 3. `FileSource` supports checkpoints. I think the watermark is a general
> mechanism so you could read the related documentation[3].
>
> [1]
> https://github.com/apache/flink/blob/355b165859aebaae29b6425023d352246caa0613/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java#L141
>
> [2]
> https://github.com/apache/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java#L102
>
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/
> Best,
> Guowei
>
>
> On Wed, Jan 19, 2022 at 6:06 PM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hello,
>>
>> We are using FileSource
>> 
>> to process Parquet Files and had a few doubts around it. Would really
>> appreciate if somebody can help answer them:
>>
>> 1. For a given file, does FileSource read the contents inside it in order
>> ? In other words, what is the order in which the file splits are generated
>> from the contents of the file ?
>>
>> 2. We want to provide a GCS Bucket URL to the FileSource so that it can
>> read parquet files from there. The bucket has multiple parquet files.
>> Wanted to know, what is the order in which the files will be picked and
>> processed by this FileSource ? Can we provide an order strategy ourselves,
>> say, process according to creation time ?
>>
>> 3. Is it possible/good practice to apply checkpointing and watermarking
>> for a bounded source like FileSource ?
>>
>> --
>> *Regards,*
>> *Meghajit*
>>
>

-- 
*Regards,*
*Meghajit*


Re: Flink 1.14.3: Can not access job information from a jobmanager UI

2022-01-20 Thread Martin
Thanks for the quick response. I assumed that's already known, but was not able to find the issue. Thanks :)

Chesnay Schepler wrote on 20.01.2022 at 13:36 (GMT +01:00):

This is a bug in Flink for which I have filed a ticket: https://issues.apache.org/jira/browse/FLINK-25732
 
As is you can only request the job overview from the leading jobmanager.
 
On 20/01/2022 13:15, Martin wrote:
 



Re: Flink 1.14.3: Can not access job information from a jobmanager UI

2022-01-20 Thread Chesnay Schepler
This is a bug in Flink for which I have filed a ticket: 
https://issues.apache.org/jira/browse/FLINK-25732


As is you can only request the job overview from the leading jobmanager.

On 20/01/2022 13:15, Martin wrote:


Hey,

I upgraded today my Flink application to Flink 1.14.3.

I run it in an HA standalone Kubernetes deployment with 2 JobManagers, so one 
active and one on standby.
As it's only a prototype I make the UI, port 8081 of the JobManager 
pods, available via NodePort.


Already with older Flink versions I sometimes got some problems within 
the UI telling "Internal server error.".

With the new 1.14.3 it happens really often.

It seems that, depending on which pod the traffic is load balanced to 
by the NodePort, the UI works or has the problems.
Right now, I don't know which one has trouble, but due to the other issues 
about that, I assume it's also the stand-by one.


Here is the exception from the JobManager log, fetched from the UI, 
from the time the errors happened (so from the erroneous JobManager).


2022-01-20 12:01:11,829 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] - Unhandled exception.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestMultipleJobDetails.
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373) ~[?:?]
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:858) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.lambda$new$0(FutureUtils.java:876) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258) ~[?:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) ~[?:?]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[?:?]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[?:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) ~[?:?]
at akka.dispatch.OnComplete.internal(Future.scala:300) ~[?:?]
at akka.dispatch.OnComplete.internal(Future.scala:297) ~[?:?]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) ~[?:?]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) ~[?:?]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) ~[?:?]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) ~[?:?]
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) ~[?:?]
at

Flink 1.14.3: Can not access job information from a jobmanager UI

2022-01-20 Thread Martin
Hey,
Today I upgraded my Flink application to Flink 1.14.3.
I run it in an HA Standalone Kubernetes deployment with two JobManagers, so one active and one on standby. As it is only a prototype, I expose the UI (port 8081 of the JobManager pods) via NodePort.
Already with older Flink versions I sometimes got problems in the UI saying "Internal server error."; with the new 1.14.3 it happens really often.
It seems that, depending on which pod the NodePort load-balances the traffic to, the UI either works or shows the problem. Right now I don't know which pod has the trouble, but given the other issues reported about this, I assume it is also the stand-by one.
Here is the exception from the JobManager log, fetched from the UI, from the time the errors happened (so from the erroneous JobManager).
2022-01-20 12:01:11,829 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] - Unhandled exception.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to serialize the result for RPC call : requestMultipleJobDetails.
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373) ~[?:?]
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:858) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.util.concurrent.FutureUtils$ResultConjunctFuture.lambda$new$0(FutureUtils.java:876) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258) ~[?:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) ~[?:?]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[?:?]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[?:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) ~[?:?]
at akka.dispatch.OnComplete.internal(Future.scala:300) ~[?:?]
at akka.dispatch.OnComplete.internal(Future.scala:297) ~[?:?]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) ~[?:?]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) ~[?:?]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) ~[?:?]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) ~[?:?]
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) ~[?:?]
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) ~[?:?]
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
at

Re: build.gradle troubles with IntelliJ

2022-01-20 Thread HG
Thanks Nico.
I will let you know the results


On Thu, 20 Jan 2022 at 10:39, Nico Kruber  wrote:

> Hi,
> unfortunately, the gradle example in the docs has grown a bit old [1] and
> I
> haven't gotten around to updating it yet. Nonetheless, we are using an
> updated
> version internally and so far this has been working fine. The latest
> project
> we've been using this at is available at:
> https://github.com/ververica/lab-flink-latency
> It should be rather simple to check this out and adapt to your needs.
>
> I'd love to get some feedback on it so that I can eventually get this into
> the
> Flink docs/quickstarts.
>
>
> Nico
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-24478
>
> On Wednesday, 19 January 2022 15:07:39 CET HG wrote:
> > Hi
> > For  my first project I followed :
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastre
> > am/project-configuration/ where there is a build.gradle example
> >
> > I created a directory, entered into it and did gradle init.
> > Then I created the build.gradle as per the example.
> >
> > I opened the directory with IntelliJ and opened the build.gradle file
> > When performing the "load changes" I bump into issues.
> >
> > The first issue was a minor one:
> > Build file '/u01/testje/build.gradle' line: 41
> >  A problem occurred evaluating root project 'testje'.
> >
> > > Cannot add task 'wrapper' as a task with that name already exists.
> > > Run with --scan to get full insights.
> >
> > Easy : I had to remove "task" and (type Wrapper).
> >
> > After that I have :
> >
> > Caused by:
> >
> org.gradle.internal.metaobject.AbstractDynamicObject$CustomMessageMissingMet
> > hodException: *Could not find method compile() for arguments
> > [org.apache.flink:flink-streaming-java_2.11:1.13-SNAPSHOT*] on object of
> > type
> >
> org.gradle.api.internal.artifacts.dsl.dependencies.DefaultDependencyHandler.
> >
> > I tried scala 2.11 and 1.14 SNAPSHOT  but nothing works.
> >
> > When I look at
> >
> https://repository.apache.org/content/repositories/snapshots/org/apache/flin
> > k/flink-streaming-java.. everything is available
> >
> > What am I doing wrong?
> >
> > Regards Hans-Peter
>
>
> --
> Dr. Nico Kruber | Solutions Architect
>
> Follow us @VervericaData Ververica
> --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>
>
>


Help with completing Flink SQL please!

2022-01-20 Thread mark
Hello,
I'm writing to ask for help with generating completion hints for Flink SQL.
I'm trying to use the Calcite SqlAdvisor with the Flink parser. My problem
is that I can get completion working for table names, but not column names.

  "select a.mgr from ^stuff a"

gives me good results:  CATALOG.SALES.EMP, CATALOG.SALES, etc, but

  "select a.^ from sales.emp a"

gives me only "*". See
https://github.com/mnuttall/sql-testing/blob/main/src/main/java/test/AdvisorBuilder.java
for
how I'm constructing my SqlAdvisor and
https://github.com/mnuttall/sql-testing/blob/main/src/test/java/test/TestCompletion.java
for
some simple test code.

Should it be possible to get completion for column names in my second
example? Can anyone please point me to a working example in test code, or
spot what I've done wrong in my test code? Very many thanks in advance for
any help anyone can offer.

Regards,

Mark


Re: Unaligned Tumbling Windows

2022-01-20 Thread yidan zhao
self-define the window assigners.
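
For reference, a minimal sketch of such a self-defined assigner follows (the class name and the hash-based offset are illustrative assumptions, not an existing Flink class). It shifts each tumbling event-time window by an offset derived from the element, so windows for different keys end at different instants; in a real job you would typically derive the offset from the key field so that all records of one key share the same windows.

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Tumbling event-time windows whose start is shifted per element to spread out firings. */
public class StaggeredTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;

    private StaggeredTumblingEventTimeWindows(long size) {
        this.size = size;
    }

    public static StaggeredTumblingEventTimeWindows of(Time size) {
        return new StaggeredTumblingEventTimeWindows(size.toMilliseconds());
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // Stable offset in [0, size) derived from the element; deriving it from the
        // key field instead keeps all records of one key in the same windows.
        long offset = Math.floorMod(element.hashCode(), size);
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

It would be used like the built-in assigner, e.g. stream.keyBy(...).window(StaggeredTumblingEventTimeWindows.of(Time.minutes(1))). If a single global shift is enough, the built-in TumblingEventTimeWindows.of(size, offset) already accepts an offset and no custom assigner is needed.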

Caizhi Weng  wrote on Mon, 17 Jan 2022 at 13:11:

> Hi!
>
> So you'd like to flatten the traffic by materializing the results of
> different parallelisms at different times?
>
> As far as I know this is not possible. Could you please elaborate more on
> the reason you'd like to flatten the traffic? Is it causing any problems
> with your job? What's your use case for streaming windows?
>
> Also the only type of window capable of emitting results for different
> parallelisms at different times is the session window [1]. Does that meet
> your needs?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#session-windows
>
> Aeden Jameson  wrote on Sat, 15 Jan 2022 at 02:16:
>
>> When using tumbling windows the windows materialize all at once which
>> results in burst-y traffic. How does one go about unaligned tumbling
>> windows? Does this require going down the road of custom window, assigner
>> and triggers?
>>
>> --
>> Cheers,
>> Aeden
>>
>>
>>


Re: build.gradle troubles with IntelliJ

2022-01-20 Thread Nico Kruber
Hi,
unfortunately, the gradle example in the docs has grown a bit old [1] and I 
haven't gotten around to updating it yet. Nonetheless, we are using an updated 
version internally and so far this has been working fine. The latest project 
we've been using this at is available at:
https://github.com/ververica/lab-flink-latency
It should be rather simple to check this out and adapt to your needs.

I'd love to get some feedback on it so that I can eventually get this into the 
Flink docs/quickstarts.


Nico



[1] https://issues.apache.org/jira/browse/FLINK-24478

On Wednesday, 19 January 2022 15:07:39 CET HG wrote:
> Hi
> For  my first project I followed :
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastre
> am/project-configuration/ where there is a build.gradle example
> 
> I created a directory, entered into it and did gradle init.
> Then I created the build.gradle as per the example.
> 
> I opened the directory with IntelliJ and opened the build.gradle file
> When performing the "load changes" I bump into issues.
> 
> The first issue was a minor one:
> Build file '/u01/testje/build.gradle' line: 41
>  A problem occurred evaluating root project 'testje'.
> 
> > Cannot add task 'wrapper' as a task with that name already exists.
> > Run with --scan to get full insights.
> 
> Easy : I had to remove "task" and (type Wrapper).
> 
> After that I have :
> 
> Caused by:
> org.gradle.internal.metaobject.AbstractDynamicObject$CustomMessageMissingMet
> hodException: *Could not find method compile() for arguments
> [org.apache.flink:flink-streaming-java_2.11:1.13-SNAPSHOT*] on object of
> type
> org.gradle.api.internal.artifacts.dsl.dependencies.DefaultDependencyHandler.
> 
> I tried scala 2.11 and 1.14 SNAPSHOT  but nothing works.
> 
> When I look at
> https://repository.apache.org/content/repositories/snapshots/org/apache/flin
> k/flink-streaming-java.. everything is available
> 
> What am I doing wrong?
> 
> Regards Hans-Peter


-- 
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner




Re: FileSource Usage

2022-01-20 Thread Guowei Ma
Hi, Meghajit

1. From the implementation [1], the order of the splits depends on the
implementation of the FileSystem.

2. From the implementation [2], the order of the files also depends on the
implementation of the FileSystem.

3. Currently there is no such public interface which you could extend to
implement your own ordering strategy. Would you like to share the specific
problem you are currently facing?

4. `FileSource` supports checkpoints. I think the watermark is a general
mechanism, so you could read the related documentation [3].

[1]
https://github.com/apache/flink/blob/355b165859aebaae29b6425023d352246caa0613/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java#L141

[2]
https://github.com/apache/flink/blob/d33c39d974f08a5ac520f220219ecb0796c9448c/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java#L102

[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/event-time/generating_watermarks/
Best,
Guowei


On Wed, Jan 19, 2022 at 6:06 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> We are using FileSource
>  to
> process Parquet Files and had a few doubts around it. Would really
> appreciate if somebody can help answer them:
>
> 1. For a given file, does FileSource read the contents inside it in order
> ? In other words, what is the order in which the file splits are generated
> from the contents of the file ?
>
> 2. We want to provide a GCS Bucket URL to the FileSource so that it can
> read parquet files from there. The bucket has multiple parquet files.
> Wanted to know, what is the order in which the files will be picked and
> processed by this FileSource ? Can we provide an order strategy ourselves,
> say, process according to creation time ?
>
> 3. Is it possible/good practice to apply checkpointing and watermarking
> for a bounded source like FileSource ?
>
> --
> *Regards,*
> *Meghajit*
>
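
To illustrate the checkpointing/watermarking point (question 3), a rough sketch follows. The bucket path, the method, and passing the Parquet BulkFormat in from outside are assumptions for illustration only; the concrete Parquet format class and its constructor come from flink-parquet and depend on the Flink version, so they are not shown here.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

public final class ParquetFileSourceSketch {

    // `parquetFormat` is assumed to be built elsewhere, e.g. from flink-parquet's
    // ParquetColumnarRowInputFormat, whose constructor is version-specific.
    public static DataStream<RowData> readParquet(
            StreamExecutionEnvironment env,
            BulkFormat<RowData, FileSourceSplit> parquetFormat) {

        // Checkpointing can be enabled as usual; for a bounded run the job simply
        // finishes once all splits have been read.
        env.enableCheckpointing(60_000);

        FileSource<RowData> source = FileSource
                .forBulkFileFormat(parquetFormat, new Path("gs://my-bucket/parquet/"))
                .build();

        // With files one typically uses no watermarks, or derives event time from a
        // record field after reading; both work with this bounded source.
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-files");
    }
}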


Tuning akka.ask.timeout

2022-01-20 Thread Paul Lam
Hi,

I’m tuning a Flink job with 1000+ parallelism, which frequently fails with Akka 
TimeOutException (it was fine with 200 parallelism). 

I see some posts recommending increasing `akka.ask.timeout` to 120s. I'm not 
familiar with Akka, but that looks like a very long time for a response timeout 
compared to the default of 10s.

So I'm wondering, what's a reasonable range for this option? And why would the 
Actor fail to respond in time (was the message dropped due to pressure)?

Any input would be appreciated! Thanks a lot.

Best,
Paul Lam