Re: flink rest 接口部分数据不返回,返回空,状态200正常。

2022-05-19 Thread yidan zhao
部分任务估计是原先看过ui图,打开后相关数据都能看,但是数字不变。比如其中一个任务的输入节点部分:Records Sent
504,685,253,这个数字就不变了(但任务实际是在处理数据的),看网络请求也的确固定一直返回这个数据。
纯粹转圈不出数据的任务是新提交的任务。

按照以往,我重启jm可能解决这个问题。

yidan zhao  于2022年5月20日周五 12:05写道:
>
> web ui图:https://s3.bmp.ovh/imgs/2022/05/20/dd142de9be3a2c99.png
> 网络视图:https://i.bmp.ovh/imgs/2022/05/20/f3c741b28bd208d4.png
>
> JM1(rest server leader) 异常日志:
> WARN  2022-05-20 12:02:12,523
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner   - Could
> not properly discard completed checkpoint 22259.
> java.io.IOException: Directory
> bos://flink-bucket/flink/default-checkpoints/bal_baiduid_ft_job/b03390c8295713fbd79f57f57a1e3bdb/chk-22259
> is not empty.
> at 
> org.apache.hadoop.fs.bos.BaiduBosFileSystem.delete(BaiduBosFileSystem.java:209)
> ~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:160)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at 
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:74)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at 
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discard(CompletedCheckpoint.java:263)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at 
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:60)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at 
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$2(CheckpointsCleaner.java:85)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_251]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_251]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> INFO  2022-05-20 12:03:22,441
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator-
> Triggering checkpoint 21979 (type=CHECKPOINT) @ 1653019401517 for job
> 07950b109ab5c3a0ed8576673ab562f7.
> INFO  2022-05-20 12:03:31,061
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator-
> Completed checkpoint 21979 for job 07950b109ab5c3a0ed8576673ab562f7
> (1785911977 bytes in 9066 ms).
>
>
> 如上,我web-ui是开启的,所有是一直有请求刷的,不存在相关异常(当然本身从请求返回码200来看也不像是异常)。
>
> Shengkai Fang  于2022年5月20日周五 10:50写道:
> >
> > 你好,图挂了,应该是需要图床工具。
> >
> > 另外,能否贴一下相关的异常日志呢?
> >
> > Best,
> > Shengkai
> >
> > yidan zhao  于2022年5月20日周五 10:28写道:
> >
> > > UI视图:[image: 1.png].
> > >
> > > 网络视图:
> > > [image: image.png]
> > >
> > >
> > > 补充部分集群部署信息:
> > > (1)flink1.13,standalone集群,基于zk做的HA。3 jm,若干tm。
> > > (2)jm的rest api开启了ssl,基于 nginx
> > > 做了代理转发(但大概率不会是机制问题,因为不是百分百出现此问题,我集群其他任务都正常,都是运行一段时间后会出现)。
> > >  猜测:是否可能和运行一段时间后,出现jm进程挂掉,任务recover更换,rest jm的leader变换有关呢?
> > > 目前来看部分jm的日志偶尔存在ssl握手相关报错,但也挺奇怪。  注意:我web
> > > ui打开,看着jm的日志,是不出日志的(我是基于zk拿到leader,看leader jm的日志)。我web
> > > ui一直刷,理论上如果出错日志应该有相关报错,但实际没报错,报错和这个无关,都是ckpt吧啦的。
> > >


Re: 反复提交Job会导致TaskManager 元空间oom?

2022-05-19 Thread yu'an huang
你好,

1. TM MetaSpace 
OOM应该是由于你反复提交job造成的。反复提交作业可能导致短时间内Class和UserClassLoader过多并且无法被及时回收。
2. TM OOM导致不能及时发送心跳给JM就会因为超时让JM认为TM下线。理论上OOM了是会自己退出进程的。
3. 
如果不能将集群部署到Kubernetes或者yarn下的话,建议可以通过JM的API监控下TM数量,如果TM数量下降可以做些运维操作。下面的link是一个JM 
api的介绍:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/









> On 18 May 2022, at 10:45 AM, 知而不惑  wrote:
> 
> 请问大家一个问题,
> 场景:
> 版本是Flink 1.14
> 我们使用standalone 模式,我们的Flink job由supervisorctl托管,JM和TM用systemd托管
> 
> 
> 异常:
> job异常重启设置了两次的flink延迟重启:restart-strategy.fixed-delay.attempts: 2,
> 我们线上有个业务代码没有捕获一个异常,导致job重启两次后,再由supervisorctl重新提交job,循环了很多次之后,
> TM出现了元空间OOM(我们已经把元空间的内存加大,还是会出现),然后TM就掉了,控制台上没有TM了,这影响了其他的job,但是TM进程也没有退出,我们的TM由Systemd托管,所以TM一直没有重启,
> 处在一个“假死”状态,我们是用的standalone模式,只有一个TM,
> 
> 
> 日志:
> TM日志出现:TM metaspace oom
> JM日志:Association with remote system [akka.tcp://flink@localhost:43583] has 
> failed, address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink@localhost:43583]] Caused by: [java.net.ConnectException: 
> Connection refused: localhost/127.0.0.1:43583]
> JM 连接 TM接口失败,unreachable
> 
> 
> 
> 补充:
> 我们把元空间内存配置放到512M。再次重现:
> 发现每次提交job的时候:
> 观察tm metaspace 内存变化:179MB 183MB 207MB 232MB 256MB 
> 280MB 352MB 372MB
> 元空间一直没回收,这样最终会导致TM metaspace oom
> 
> 
> 
> 问题:
> 1.想问下TM元空间oom异常,是反复提交job造成,还是job的业务代码有问题,
> 2.TM元空间OOM为什么会导致JM认为TM掉线,TM也不自己退出进程
> 
> 
> 希望获得的帮助:
> 1.上述问题原因
> 2.有什么办法可以在standalone模式下,识别到TM掉线,从而我们能做一些自动的运维操作:比如重启整个集群



Re: Could not copy native libraries - Permission denied

2022-05-19 Thread yu'an huang
What is your deployment mode, on yarn, Kubernetes or standalone? Can you 
provide more logs about this error?


> On 18 May 2022, at 4:07 PM, Zain Haider Nemati  wrote:
> 
> Hi, 
> We are using flink version 1.13 with a kafka source and a kinesis sink with a 
> parallelism of 3.
> On submitting the job I get this error
> 
>  Could not copy native binaries to temp directory 
> /tmp/amazon-kinesis-producer-native-binaries
> Followed by permission denied even though all the permissions have been 
> provided and is being run as root user. What could be causing this?



Re: Applying backpressure to limit state memory consumption

2022-05-19 Thread yu'an huang
H Robini,

In my experience, the state size of memory state backend is limit by the heap 
memory. See this link for details:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/

“When deciding between HashMapStateBackend and RocksDB, it is a choice between 
performance and scalability. HashMapStateBackend is very fast as each state 
access and update operates on objects on the Java heap; however, state size is 
limited by available memory within the cluster. "

if the size of your window state is really huge, you should choose other state 
backend. 
Hopes my reply would help to you.

Best,
Yuan


> On 19 May 2022, at 9:19 PM, Robin Cassan  
> wrote:
> 
> Hey all!
> I have a conceptual question on the DataStream API: when using an in-memory 
> state backend (like the HashMapStateBackend), how can you ensure that the 
> hashmap won't grow uncontrollably until OutOfMemory happens?
> 
> In my case, I would be consuming from a Kafka topic, into a SessionWindow. 
> The HashMap state would be accumulating data in memory until the timeout 
> expires, with a PurgingTrigger to clean up the state.
> The cluster's memory would be sized to handle a normal load, but in case of 
> lag or spikes we want the Flink job to slow down its consumption of the kafka 
> topic so that the window's state stays capped at a given size (could be the 
> number of keys or the total Gb). We have tested this scenario, and Flink 
> would consume really quickly from Kafka until memory was so full that it was 
> stuck in GC loops, unable to make progress on the ProcessFunction applied 
> after the window.
> 
> Is there any setting to limit the size of a Window state? Maybe there are 
> some bounded buffers between operators that can be adjusted?
> 
> Thanks a lot for your help!
> Robin



Re: flink rest 接口部分数据不返回,返回空,状态200正常。

2022-05-19 Thread yidan zhao
web ui图:https://s3.bmp.ovh/imgs/2022/05/20/dd142de9be3a2c99.png
网络视图:https://i.bmp.ovh/imgs/2022/05/20/f3c741b28bd208d4.png

JM1(rest server leader) 异常日志:
WARN  2022-05-20 12:02:12,523
org.apache.flink.runtime.checkpoint.CheckpointsCleaner   - Could
not properly discard completed checkpoint 22259.
java.io.IOException: Directory
bos://flink-bucket/flink/default-checkpoints/bal_baiduid_ft_job/b03390c8295713fbd79f57f57a1e3bdb/chk-22259
is not empty.
at 
org.apache.hadoop.fs.bos.BaiduBosFileSystem.delete(BaiduBosFileSystem.java:209)
~[bos-hdfs-sdk-1.0.1-SNAPSHOT-0.jar:?]
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:160)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:74)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discard(CompletedCheckpoint.java:263)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:60)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$2(CheckpointsCleaner.java:85)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_251]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_251]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
INFO  2022-05-20 12:03:22,441
org.apache.flink.runtime.checkpoint.CheckpointCoordinator-
Triggering checkpoint 21979 (type=CHECKPOINT) @ 1653019401517 for job
07950b109ab5c3a0ed8576673ab562f7.
INFO  2022-05-20 12:03:31,061
org.apache.flink.runtime.checkpoint.CheckpointCoordinator-
Completed checkpoint 21979 for job 07950b109ab5c3a0ed8576673ab562f7
(1785911977 bytes in 9066 ms).


如上,我web-ui是开启的,所有是一直有请求刷的,不存在相关异常(当然本身从请求返回码200来看也不像是异常)。

Shengkai Fang  于2022年5月20日周五 10:50写道:
>
> 你好,图挂了,应该是需要图床工具。
>
> 另外,能否贴一下相关的异常日志呢?
>
> Best,
> Shengkai
>
> yidan zhao  于2022年5月20日周五 10:28写道:
>
> > UI视图:[image: 1.png].
> >
> > 网络视图:
> > [image: image.png]
> >
> >
> > 补充部分集群部署信息:
> > (1)flink1.13,standalone集群,基于zk做的HA。3 jm,若干tm。
> > (2)jm的rest api开启了ssl,基于 nginx
> > 做了代理转发(但大概率不会是机制问题,因为不是百分百出现此问题,我集群其他任务都正常,都是运行一段时间后会出现)。
> >  猜测:是否可能和运行一段时间后,出现jm进程挂掉,任务recover更换,rest jm的leader变换有关呢?
> > 目前来看部分jm的日志偶尔存在ssl握手相关报错,但也挺奇怪。  注意:我web
> > ui打开,看着jm的日志,是不出日志的(我是基于zk拿到leader,看leader jm的日志)。我web
> > ui一直刷,理论上如果出错日志应该有相关报错,但实际没报错,报错和这个无关,都是ckpt吧啦的。
> >


Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Biao Geng
Hi there,
@Zain, Weihua's suggestion should be able to fulfill the request to check
JM logs. If you do want to use YARN cli for running Flink applications, it
is possible to check JM's log with the YARN command like:
*yarn logs -applicationId application_xxx_yyy -am -1 -logFiles
jobmanager.log*
For TM log, command would be like:
* yarn logs -applicationId  -containerId   -logFiles
taskmanager.log*
Note, it is not super easy to find the container id of TM. Some workaround
would be to check JM's log first and get the container id for TM from that.
You can also learn more about the details of above commands from *yarn logs
-help*

@Shengkai, yes, you are right the actual JM address is managed by YARN. To
access the JM launched by YARN, users need to access YARN web ui to find
the YARN application by applicationId and then click 'application master
url' of that application to be redirected to Flink web ui.

Best,
Biao Geng

Shengkai Fang  于2022年5月20日周五 10:59写道:

> Hi.
>
> I am not familiar with the YARN application mode. Because the job manager
> is started when submit the jobs. So how can users know the address of the
> JM? Do we need to look up the Yarn UI to search the submitted job with the
> JobID?
>
> Best,
> Shengkai
>
> Weihua Hu  于2022年5月20日周五 10:23写道:
>
>> Hi,
>> You can get the logs from Flink Web UI if job is running.
>> Best,
>> Weihua
>>
>> 2022年5月19日 下午10:56,Zain Haider Nemati  写道:
>>
>> Hey All,
>> How can I check logs for my job when it is running in application mode
>> via yarn
>>
>>
>>


Re: Does kafka key is supported in kafka sink table

2022-05-19 Thread Shengkai Fang
Hi.

Yes. Flink supports to write the value to the Kafka record key parts. You
just need to specify which column belongs to the key in the WITH blocks,
e.g.

```
CREATE TABLE kafka_sink (
...
) WITH (
   `key.fields` = 'id'
);
```

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#key-fields

Dhavan Vaidya  于2022年5月17日周二 19:16写道:

> Hey wang!
>
> Perhaps this is what you want:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-format
> &
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-fields
> ?
>
> Note that the fields *have* to be one of the "top" level columns of your
> sink table (i.e., fields inside Row are not supported, at least in PyFlink).
>
> Thanks!
>
> On Mon, 16 May 2022 at 19:33, wang <24248...@163.com> wrote:
>
>> Hi dear engineer,
>>
>> Flink sql supports kafka sink table, not sure whether it supports kafka
>> key in kafka sink table? As I want to specify kafka key when inserting
>> data into kafka sink table.
>> Thanks for your answer in advance.
>>
>>
>>
>> Thanks && Regards,
>> Hunk
>>
>>
>>
>>
>


Re: Does kafka key is supported in kafka sink table

2022-05-19 Thread Shengkai Fang
Hi.

Yes. Flink supports to write the value to the Kafka record key parts. You
just need to specify which column belongs to the key in the WITH blocks,
e.g.

```
CREATE TABLE kafka_sink (
...
) WITH (
   `key.fields` = 'id'
);
```

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#key-fields

Dhavan Vaidya  于2022年5月17日周二 19:16写道:

> Hey wang!
>
> Perhaps this is what you want:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-format
> &
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-fields
> ?
>
> Note that the fields *have* to be one of the "top" level columns of your
> sink table (i.e., fields inside Row are not supported, at least in PyFlink).
>
> Thanks!
>
> On Mon, 16 May 2022 at 19:33, wang <24248...@163.com> wrote:
>
>> Hi dear engineer,
>>
>> Flink sql supports kafka sink table, not sure whether it supports kafka
>> key in kafka sink table? As I want to specify kafka key when inserting
>> data into kafka sink table.
>> Thanks for your answer in advance.
>>
>>
>>
>> Thanks && Regards,
>> Hunk
>>
>>
>>
>>
>


Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Shengkai Fang
Hi.

I am not familiar with the YARN application mode. Because the job manager
is started when submit the jobs. So how can users know the address of the
JM? Do we need to look up the Yarn UI to search the submitted job with the
JobID?

Best,
Shengkai

Weihua Hu  于2022年5月20日周五 10:23写道:

> Hi,
> You can get the logs from Flink Web UI if job is running.
> Best,
> Weihua
>
> 2022年5月19日 下午10:56,Zain Haider Nemati  写道:
>
> Hey All,
> How can I check logs for my job when it is running in application mode via
> yarn
>
>
>


Re: flink rest 接口部分数据不返回,返回空,状态200正常。

2022-05-19 Thread Shengkai Fang
你好,图挂了,应该是需要图床工具。

另外,能否贴一下相关的异常日志呢?

Best,
Shengkai

yidan zhao  于2022年5月20日周五 10:28写道:

> UI视图:[image: 1.png].
>
> 网络视图:
> [image: image.png]
>
>
> 补充部分集群部署信息:
> (1)flink1.13,standalone集群,基于zk做的HA。3 jm,若干tm。
> (2)jm的rest api开启了ssl,基于 nginx
> 做了代理转发(但大概率不会是机制问题,因为不是百分百出现此问题,我集群其他任务都正常,都是运行一段时间后会出现)。
>  猜测:是否可能和运行一段时间后,出现jm进程挂掉,任务recover更换,rest jm的leader变换有关呢?
> 目前来看部分jm的日志偶尔存在ssl握手相关报错,但也挺奇怪。  注意:我web
> ui打开,看着jm的日志,是不出日志的(我是基于zk拿到leader,看leader jm的日志)。我web
> ui一直刷,理论上如果出错日志应该有相关报错,但实际没报错,报错和这个无关,都是ckpt吧啦的。
>


Re: How to KafkaConsume from Particular Partition in Flink(version 1.14.4)

2022-05-19 Thread Shengkai Fang
Hi,

If you use SQL API,  you can specify the partition in the DDL[1] and filter
out the record that you don't need.

```
CREATE TABLE KafkaSource (
...
`partition` METADATA
) WITH (
  ...
);

SELECT * FROM KafkaSource
WHERE partition = 1;
```

Best,
Shengkai

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata

Weihua Hu  于2022年5月19日周四 21:42写道:

> Hi Harshit,
> FlinkKafkaConsumer does not support consuming a particular partition of a
> topic.
>
> Best,
> Weihua
>
> 2022年5月18日 下午5:02,harshit.varsh...@iktara.ai 
> 写道:
>
> particular
>
>
>


Flink - SQL Tumble End on event time not returning any result

2022-05-19 Thread Raghunadh Nittala
Hi Team,

I have a Flink job that consumes from a kafka topic and tries to create
windows (using Tumble) based on few columns like eventId and eventName.
Kafka topic has data in format of comma separated values like below:

event1,Util1,1647614467000,0.12
event1,Util1,1647614527000,0.26
event1,Util1,1647614587000,0.71
event2,Util2,1647614647000,0.08
event2,Util2,1647614707000,0.32
event2,Util2,1647614767000,0.23
event2,Util2,1647614827000,0.85
event1,Util1,1647614887000,0.08
event1,Util1,1647614947000,0.32


Here is the Flink code I’m using for this:

main() {
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tableEnv = StreamTableEnvironment.create(env)

val kafkaSource = KafkaSource.builder()
.setBootstrapServers("localhost:9092")
.setTopics("an-topic")
.setGroupId("testGroup")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(SimpleStringSchema())
.build()

val kafkaStream = env.fromSource(kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
"KafkaSource")

val kafkaRowMapper = kafkaStream.map(RowMapper())

val finalTable = tableEnv.fromDataStream(kafkaRowMapper,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.columnByExpression("event_time", "TO_TIMESTAMP_LTZ(f2, 3)")
.watermark("event_time", "event_time - INTERVAL '20' SECOND")
.build()
).renameColumns(
`$`("f0").`as`("eventId"),
`$`("f1").`as`("eventName"),
`$`("f3").`as`("eventValue")
)
tableEnv.createTemporaryView("finalTable", finalTable)

val sqlQuery = "SELECT eventId, eventName, TUMBLE_END(event_time,
INTERVAL '1' MINUTE) AS event_time_new, " +
"LAST_VALUE(eventValue) AS eventValue FROM finalTable " +
"GROUP BY eventId, eventName, TUMBLE(event_time, INTERVAL
'1' MINUTE)"
val resultTable = tableEnv.sqlQuery(sqlQuery)
tableEnv.toDataStream(resultTable).print()

env.execute("TestJob")
}

class RowMapper: MapFunction> {
override fun map(value: String): Tuple4 {
val lineArray = value.split(",")

return Tuple4 (lineArray[0], lineArray[1],
lineArray[2].toLong(), lineArray[3].toFloat())
}
}


When I use proc_time instead of event_time, I’m able to create windows and
see results.
I suspect that the code is correct but the way I'm testing is wrong. How
shall I test this for event time?

Can someone please help me on what mistake I’m making here?

Thanks in advance.


Re: flink rest 接口部分数据不返回,返回空,状态200正常。

2022-05-19 Thread yidan zhao
UI视图:[image: 1.png].

网络视图:
[image: image.png]


补充部分集群部署信息:
(1)flink1.13,standalone集群,基于zk做的HA。3 jm,若干tm。
(2)jm的rest api开启了ssl,基于 nginx
做了代理转发(但大概率不会是机制问题,因为不是百分百出现此问题,我集群其他任务都正常,都是运行一段时间后会出现)。
 猜测:是否可能和运行一段时间后,出现jm进程挂掉,任务recover更换,rest jm的leader变换有关呢?
目前来看部分jm的日志偶尔存在ssl握手相关报错,但也挺奇怪。  注意:我web
ui打开,看着jm的日志,是不出日志的(我是基于zk拿到leader,看leader jm的日志)。我web
ui一直刷,理论上如果出错日志应该有相关报错,但实际没报错,报错和这个无关,都是ckpt吧啦的。


Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Weihua Hu
Hi,
You can get the logs from Flink Web UI if job is running.
Best,
Weihua

> 2022年5月19日 下午10:56,Zain Haider Nemati  写道:
> 
> Hey All,
> How can I check logs for my job when it is running in application mode via 
> yarn



Re: Incorrect checkpoint id used when job is recovering

2022-05-19 Thread yuxia
There's a simliar issue FLINK-19816[1] 

[1] [ https://issues.apache.org/jira/browse/FLINK-19816 | 
https://issues.apache.org/jira/browse/FLINK-19816 ] 

Best regards, 
Yuxia 


发件人: "tao xiao"  
收件人: "User"  
发送时间: 星期四, 2022年 5 月 19日 下午 9:16:34 
主题: Re: Incorrect checkpoint id used when job is recovering 

Hi team, 

Can anyone shed some light? 

On Sat, May 14, 2022 at 8:56 AM tao xiao < [ mailto:xiaotao...@gmail.com | 
xiaotao...@gmail.com ] > wrote: 



Hi team, 

Does anyone have any ideas? 

On Thu, May 12, 2022 at 9:20 PM tao xiao < [ mailto:xiaotao...@gmail.com | 
xiaotao...@gmail.com ] > wrote: 

BQ_BEGIN

Forgot to mention the Flink version is 1.13.2 and we use kubernetes native mode 

On Thu, May 12, 2022 at 9:18 PM tao xiao < [ mailto:xiaotao...@gmail.com | 
xiaotao...@gmail.com ] > wrote: 

BQ_BEGIN

Hi team, 
I met a weird issue when a job tries to recover from JM failure. The success 
checkpoint before JM crashed is 41205 

``` 
{"log":"2022-05-10 14:55:40,663 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 41205 for job  (9453840 bytes in 
1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"} 
``` 
However JM tries to recover the job with an old checkpoint 41051 which doesn't 
exist that leads to unrecoverable state 

``` 
"2022-05-10 14:59:38,949 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying 
to retrieve checkpoint 41051.\n" 
``` 

Full log attached 

-- 
Regards, 
Tao 





-- 
Regards, 
Tao 

BQ_END



-- 
Regards, 
Tao 

BQ_END



-- 
Regards, 
Tao 



Re: Kinesis Sink - Data being received with intermittent breaks

2022-05-19 Thread Ber, Jeremy
Hi Zain—

Are you seeing any data loss present within the Flink Dashboard subtasks of 
each task? On the bottom of your dashboard you should see data going from each 
blue box to the next. Is this a comprehensive set of data? Meaning do you see 
80M from the source -> first operator -> second operator -> sink?

Secondly, it may be easier to troubleshoot this by removing a few variables. 
Would you be able to remove the operator which segregates your data into 100 
length records and simply forward that data to the next operator? 
Simultaneously, could you leave the Kinesis Producer configuration settings 
(apart from queue limit) at their defaults? This will give a good baseline from 
which to improve upon.

Jeremy

From: Zain Haider Nemati 
Date: Wednesday, May 18, 2022 at 6:15 AM
To: Danny Cranmer 
Cc: Alexander Preuß , user 

Subject: RE: [EXTERNAL]Kinesis Sink - Data being received with intermittent 
breaks


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Hey Danny,
Thanks for getting back to me.
- You are seeing bursty throughput, but the job is keeping up? There is no 
backpressure? --> Correct I'm not seeing any backpressure in any of the metrics
- What is the throughput at the sink? --> num of records out -- 1100 per 10 
seconds
- On the graph screenshot, what is the period and stat (sum/average/etc)? -->It 
is incoming data (MB/s) each second

So let me explain this in totality, the number of records residing in the 
source are about 80 million and the number of records i see in the kinesis data 
stream after it has consumed the data from source is about 20 million so im 
seeing alot of data loss and I think this potentially has to do with the 
intermediate dips im seeing in the records coming in the data stream.

What are the configurations you guys generally suggest for data of this range 
and sinking to a kinesis data stream?

On Wed, May 18, 2022 at 2:00 AM Danny Cranmer 
mailto:dannycran...@apache.org>> wrote:
Hello Zain,

Thanks for providing the additional information. Going back to the original 
issue:
- You are seeing bursty throughput, but the job is keeping up? There is no 
backpressure?
- What is the throughput at the sink?
- On the graph screenshot, what is the period and stat (sum/average/etc)?

Let me shed some light on the log messages, let's take this example:

LogInputStreamReader ... Stage 1 Triggers ...  { stream: 'flink-kafka-tracer', 
manual: 0, count: 0, size: 0, matches: 0, timed: 3, UserRecords: 6, 
KinesisRecords: 3 }

Flush trigger reason:
- manual: the flush was manually triggered
- count: flush was triggered by the number of records in the container
- size: the flush was triggered by the number of bytes in the container
- matches: the predicate was matched
- timed: the flush is triggered by elapsed timer

Input/Output:
- UserRecords: Number of input records KPL flushed (this can be higher than 
KinesisRecords when aggregation is enabled)
- KinesisRecords: Number of records shipped to Kinesis Data Streams

Stage 2 triggers tells us the number of API invocations via the PutRecords 
field.

I can see from your logs that the majority of flushes are due to the timer, and 
it does not look overly bursty. Seems to sit at around 3 records per 15 
seconds, or 1 record every 5 seconds. This seems very low, is it expected?

Thanks,
Danny Cranmer

On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati 
mailto:zain.hai...@retailo.co>> wrote:
Hey Danny,
Thanks for having a look at the issue.
I am using a custom flink operator to segregate the data into a consistent 
format of length 100 which is no more than 1 MB. The configurations I shared 
were after I was exploring tweaking some of them to see if it improves the 
throughput.

Regarding your queries :
- Which Flink version is this? -- > Version 1.13
- Can you see any errors in the Flink logs?  --> No, Im attaching flink logs 
after I have set all the configurations to default
- Do you see any errors/throttles in the Kinesis Data Stream metrics?  --> I 
was before segregating into smaller chunks not anymore
- How many shards does your stream have? --> It has 4 shards
- What is your sink operator parallelism? --> 1
- What is the general health of your job graph? --> This is the only job 
running at the moment, it isn't unhealthy
  - Are the operators upstream of the sink backpressured? --> No
  - Are you sure the sink is actually the issue here? --> I have used the 
.print() as a sink and Im seeing all the records in real time it chokes when 
paired with sink
  - Are there any other potential bottlenecks? --> So data is coming in from 
source correctly, I have a flatmap transformation enabled which reads and 
segments it into chunks of <=1MB which is also tested using the .print() sink
- When you say you are trying to achieve "1MB chunks", I assume this is per 
Kinesis record, not per PutRecords batch? --> 

Re: Checkpointing - Job Manager S3 Head Requests are 404s

2022-05-19 Thread Aeden Jameson
Great, that all makes sense to me. Thanks again.

On Thu, May 19, 2022 at 11:42 AM David Anderson  wrote:
>
> Sure, happy to try to help.
>
> What's happening with the hadoop filesystem is that before it writes each key 
> it checks to see if the "parent directory" exists by checking for a key with 
> the prefix up to the last "/", and if that key isn't found it then creates 
> empty marker files to cause of that parent directory to exist. These 
> existence checks are S3 HEAD requests. None of this is helpful in the case of 
> Flink checkpointing.
>
> And yes, while Presto avoids this unnecessary overhead, entropy injection 
> remains an important tool for improving scalability. Where you'll run into 
> quotas and rate limits depends, of course, on factors like the number of 
> stateful tasks in your job(s), the parallelism, and the checkpointing 
> interval.
>
> On Thu, May 19, 2022 at 8:04 PM Aeden Jameson  wrote:
>>
>> Thanks for the response David. That's the conclusion I came to as
>> well.  The Hadoop plugin behavior doesn't appear to reflect more
>> recent changes to S3 like strong read-after-write consistency,
>> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>> .
>>
>> Given the improved interaction of the Presto plugin with S3 I would
>> conclude that this increases the size of the cluster that would
>> require entropy injection, yes? But that it doesn't necessarily get
>> rid of the need because one could have a large enough cluster and say
>> a lifecycle policy that could still end up requiring entropy
>> injection.
>>
>> On Thu, May 19, 2022 at 10:24 AM David Anderson  wrote:
>> >
>> > Aeden, this is probably happening because you are using the Hadoop 
>> > implementation of S3.
>> >
>> > The Hadoop S3 filesystem tries to imitate a filesystem on top of S3. In so 
>> > doing it makes a lot of HEAD requests. These are expensive, and they 
>> > violate read-after-create visibility, which is what you seem to be 
>> > experiencing. By contrast, the Presto S3 implementation doesn't do the 
>> > same (harmful in this case) magic, and simply does PUT/GET operations. 
>> > Because that's all Flink needs to checkpointing, this works much better.
>> >
>> > Best,
>> > David
>> >
>> > On Thu, May 12, 2022 at 1:53 AM Aeden Jameson  
>> > wrote:
>> >>
>> >> We're using S3 to store checkpoints. They are taken every minute. I'm
>> >> seeing a large number of 404 responses from S3 being generated by the
>> >> job manager. The order of the entries in the debugging log would imply
>> >> that it's a result of a HEAD request to a key. For example all the
>> >> incidents look like this,
>> >>
>> >>
>> >> 2022-05-11 23:29:00,804 DEBUG com.amazonaws.request [] - Sending
>> >> Request: HEAD https://[MY-BUCKET].s3.amazonaws.com
>> >> /[MY_JOB]/checkpoints/5f4d6923883a1702b206f978fa3637a3/ Headers:
>> >> (amz-sdk-invocation-id: X, Content-Type: application/octet-stream,
>> >> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.788
>> >> Linux/5.4.181-99.354.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/11.0.13+8
>> >> java/11.0.13 scala/2.12.7 vendor/Oracle_Corporation, )
>> >>
>> >> 2022-05-11 23:29:00,815 DEBUG com.amazonaws.request [] - Received
>> >> error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not
>> >> Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not
>> >> Found; ..)
>> >>
>> >> The key does in fact exist. How can I go about resolving this?
>> >>
>> >> --
>> >> Cheers,
>> >> Aeden
>> >>
>> >> GitHub: https://github.com/aedenj
>>
>>
>>
>> --
>> Cheers,
>> Aeden
>>
>> GitHub: https://github.com/aedenj
>> Linked In: http://www.linkedin.com/in/aedenjameson



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson


Re: Checkpointing - Job Manager S3 Head Requests are 404s

2022-05-19 Thread David Anderson
Sure, happy to try to help.

What's happening with the hadoop filesystem is that before it writes each
key it checks to see if the "parent directory" exists by checking for a key
with the prefix up to the last "/", and if that key isn't found it then
creates empty marker files to cause of that parent directory to exist.
These existence checks are S3 HEAD requests. None of this is helpful in the
case of Flink checkpointing.

And yes, while Presto avoids this unnecessary overhead, entropy injection
remains an important tool for improving scalability. Where you'll run into
quotas and rate limits depends, of course, on factors like the number of
stateful tasks in your job(s), the parallelism, and the checkpointing
interval.

On Thu, May 19, 2022 at 8:04 PM Aeden Jameson 
wrote:

> Thanks for the response David. That's the conclusion I came to as
> well.  The Hadoop plugin behavior doesn't appear to reflect more
> recent changes to S3 like strong read-after-write consistency,
>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
> .
>
> Given the improved interaction of the Presto plugin with S3 I would
> conclude that this increases the size of the cluster that would
> require entropy injection, yes? But that it doesn't necessarily get
> rid of the need because one could have a large enough cluster and say
> a lifecycle policy that could still end up requiring entropy
> injection.
>
> On Thu, May 19, 2022 at 10:24 AM David Anderson 
> wrote:
> >
> > Aeden, this is probably happening because you are using the Hadoop
> implementation of S3.
> >
> > The Hadoop S3 filesystem tries to imitate a filesystem on top of S3. In
> so doing it makes a lot of HEAD requests. These are expensive, and they
> violate read-after-create visibility, which is what you seem to be
> experiencing. By contrast, the Presto S3 implementation doesn't do the same
> (harmful in this case) magic, and simply does PUT/GET operations. Because
> that's all Flink needs to checkpointing, this works much better.
> >
> > Best,
> > David
> >
> > On Thu, May 12, 2022 at 1:53 AM Aeden Jameson 
> wrote:
> >>
> >> We're using S3 to store checkpoints. They are taken every minute. I'm
> >> seeing a large number of 404 responses from S3 being generated by the
> >> job manager. The order of the entries in the debugging log would imply
> >> that it's a result of a HEAD request to a key. For example all the
> >> incidents look like this,
> >>
> >>
> >> 2022-05-11 23:29:00,804 DEBUG com.amazonaws.request [] - Sending
> >> Request: HEAD https://[MY-BUCKET].s3.amazonaws.com
> >> /[MY_JOB]/checkpoints/5f4d6923883a1702b206f978fa3637a3/ Headers:
> >> (amz-sdk-invocation-id: X, Content-Type: application/octet-stream,
> >> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.788
> >> Linux/5.4.181-99.354.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/11.0.13+8
> >> java/11.0.13 scala/2.12.7 vendor/Oracle_Corporation, )
> >>
> >> 2022-05-11 23:29:00,815 DEBUG com.amazonaws.request [] - Received
> >> error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not
> >> Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not
> >> Found; ..)
> >>
> >> The key does in fact exist. How can I go about resolving this?
> >>
> >> --
> >> Cheers,
> >> Aeden
> >>
> >> GitHub: https://github.com/aedenj
>
>
>
> --
> Cheers,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
>


Re: Checkpointing - Job Manager S3 Head Requests are 404s

2022-05-19 Thread Aeden Jameson
Thanks for the response David. That's the conclusion I came to as
well.  The Hadoop plugin behavior doesn't appear to reflect more
recent changes to S3 like strong read-after-write consistency,
https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
.

Given the improved interaction of the Presto plugin with S3 I would
conclude that this increases the size of the cluster that would
require entropy injection, yes? But that it doesn't necessarily get
rid of the need because one could have a large enough cluster and say
a lifecycle policy that could still end up requiring entropy
injection.

On Thu, May 19, 2022 at 10:24 AM David Anderson  wrote:
>
> Aeden, this is probably happening because you are using the Hadoop 
> implementation of S3.
>
> The Hadoop S3 filesystem tries to imitate a filesystem on top of S3. In so 
> doing it makes a lot of HEAD requests. These are expensive, and they violate 
> read-after-create visibility, which is what you seem to be experiencing. By 
> contrast, the Presto S3 implementation doesn't do the same (harmful in this 
> case) magic, and simply does PUT/GET operations. Because that's all Flink 
> needs to checkpointing, this works much better.
>
> Best,
> David
>
> On Thu, May 12, 2022 at 1:53 AM Aeden Jameson  wrote:
>>
>> We're using S3 to store checkpoints. They are taken every minute. I'm
>> seeing a large number of 404 responses from S3 being generated by the
>> job manager. The order of the entries in the debugging log would imply
>> that it's a result of a HEAD request to a key. For example all the
>> incidents look like this,
>>
>>
>> 2022-05-11 23:29:00,804 DEBUG com.amazonaws.request [] - Sending
>> Request: HEAD https://[MY-BUCKET].s3.amazonaws.com
>> /[MY_JOB]/checkpoints/5f4d6923883a1702b206f978fa3637a3/ Headers:
>> (amz-sdk-invocation-id: X, Content-Type: application/octet-stream,
>> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.788
>> Linux/5.4.181-99.354.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/11.0.13+8
>> java/11.0.13 scala/2.12.7 vendor/Oracle_Corporation, )
>>
>> 2022-05-11 23:29:00,815 DEBUG com.amazonaws.request [] - Received
>> error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not
>> Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not
>> Found; ..)
>>
>> The key does in fact exist. How can I go about resolving this?
>>
>> --
>> Cheers,
>> Aeden
>>
>> GitHub: https://github.com/aedenj



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson


Re: Confusing S3 Entropy Injection Behavior

2022-05-19 Thread Aeden Jameson
Thanks for the response David. I'm using Flink 1.13.5.

>> For point 1 the behavior you are seeing is what is expected.

Great. That's what I concluded after digging into things a little
more. This helps me be sure I just didn't miss some other
configuration. Thank you.

>> For point 2, I'm not sure.

Ok, It appears to be the path to the file named "metadata"

>> FWIW, I would urge you to use presto instead of hadoop for checkpointing on 
>> S3. The performance of the hadoop "filesystem" is problematic when it's used 
>> for checkpointing.

For sure, it's definitely on the list.

On Thu, May 19, 2022 at 7:06 AM David Anderson  wrote:
>
> Aeden,
>
> I want to expand my answer after having re-read your question a bit more 
> carefully.
>
> For point 1 the behavior you are seeing is what is expected. With hadoop the 
> metadata written by the job manager will literally include "_entropy_" in its 
> path, while this will be replaced in paths of any and all checkpoint data 
> files. With presto the metadata path won't include "_entropy_" at all (it 
> will disappear, rather than being replaced by something specific).
>
> For point 2, I'm not sure.
>
> David
>
> On Thu, May 19, 2022 at 2:37 PM David Anderson  wrote:
>>
>> This sounds like it could be FLINK-17359 [1]. What version of Flink are you 
>> using?
>>
>> Another likely explanation arises from the fact that only the checkpoint 
>> data files (the ones created and written by the task managers) will have the 
>> _entropy_ replaced. The job manager does not inject entropy into the path of 
>> the checkpoint metadata, so that it remains at a predictable URI. Since 
>> Flink only writes keyed state larger than state.storage.fs.memory-threshold 
>> into the checkpoint data files, and only those files have entropy injected 
>> into their paths, if all of your state is small it will all end up in the 
>> metadata file and you don't see any entropy injection happening. See the 
>> comments on [2] for more on this.
>>
>> FWIW, I would urge you to use presto instead of hadoop for checkpointing on 
>> S3. The performance of the hadoop "filesystem" is problematic when it's used 
>> for checkpointing.
>>
>> Regards,,
>> David
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17359
>> [2] https://issues.apache.org/jira/browse/FLINK-24878
>>
>> On Wed, May 18, 2022 at 7:48 PM Aeden Jameson  
>> wrote:
>>>
>>> I have checkpoints setup against s3 using the hadoop plugin. (I'll
>>> migrate to presto at some point) I've setup entropy injection per the
>>> documentation with
>>>
>>> state.checkpoints.dir: s3://my-bucket/_entropy_/my-job/checkpoints
>>> s3.entropy.key: _entropy_
>>>
>>> I'm seeing some behavior that I don't quite understand.
>>>
>>> 1. The folder s3://my-bucket/_entropy_/my-job/checkpoints/...
>>> literally exists. Meaning that "_entropy_" has not been replaced. At
>>> the same time there are also a bunch of folders where "_entropy_" has
>>> been replaced. Is that to be expected? If so, would someone elaborate
>>> on why this is happening?
>>>
>>> 2. Should the paths in the checkpoints history tab in the FlinkUI
>>> display the path the key? With the current setup it is not.
>>>
>>> Thanks,
>>> Aeden
>>>
>>> GitHub: https://github.com/aedenj
>>> Linked In: http://www.linkedin.com/in/aedenjameson



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson


Re: Checkpointing - Job Manager S3 Head Requests are 404s

2022-05-19 Thread David Anderson
Aeden, this is probably happening because you are using the Hadoop
implementation of S3.

The Hadoop S3 filesystem tries to imitate a filesystem on top of S3. In so
doing it makes a lot of HEAD requests. These are expensive, and they
violate read-after-create visibility, which is what you seem to be
experiencing. By contrast, the Presto S3 implementation doesn't do the same
(harmful in this case) magic, and simply does PUT/GET operations. Because
that's all Flink needs to checkpointing, this works much better.

Best,
David

On Thu, May 12, 2022 at 1:53 AM Aeden Jameson 
wrote:

> We're using S3 to store checkpoints. They are taken every minute. I'm
> seeing a large number of 404 responses from S3 being generated by the
> job manager. The order of the entries in the debugging log would imply
> that it's a result of a HEAD request to a key. For example all the
> incidents look like this,
>
>
> 2022-05-11 23:29:00,804 DEBUG com.amazonaws.request [] - Sending
> Request: HEAD https://[MY-BUCKET].s3.amazonaws.com
> /[MY_JOB]/checkpoints/5f4d6923883a1702b206f978fa3637a3/ Headers:
> (amz-sdk-invocation-id: X, Content-Type: application/octet-stream,
> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.788
> Linux/5.4.181-99.354.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/11.0.13+8
> java/11.0.13 scala/2.12.7 vendor/Oracle_Corporation, )
>
> 2022-05-11 23:29:00,815 DEBUG com.amazonaws.request [] - Received
> error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not
> Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not
> Found; ..)
>
> The key does in fact exist. How can I go about resolving this?
>
> --
> Cheers,
> Aeden
>
> GitHub: https://github.com/aedenj
>


Job Logs - Yarn Application Mode

2022-05-19 Thread Zain Haider Nemati
Hey All,
How can I check logs for my job when it is running in application mode via
yarn


Re: HTTP REST API as Ingress/Egress

2022-05-19 Thread Austin Cawley-Edwards
Hi Himanshu,

Unfortunately, this is not supported by Statefun, though this type of
application should not be too difficult to using something like the Kafka
Request/Reply pattern[1], and putting that in front of a Statefun cluster.

Best,
Austin

[1]:
https://dzone.com/articles/synchronous-kafka-using-spring-request-reply-1


On Thu, May 19, 2022 at 5:35 AM Himanshu Sareen 
wrote:

> Hi All,
>
>
> It will be of great help if someone can share views.
>
> As per application design. Synchronous access to a stateful fucntion.
>
>1. Application will access/invoke a stateful function via a HTTP call.
>2. Application will wait for an response.
>3. Once Stateful function completes the execution return the response
>back to the Application.
>
>
> Regards
> Himanshu
> --
> *From:* Himanshu Sareen
> *Sent:* Sunday, April 24, 2022 6:59 AM
> *To:* user@flink.apache.org 
> *Subject:* HTTP REST API as Ingress/Egress
>
> Team,
>
> Does flink-statefun support HTTP REST as Ingress ( like Kafka and kinesis )
>
> I'm looking for a fault tolerant solution where an external API can invoke
> stateful function , access state and return response.
>
> We are using python sdk for statefun application
>
> Regards,
> Himanshu
>
>


Re: Confusing S3 Entropy Injection Behavior

2022-05-19 Thread David Anderson
Aeden,

I want to expand my answer after having re-read your question a bit more
carefully.

For point 1 the behavior you are seeing is what is expected. With hadoop
the metadata written by the job manager will literally include "_entropy_"
in its path, while this will be replaced in paths of any and all checkpoint
data files. With presto the metadata path won't include "_entropy_" at all
(it will disappear, rather than being replaced by something specific).

For point 2, I'm not sure.

David

On Thu, May 19, 2022 at 2:37 PM David Anderson  wrote:

> This sounds like it could be FLINK-17359 [1]. What version of Flink are
> you using?
>
> Another likely explanation arises from the fact that only the
> checkpoint data files (the ones created and written by the task managers)
> will have the _entropy_ replaced. The job manager does not inject entropy
> into the path of the checkpoint metadata, so that it remains at a
> predictable URI. Since Flink only writes keyed state larger than
> state.storage.fs.memory-threshold into the checkpoint data files, and only
> those files have entropy injected into their paths, if all of your state is
> small it will all end up in the metadata file and you don't see any entropy
> injection happening. See the comments on [2] for more on this.
>
> FWIW, I would urge you to use presto instead of hadoop for checkpointing
> on S3. The performance of the hadoop "filesystem" is problematic when it's
> used for checkpointing.
>
> Regards,,
> David
>
> [1] https://issues.apache.org/jira/browse/FLINK-17359
> [2] https://issues.apache.org/jira/browse/FLINK-24878
>
> On Wed, May 18, 2022 at 7:48 PM Aeden Jameson 
> wrote:
>
>> I have checkpoints setup against s3 using the hadoop plugin. (I'll
>> migrate to presto at some point) I've setup entropy injection per the
>> documentation with
>>
>> state.checkpoints.dir: s3://my-bucket/_entropy_/my-job/checkpoints
>> s3.entropy.key: _entropy_
>>
>> I'm seeing some behavior that I don't quite understand.
>>
>> 1. The folder s3://my-bucket/_entropy_/my-job/checkpoints/...
>> literally exists. Meaning that "_entropy_" has not been replaced. At
>> the same time there are also a bunch of folders where "_entropy_" has
>> been replaced. Is that to be expected? If so, would someone elaborate
>> on why this is happening?
>>
>> 2. Should the paths in the checkpoints history tab in the FlinkUI
>> display the path the key? With the current setup it is not.
>>
>> Thanks,
>> Aeden
>>
>> GitHub: https://github.com/aedenj
>> Linked In: http://www.linkedin.com/in/aedenjameson
>>
>


Re: How to KafkaConsume from Particular Partition in Flink(version 1.14.4)

2022-05-19 Thread Weihua Hu
Hi Harshit,
FlinkKafkaConsumer does not support consuming a particular partition of a topic.

Best,
Weihua

> 2022年5月18日 下午5:02,harshit.varsh...@iktara.ai  写道:
> 
> particular



Introducing Statefun Tsukuyomi

2022-05-19 Thread Tymur Yarosh
Hello guys,

I've created a library to help narrow integration testing of Stateful Functions 
applications written in Java. It utilizes Stateful Functions' RequestReply 
protocol and provides Java DSL to test the function. Statefun Tsukuyomi sets up 
the function under test with the initial state, interacts, and validates 
outgoing messages and the state after invocation. Ex:

@Test
@Timeout(30)
void verifiesThatTheFunctionSendsMessagesInOrderTheyExpected() {
  // Define your envelopes (envelopes describe messages)
  Envelope envelope = incomingEnvelope();
  Envelope expectedToFunction = outgoingEnvelopeToFunction();
  Envelope expectedToEgress = outgoingEnvelopeToEgress();
  Envelope expectedToSelf = outgoingEnvelopeToSelf();
  Envelope expectedToFunctionDelayed = delayedEnvelopeToFunction();
  // Define function under test and its initial state
  GivenFunction testee = given(
      function(Testee.TYPE, new Testee()),
      withState(Testee.FOO, StateValue.empty()),
      withState(Testee.BAR, StateValue.havingValue(BAR))
  );

  // When function under test receives that envelope
  when(
      testee,
      receives(envelope)
  ).then(
      // Then expect it sends the following messages
      sendsInOrder(expectedToFunction),
      sendsInOrder(expectedToSelf),
      sendsInOrder(expectedToEgress),
      sendsInOrder(expectedToFunctionDelayed),
      // and has the following state value after invocation
      state(Testee.FOO, is("foo")) // Hamcrest matchers supported
  );
}

It runs Flink inside a Docker container and a function under test in Undertow. 
Undertow accepts Flink calls and forwards messages to the function as an actual 
remote module.

Since this approach utilizes actual Flink, it enables testing of:

• Serialization and deserialization of function's state
• Serialization and deserialization of incoming and outgoing messages
• Exchanging real state and messages with real Flink instance


I've built Statefun Tsukuyomi to help our internal development, but it's pretty 
generic and might be helpful for some of you. Please let me know if you find it 
useful.

Pay attention that Docker is required.
Kudos to the Flink team for the inspiration. Their approach to testing Stateful 
Functions inspired me to build this library.
Check out the project page for more information: 
https://github.com/4insyde/statefun-tsukuyomi

Best,
Tymur Yarosh


Applying backpressure to limit state memory consumption

2022-05-19 Thread Robin Cassan
Hey all!
I have a conceptual question on the DataStream API: when using an in-memory
state backend (like the HashMapStateBackend), how can you ensure that the
hashmap won't grow uncontrollably until OutOfMemory happens?

In my case, I would be consuming from a Kafka topic, into a SessionWindow.
The HashMap state would be accumulating data in memory until the timeout
expires, with a PurgingTrigger to clean up the state.
The cluster's memory would be sized to handle a normal load, but in case of
lag or spikes we want the Flink job to slow down its consumption of the
kafka topic so that the window's state stays capped at a given size (could
be the number of keys or the total Gb). We have tested this scenario, and
Flink would consume really quickly from Kafka until memory was so full that
it was stuck in GC loops, unable to make progress on the ProcessFunction
applied after the window.

Is there any setting to limit the size of a Window state? Maybe there are
some bounded buffers between operators that can be adjusted?

Thanks a lot for your help!
Robin


Re: Incorrect checkpoint id used when job is recovering

2022-05-19 Thread tao xiao
Hi team,

Can anyone shed some light?

On Sat, May 14, 2022 at 8:56 AM tao xiao  wrote:

> Hi team,
>
> Does anyone have any ideas?
>
> On Thu, May 12, 2022 at 9:20 PM tao xiao  wrote:
>
>> Forgot to mention the Flink version is 1.13.2 and we use kubernetes
>> native mode
>>
>> On Thu, May 12, 2022 at 9:18 PM tao xiao  wrote:
>>
>>> Hi team,
>>>
>>> I met a weird issue when a job tries to recover from JM failure.  The
>>> success checkpoint before JM crashed is 41205
>>>
>>> ```
>>>
>>> {"log":"2022-05-10 14:55:40,663 INFO  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
>>> checkpoint 41205 for job  (9453840 bytes in 
>>> 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"}
>>>
>>> ```
>>>
>>> However JM tries to recover the job with an old checkpoint 41051 which
>>> doesn't exist that leads to unrecoverable state
>>>
>>> ```
>>>
>>> "2022-05-10 14:59:38,949 INFO  
>>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
>>> Trying to retrieve checkpoint 41051.\n"
>>>
>>> ```
>>>
>>> Full log attached
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao


Re: s3p 如果在本地调试

2022-05-19 Thread Weihua Hu
Hi,
你是在 IDEA 中运行吗?我增加相关的 pom 依赖后在 wordcount 中可以正常运行,可以 idea maven reload project 试试

Best,
Weihua

> 2022年5月19日 下午4:05,z y xing  写道:
> 
> 各位好:
> 了解实际运行是要复制jar到plugin下,但是调试的话用该怎么初始化s3p这个文件系统了?
> 
> flink版本 1.14,win10
> 项目通过flink-quick-start创建,在pom中添加了如下依赖
> 
> 
>   org.apache.flink
>   flink-s3-fs-presto
>   ${flink.version}
> 
> 
> 初始代码类似如下:
> 
> Configuration fileSystemConf = new Configuration();
> 
> fileSystemConf.setBoolean("presto.s3.connection.ssl.enabled", false);
> fileSystemConf.setString("presto.s3.access-key", "minioadmin");
> fileSystemConf.setString("presto.s3.secret-key", "minioadmin");
> fileSystemConf.setString("presto.s3.endpoint", "http://127.0.0.1:9000;);
> 
> FileSystem.initialize(fileSystemConf);
> 
> Path path = new Path("s3p://test/");
> System.out.println(path.getFileSystem().exists(path));
> 
> 但是会抛出如下异常:
> Exception in thread "main"
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 's3p'. The scheme is directly
> supported by Flink through the following plugin: flink-s3-fs-presto. Please
> ensure that each plugin resides within its own subfolder within the plugins
> directory. See
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
> more information. If you want to use a Hadoop file system for that scheme,
> please add the scheme to the configuration fs.allowed-fallback-filesystems.
> For a full list of supported file systems, please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
> at org.example.StreamingJob.main(StreamingJob.java:58)
> 
> 但是神奇的是,我可以用s3a
> 初始化配置如下:
> 
> fileSystemConf.setBoolean("fs.s3a.connection.ssl.enabled", false);
> fileSystemConf.setString("fs.s3a.endpoint", "http://127.0.0.1:9000;);
> fileSystemConf.setString("fs.s3a.access.key", "minioadmin");
> fileSystemConf.setString("fs.s3a.secret.key", "minioadmin");
> fileSystemConf.setString("fs.s3a.path.style.access", "true");
> fileSystemConf.setString("fs.s3a.impl",
> "org.apache.hadoop.fs.s3a.S3AFileSystem");
> 
> 
> 谢谢!



Re:flink rest 接口部分数据不返回,返回空,状态200正常。

2022-05-19 Thread Xuyang
Hi,请问在UI界面这些数据都是空的吗?可以贴一下具体的代码和UI界面截图吗?会不会是由于算子chain在一起了导致输入/输出数据是0呢?




--

Best!
Xuyang





在 2022-05-19 17:52:20,"yidan zhao"  写道:
>如题,主要表现是web 
>ui部分监控,比如watermark,每个节点的数据之类不展示。看chrome的network视图可以发现请求返回状态码都是200,但是数据是空的。
>
>以watermarks请求为例:
>.../jobs/f5600ae6822108629f26b492ed7a1f96/vertices/95d07d760a7164acf2417ef057dca790/watermarks
>结果是 [] 。 不清楚啥情况,有人清楚吗。
>
>目前看了下jobmanger日志,有部分报错,但和这些请求没关系都。报的都是org.apache.flink.runtime.checkpoint.CheckpointsCleaner
>  - Could not properly discard completed checkpoint 32748 这种错误。


Re: Confusing S3 Entropy Injection Behavior

2022-05-19 Thread David Anderson
This sounds like it could be FLINK-17359 [1]. What version of Flink are you
using?

Another likely explanation arises from the fact that only the
checkpoint data files (the ones created and written by the task managers)
will have the _entropy_ replaced. The job manager does not inject entropy
into the path of the checkpoint metadata, so that it remains at a
predictable URI. Since Flink only writes keyed state larger than
state.storage.fs.memory-threshold into the checkpoint data files, and only
those files have entropy injected into their paths, if all of your state is
small it will all end up in the metadata file and you don't see any entropy
injection happening. See the comments on [2] for more on this.

FWIW, I would urge you to use presto instead of hadoop for checkpointing on
S3. The performance of the hadoop "filesystem" is problematic when it's
used for checkpointing.

Regards,,
David

[1] https://issues.apache.org/jira/browse/FLINK-17359
[2] https://issues.apache.org/jira/browse/FLINK-24878

On Wed, May 18, 2022 at 7:48 PM Aeden Jameson 
wrote:

> I have checkpoints setup against s3 using the hadoop plugin. (I'll
> migrate to presto at some point) I've setup entropy injection per the
> documentation with
>
> state.checkpoints.dir: s3://my-bucket/_entropy_/my-job/checkpoints
> s3.entropy.key: _entropy_
>
> I'm seeing some behavior that I don't quite understand.
>
> 1. The folder s3://my-bucket/_entropy_/my-job/checkpoints/...
> literally exists. Meaning that "_entropy_" has not been replaced. At
> the same time there are also a bunch of folders where "_entropy_" has
> been replaced. Is that to be expected? If so, would someone elaborate
> on why this is happening?
>
> 2. Should the paths in the checkpoints history tab in the FlinkUI
> display the path the key? With the current setup it is not.
>
> Thanks,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
>


Re:Window aggregation fails after upgrading to Flink 1.15

2022-05-19 Thread Xuyang
Hi, can you provide the DDL of your source table?
I test your query in my idea and it works. Here is my code.


createtemporarytable myTable(
userid int,
pageid int,
p_userid string,
rowtime asproctime()
) with (
'connector'='datagen'
);



SELECT window_start, window_end, userid, count(pageid) AS pgcnt 
FROMTABLE(TUMBLE(TABLE myTable, DESCRIPTOR(rowtime), INTERVAL'5'SECONDS)) WHERE 
(p_userid <>'User_6') GROUPBY window_start, window_end, userid







--

Best!
Xuyang




At 2022-05-19 08:53:03, "Pouria Pirzadeh"  wrote:

I am running a Flink application in Java that performs window aggregation.
The query runs successfully on Flink 1.14.4. However, after upgrading to Flink 
1.15.0 and switching the code to use Windowing TVF, it fails with a runtime 
error as planner can not compile and instantiate window Aggs Handler code.
It seems the generated table program code is invalid and can not be compiled by 
janino's SimpleCompiler to get executable runtime code.


Is there any specific change required to go from Flink 1.14.4 to 1.15.0, beside 
upgrading maven dependencies? 


Here is the query:


tableEnv.createTemporaryView("myTable", table);
String sql = " SELECT window_start, window_end, userid, count(pageid) AS pgcnt 
FROM TABLE(TUMBLE(TABLE myTable, DESCRIPTOR(rowtime), INTERVAL '5' SECONDS)) 
WHERE (p_userid <> 'User_6') GROUP BY window_start, window_end, userid";
final Table result = tableEnv.sqlQuery(sql);
result.execute().print();



Here is the error stack trace:


Causedby: java.lang.RuntimeException: Couldnot instantiate generated 
class'LocalWindowAggsHandler$34'
at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
at 
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggCombiner$Factory.createRecordsCombiner(LocalAggCombiner.java:127)
at 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer$LocalFactory.create(RecordsWindowBuffer.java:204)
at 
org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWindowAggOperator.open(LocalSlicingWindowAggOperator.java:101)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Thread.java:834)
Causedby: org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
... 13 more
Causedby: 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
... 15 more
Causedby: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
at 
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)

Re: Flink application on yarn cluster - main method not found

2022-05-19 Thread Weihua Hu
Hi,

Which version of flink are you using?
It looks like there is a conflict between the flink version of the cluster and 
the version in userjar

Best,
Weihua

> 2022年5月19日 下午4:49,Zain Haider Nemati  写道:
> 
> Hi,
> Im running flink application on yarn cluster it is giving me this error, it 
> is working fine on standalone cluster. Any idea what could be causing this?
> 
> Exception in thread "main" java.lang.NoSuchMethodError: 
> org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder([Ljava/lang/String;)Lorg/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever$Builder;
> at 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgramRetriever(YarnApplicationClusterEntryPoint.java:137)
> at 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgram(YarnApplicationClusterEntryPoint.java:121)
> at 
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.main(YarnApplicationClusterEntryPoint.java:95)



Re: Flink application on yarn cluster - main method not found

2022-05-19 Thread Zain Haider Nemati
Hi Folks,
Would appreciate it if someone could help me out with this !

Cheers

On Thu, May 19, 2022 at 1:49 PM Zain Haider Nemati 
wrote:

> Hi,
> Im running flink application on yarn cluster it is giving me this error,
> it is working fine on standalone cluster. Any idea what could be causing
> this?
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder([Ljava/lang/String;)Lorg/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever$Builder;
> at
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgramRetriever(YarnApplicationClusterEntryPoint.java:137)
> at
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgram(YarnApplicationClusterEntryPoint.java:121)
> at
> org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.main(YarnApplicationClusterEntryPoint.java:95)
>


flink rest 接口部分数据不返回,返回空,状态200正常。

2022-05-19 Thread yidan zhao
如题,主要表现是web 
ui部分监控,比如watermark,每个节点的数据之类不展示。看chrome的network视图可以发现请求返回状态码都是200,但是数据是空的。

以watermarks请求为例:
.../jobs/f5600ae6822108629f26b492ed7a1f96/vertices/95d07d760a7164acf2417ef057dca790/watermarks
结果是 [] 。 不清楚啥情况,有人清楚吗。

目前看了下jobmanger日志,有部分报错,但和这些请求没关系都。报的都是org.apache.flink.runtime.checkpoint.CheckpointsCleaner
  - Could not properly discard completed checkpoint 32748 这种错误。


Final reminder: ApacheCon North America call for presentations closing soon

2022-05-19 Thread Rich Bowen
[Note: You're receiving this because you are subscribed to one or more
Apache Software Foundation project mailing lists.]

This is your final reminder that the Call for Presetations for
ApacheCon North America 2022 will close at 00:01 GMT on Monday, May
23rd, 2022. Please don't wait! Get your talk proposals in now!

Details here: https://apachecon.com/acna2022/cfp.html

--Rich, for the ApacheCon Planners




Re: HTTP REST API as Ingress/Egress

2022-05-19 Thread Himanshu Sareen
Hi All,


It will be of great help if someone can share views.

As per application design. Synchronous access to a stateful fucntion.

  1.  Application will access/invoke a stateful function via a HTTP call.
  2.  Application will wait for an response.
  3.  Once Stateful function completes the execution return the response back 
to the Application.

Regards
Himanshu

From: Himanshu Sareen
Sent: Sunday, April 24, 2022 6:59 AM
To: user@flink.apache.org 
Subject: HTTP REST API as Ingress/Egress

Team,

Does flink-statefun support HTTP REST as Ingress ( like Kafka and kinesis )

I'm looking for a fault tolerant solution where an external API can invoke 
stateful function , access state and return response.

We are using python sdk for statefun application

Regards,
Himanshu



Flink Stateful Function - Regex Match in State Key

2022-05-19 Thread Himanshu Sareen
Hi All,

My understanding is Flink uses exact match on key to fetch/load state in a 
stateful functions.

But is it possible to use a regex expression in target-id or as a key to a 
stateful function, thus fetching/loading all matching states.

Regards,
Himanshu


Flink application on yarn cluster - main method not found

2022-05-19 Thread Zain Haider Nemati
Hi,
Im running flink application on yarn cluster it is giving me this error, it
is working fine on standalone cluster. Any idea what could be causing this?

Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder([Ljava/lang/String;)Lorg/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever$Builder;
at
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgramRetriever(YarnApplicationClusterEntryPoint.java:137)
at
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgram(YarnApplicationClusterEntryPoint.java:121)
at
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.main(YarnApplicationClusterEntryPoint.java:95)


s3p 如果在本地调试

2022-05-19 Thread z y xing
各位好:
了解实际运行是要复制jar到plugin下,但是调试的话用该怎么初始化s3p这个文件系统了?

flink版本 1.14,win10
项目通过flink-quick-start创建,在pom中添加了如下依赖


   org.apache.flink
   flink-s3-fs-presto
   ${flink.version}


初始代码类似如下:

Configuration fileSystemConf = new Configuration();

fileSystemConf.setBoolean("presto.s3.connection.ssl.enabled", false);
fileSystemConf.setString("presto.s3.access-key", "minioadmin");
fileSystemConf.setString("presto.s3.secret-key", "minioadmin");
fileSystemConf.setString("presto.s3.endpoint", "http://127.0.0.1:9000;);

FileSystem.initialize(fileSystemConf);

Path path = new Path("s3p://test/");
System.out.println(path.getFileSystem().exists(path));

但是会抛出如下异常:
Exception in thread "main"
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 's3p'. The scheme is directly
supported by Flink through the following plugin: flink-s3-fs-presto. Please
ensure that each plugin resides within its own subfolder within the plugins
directory. See
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
more information. If you want to use a Hadoop file system for that scheme,
please add the scheme to the configuration fs.allowed-fallback-filesystems.
For a full list of supported file systems, please see
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.example.StreamingJob.main(StreamingJob.java:58)

但是神奇的是,我可以用s3a
初始化配置如下:

fileSystemConf.setBoolean("fs.s3a.connection.ssl.enabled", false);
fileSystemConf.setString("fs.s3a.endpoint", "http://127.0.0.1:9000;);
fileSystemConf.setString("fs.s3a.access.key", "minioadmin");
fileSystemConf.setString("fs.s3a.secret.key", "minioadmin");
fileSystemConf.setString("fs.s3a.path.style.access", "true");
fileSystemConf.setString("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");


谢谢!


RE: Checkpoint directories not cleared as TaskManagers run

2022-05-19 Thread Schwalbe Matthias
Hi James,

Let me give some short answers, (there is documentation that better describes 
this):

>> - why do taskmanagers create the chk-x directory but only the jobmanager can 
>> delete it? Shouldn’t the jobmanager be the only component creating and 
>> deleting these directories? That would seem more consistent to me but maybe 
>> there is a reason.

  *   Assuming proper setup, i.e. checkpoint directory is on a shared folder
  *   Tasks and state thereof are split as subtasks to separate slots 
(according to parallelism)
  *   When checkpoints are written each state primitive on each resp. subtask 
writes its portion of state to the checkpoint folder and forwards the filename 
to the job manager
  *   For incremental checkpoints some files also remain in older checkpoint 
folders until obsolete
  *   This process is managed by jobmanager
  *   In the end of each checkpoint, jobmanager writes _metadata file to the 
resp. checkpoint folder containing (simplified) the filenames of respective 
states and small state
  *   When a new checkpoint is finished, jobmanager decides according to 
configuration which old checkpoint files become obsolete and hence deleted
  *   In general checkpoints and savepoints are for high availability purposes, 
if the checkpoint data were on a local folder of machine that crashed it would 
not be available for restart of the job
  *   The parts that should be on a local (and fast) drive are the ones used by 
RocksDB, these are ephermeral and can (and will) be recreated on job recovery
>>  - I see many files under each chk-x folder. Can anyone confirm if each file 
>> is wholly owned by a single task manager? ie is each file only written by 1 
>> TM? Otherwise there could be file locking and contention.

  *   Mostly explained above … however
  *   If two taskmanagers happen to be started on the same machine (uncommon 
for k8s, common for Yarn resource manager) they would use the same folder
  *   Filenames contain a uuid which is unlikely to collide
>> - we are now looking to add in NFS mounts for our containers so all the job 
>> managers and taskmanagers share the same path. Can anyone confirm if NFS is 
>> a ‘reliable’ storage mechanism as we have heard many stories how problematic 
>> it can be. We are not yet able to use HDFS or S3.

  *   NFS is not reliable, probably not fit for PROD purposes, don’t know about 
some NAS setup that uses NFS and has integrated reliability …
>> - if Flink can not write to NFS my understanding is although the checkpoint 
>> will fail the Flink process will carry on and try again at the next 
>> checkpoint. It will not cause my program to fail correct?

  *   Imho there would be no reason to setup checkpointing in the first place, 
if you cannot restart a job from such checkpoint
  *   This is only important, of course, if you need reliability, or exactly 
once semantics …

Thias

From: James Sandys-Lumsdaine 
Sent: Wednesday, May 18, 2022 2:53 PM
To: Schwalbe Matthias 
Cc: Hangxiang Yu ; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hello Matthias,

Thanks for your reply. Yes indeed your are correct. My /tmp path is private so 
you have confirmed what I thought was happening.

I have some follow up questions:
- why do taskmanagers create the chk-x directory but only the jobmanager can 
delete it? Shouldn’t the jobmanager be the only component creating and deleting 
these directories? That would seem more consistent to me but maybe there is a 
reason.
- I see many files under each chk-x folder. Can anyone confirm if each file is 
wholly owned by a single task manager? ie is each file only written by 1 TM? 
Otherwise there could be file locking and contention.
- we are now looking to add in NFS mounts for our containers so all the job 
managers and taskmanagers share the same path. Can anyone confirm if NFS is a 
‘reliable’ storage mechanism as we have heard many stories how problematic it 
can be. We are not yet able to use HDFS or S3.
- if Flink can not write to NFS my understanding is although the checkpoint 
will fail the Flink process will carry on and try again at the next checkpoint. 
It will not cause my program to fail correct?

Many thanks again,

James.

Sent from my iPhone


On 17 May 2022, at 15:17, Schwalbe Matthias 
mailto:matthias.schwa...@viseca.ch>> wrote:

Hi James,

From reading the thread … I assume, your file:/tmp/Flink/State folder is not 
shared across all machines, right?

In this case it cannot work:
- checkpoints and savepoints need to go to a path that can be commonly accessed 
by jobmanager and all taskmanagers in order to work
- as your jobmanager can not access the checkpoint files of it can also not 
clean-up those files

Hope that helps

Regards

Thias

From: James Sandys-Lumsdaine mailto:jas...@hotmail.com>>
Sent: Tuesday, May 17, 2022 3:55 PM
To: Hangxiang Yu mailto:master...@gmail.com>>; 
user@flink.apache.org
Subject: Re: 

Re: Kafka Sink Key and Value Avro Schema Usage Issues

2022-05-19 Thread Peter Schrott
Hi Ghiy,

I am not quite sure about your actual problem, why the schema is not
generated as expected.

I also needed to work with the Kafka keys in the business logic, therefore
I found a way to deserialize and serialize the key along with the event
itself by overriding KafkaRecord[De]SerializationSchema. I am using Flinks'
new Kafka Source / Sink API. The difference to your requirement is that I
am only using keys from type Array[Byte]. But this could most certainly be
patched.


class KeyedEventSerializationSchema[A <: SpecificRecord:
ClassTag](topic: String, schemaRegistryUrl: String)
extends KafkaRecordSerializationSchema[KeyedEvent[A]]
with KafkaContextAware[KeyedEvent[A]] {

  private val eventClass = classTag[A].runtimeClass.asInstanceOf[Class[A]]
  private val valueSchema =
ConfluentRegistryAvroSerializationSchema.forSpecific(eventClass,
eventClass.getCanonicalName, schemaRegistryUrl)
  // TODO maybe you could add the keySchema here accordingly...

  override def serialize(element: KeyedEvent[A], context:
KafkaSinkContext, timestamp: JLong): ProducerRecord[Array[Byte],
Array[Byte]] =
new ProducerRecord(getTargetTopic(element), element.key,
valueSchema.serialize(element.value))

  override def getTargetTopic(element: KeyedEvent[A]): String = topic
}

final case class KeyedEvent[A <: SpecificRecord](key: Array[Byte], value: A)

val keyedEventSerialization = new
KeyedEventSerializationSchema[A](OutputTopicName, SchemaRegistryUrl)
val kafkaSinkBuilder = KafkaSink.builder[KeyedEvent[A]]()
kafkaSinkBuilder
  .setBootstrapServers(BootstrapServers)
  .setKafkaProducerConfig(ProducerProperties)
  .setRecordSerializer(keyedEventSerialization)
  .build()

Maybe this helps.

Best Peter

On Wed, May 18, 2022 at 7:03 PM Ghiya, Jay (GE Healthcare) 
wrote:

> Also forgot to attach the information regarding how did I convert the avro
> objects to bytes in the approach that I took with deprecated kafka producer.
>
>
>
> protected byte[] getValueBytes(Value value)
>
> {
>
> DatumWriter valWriter = new SpecificDatumWriter(
>
> Value.getSchema());
>
> ByteArrayOutputStream valOut = new ByteArrayOutputStream();
>
> BinaryEncoder valEncoder =
> EncoderFactory.get().binaryEncoder(valOut, null);
>
>
>
> try {
>
> valWriter.write(value, valEncoder);
>
>
>
> // TODO Auto-generated catch block
>
>
>
> valEncoder.flush();
>
>
>
> // TODO Auto-generated catch block
>
>
>
> valOut.close();
>
>
>
> // TODO Auto-generated catch block
>
>
>
> } catch (Exception e) {
>
>
>
> }
>
>
>
> return valOut.toByteArray();
>
> }
>
>
>
> protected byte[] getKeyBytes(Key key) {
>
>
>
> DatumWriter keyWriter = new SpecificDatumWriter(
>
> key.getSchema());
>
> ByteArrayOutputStream keyOut = new ByteArrayOutputStream();
>
> BinaryEncoder keyEncoder =
> EncoderFactory.get().binaryEncoder(keyOut, null);
>
>
>
> try {
>
> keyWriter.write(key, keyEncoder);
>
>
>
> // TODO Auto-generated catch block
>
>
>
> keyEncoder.flush();
>
>
>
> // TODO Auto-generated catch block
>
>
>
> keyOut.close();
>
>
>
> // TODO Auto-generated catch block
>
>
>
> } catch (Exception e) {
>
>
>
> }
>
>
>
> return keyOut.toByteArray();
>
> }
>
>
>
>
>
>
>
> *From:* Ghiya, Jay (GE Healthcare)
> *Sent:* 18 May 2022 21:51
> *To:* user@flink.apache.org
> *Cc:* d...@flink.apache.org; Pandiaraj, Satheesh kumar (GE Healthcare) <
> satheeshkumar.pandia...@ge.com>; Kumar, Vipin (GE Healthcare) <
> vipin.s.ku...@ge.com>
> *Subject:* Kafka Sink Key and Value Avro Schema Usage Issues
>
>
>
> Hi Team,
>
>
>
> This is regarding Flink Kafka Sink. We have a usecase where we have
> headers and both key and value as Avro Schema.
>
>
>
> Below is the expectation in terms of intuitiveness for avro kafka key and
> value:
>
>
>
> KafkaSink.>builder()
>
> .setBootstrapServers(cloudkafkaBrokerAPI)
>
> .setRecordSerializer(
>
> KafkaRecordSerializationSchema.builder()
>
> .setKeySerializationSchema(
>
>
> ConfluentRegistryAvroSerializationSchema
>
> .forSpecific(
>
> key.class,
>
> "Key",
>
> cloudSchemaRegistryURL))
>
> .setValueSerializationSchema(
>
>
>  ConfluentRegistryAvroSerializationSchema
>
> .forSpecific(
>
>
> Value.class,"val", cloudSchemaRegistryURL))
>
> .setTopic(outputTopic)
>
> .build())
>
> .build();
>
>
>
> What I 

Re: Flink Kubernetes operator not having a scale subresource

2022-05-19 Thread Gyula Fóra
Hi Team!

This is probably something for after the release but I created a simple
prototype for the scaling subresource based on taskmanager replica count.

You can take a look here:
https://github.com/apache/flink-kubernetes-operator/pull/227

After some consideration I decided against using parallelism and used tm
replicas instead (still with native integration), I describe this in the PR.

I will leave the PR open so people can experiment/comment and we should
definitely get back to this after the 1.0.0 release because it seems to be
a very lightweight yet useful feature.

Cheers,
Gyula


On Sat, May 7, 2022 at 11:25 AM Gyula Fóra  wrote:

> Hi Jay!
>
> I will take a closer look into this and see if we can use the parallelism
> in the scale subresource.
>
> If you could experiment with this and see if it works with the current CRD
> that would be helpful . Not sure if we need to change the status or
> anything as parallelism is only part of the spec at the moment.
>
> If you have a working modified CRD I would appreciate if you could share
> it with us!
>
> Don’t worry about the release schedule, if we think that this is important
> and we need some changes for it , we can push the release out a few days if
> necessary.
>
> What is important at this point to understand what exactly we need to make
> the parallelism scaling work natively to avoid breaking changes to the
> spec/status after the release :)
>
> Cheers
> Gyula
>
> On Sat, 7 May 2022 at 11:14, Jay Ghiya  wrote:
>
>> Hi Team,
>>
>> Yes we can change the parallelism of flink job. So going through the
>> roadmap , what I understand that we have put the standalone mode as second
>> priority due to right reasons. So , if possible can I be of any help to
>> accelerate this as we have a tight release schedule so would want to close
>> this in next 10 days with your guys’ help.
>>
>> Looking forward to hear from you !
>>
>> -Jay
>>
>> Sent with a Spark 
>> On 7 May 2022, 8:15 AM +0530, Yang Wang , wrote:
>>
>> Currently, the flink-kubernetes-operator is using Flink native K8s
>> integration[1], which means Flink ResourceManager will dynamically allocate
>> TaskManager on demand.
>> So the users do not need to specify the replicas of TaskManager.
>>
>> Just like Gyula said, one possible solution to make "kubectl scale" work
>> is to change the parallelism of Flink job.
>>
>> If the standalone mode[2] is introduced in the operator, then it is also
>> possible to directly change the replicas of TaskManager pods.
>>
>>
>> [1].
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/
>> [2].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator
>>
>> Best,
>> Yang
>>
>> Gyula Fóra  于2022年5月7日周六 04:26写道:
>>
>>> Hi Jay!
>>>
>>> Interesting question/proposal to add the scale-subresource.
>>>
>>> I am not an expert on this area but we will look into this a little and
>>> give you some feedback and see if we can incorporate something into the
>>> upcoming release if it makes sense.
>>>
>>> On a high level there is not a single replicas value for a
>>> FlinkDeployment that would be easy to map, but maybe we could use the
>>> parallelism value for this purpose for Applications/Session jobs.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Fri, May 6, 2022 at 8:04 PM Jay Ghiya  wrote:
>>>
  Hi Team,


 I have been experimenting the Flink Kubernetes operator. One of the
 biggest miss that we have is it does not support scale sub resource as of
 now to support reactive scaling. Without that commercially it becomes very
 difficult for products like us who have very varied loads for every hour.



 Can I get some direction on the same to contribute on
 https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource
  for
 our Kubernetes operator crd?

 I have been a hard time reading -> 
 *https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
 
  to
 figure out the replicas, status,label selector json path of task
 manager? It may be due to lack of my knowledge so sense of direction will
 help me.*

 *-Jay*
 *GEHC*

>>>