Flink web UI job overview cannot display the number of records received by operators

2021-11-29 Thread su wenwen



Hello all,
Why is it that when pipeline.operator-chaining is set to true, the Flink web UI job overview
cannot display the number of records and bytes received and sent by the operators?
The screenshot is in the attachment.

zhiyuan.franklin
zhiyuan.frank...@outlook.com



Parquet schema per bucket in Streaming File Sink

2021-11-29 Thread Zack Loebel
Hey all,

I have a job which writes data of a similar shape to a location in S3.
Currently it writes a map of data with each row. I want to customize this
job to "explode" the map into column names and values; these are consistent
for a single bucket. Is there any way to do this, i.e. provide a custom Parquet
schema per bucket within a single dynamic sink?

I've started looking at the changes within the main codebase to make this
feasible. It seems straightforward to provide the bucketId to the
writerFactory, and the bucketId could be a type containing the relevant
schema information.
However, the BulkFormatBuilder appears to have several spots where the
bucket ID is required to be a String: specifically, both the BucketAssigner
and the CheckpointRollingPolicy appear to require a bucketId of type String.
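
A minimal sketch (MyRecord and getSchemaKey() are hypothetical placeholders, not from the original message) of the String bucket ID contract described above: any per-bucket schema information has to be encoded into the String bucket ID and resolved again inside a custom BulkWriter.Factory.

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class SchemaKeyBucketAssigner implements BucketAssigner<MyRecord, String> {

    @Override
    public String getBucketId(MyRecord element, Context context) {
        // The schema key doubles as the bucket ID because only String IDs fit the builder.
        return element.getSchemaKey();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}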

I'm curious whether this is a change the community would be open to, and/or
whether there is another way to accomplish what I'm looking for that I've missed.

Thanks,
Zack


Re: Flink-13 table API wrongly adding padding for IN clause elements

2021-11-29 Thread Caizhi Weng
Hi!

Thanks for raising this; it is a known issue [1]. Currently, I
would recommend creating a UDF as a workaround.

[1] https://issues.apache.org/jira/browse/FLINK-24708
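
A minimal sketch of such a workaround (a hypothetical ScalarFunction, not code from the linked ticket): it performs the membership check itself, so no CHAR padding is applied to the literals.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.table.functions.ScalarFunction;

public class InSetFunction extends ScalarFunction {
    // The values that would otherwise appear in the IN clause.
    private static final Set<String> VALUES = new HashSet<>(Arrays.asList(
            "O2CartPageLoaded", "OrderPlaced", "O2MenuViewed", "opened_app"));

    public boolean eval(String ename) {
        return ename != null && VALUES.contains(ename);
    }
}

// Usage with the Table API:
// tEnv.createTemporarySystemFunction("IN_SET", InSetFunction.class);
// input.where(call("IN_SET", $("ename"))).select($("ename"));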

Ayush Chauhan wrote on Tue, Nov 30, 2021 at 12:15 PM:

> Hi,
>
> In Flink 1.13, while using a filter/where condition in the Table API I am getting
> wrong results. Upon debugging I found that it is adding padding to the IN
> clause elements according to the first element in the IN clause.
>
> Here's the sample code
>
> tEnv.toAppendStream(input.where($("ename").in("O2CartPageLoaded","OrderPlaced","O2MenuViewed","opened_app"))
> .select($("ename")), Utils.getTypeInfo("ename:string")).print();
>
> Screenshot of hashset
> [image: Screenshot 2021-11-29 at 16.24.59.png]
>
> If I change the IN clause element order, then the padding also changes
>
> tEnv.toAppendStream(input.where($("ename").in("OrderPlaced","O2CartPageLoaded","O2MenuViewed","opened_app"))
> .select($("ename")), Utils.getTypeInfo("ename:string")).print();
>
> Screenshot of hashset
> [image: Screenshot 2021-11-29 at 16.23.20.png]
>
> --
>  Ayush Chauhan
>  Data Platform
>
>
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
>


Flink-13 table API wrongly adding padding for IN clause elements

2021-11-29 Thread Ayush Chauhan
Hi,

In Flink 1.13, while using a filter/where condition in the Table API I am getting
wrong results. Upon debugging I found that it is adding padding to the IN
clause elements according to the first element in the IN clause.

Here's the sample code

tEnv.toAppendStream(input.where($("ename").in("O2CartPageLoaded","OrderPlaced","O2MenuViewed","opened_app"))
.select($("ename")), Utils.getTypeInfo("ename:string")).print();

Screenshot of hashset
[image: Screenshot 2021-11-29 at 16.24.59.png]

If I change the IN clause element order, then the padding also changes

tEnv.toAppendStream(input.where($("ename").in("OrderPlaced","O2CartPageLoaded","O2MenuViewed","opened_app"))
.select($("ename")), Utils.getTypeInfo("ename:string")).print();

Screenshot of hashset
[image: Screenshot 2021-11-29 at 16.23.20.png]

--
 Ayush Chauhan
 Data Platform

-- 
This email is intended only for the person or the entity to whom it is
addressed. If you are not the intended recipient, please delete this email
and contact the sender.



Business data loss encountered when using OSS as the backend for Flink on K8s

2021-11-29 Thread 赵旭晨
We are currently using Alibaba Cloud OSS as the state backend, and we found that the business data cannot be replayed from checkpoints. The key code is as follows:
main:
sink operator:


At checkpoint time the business data is written to OSS.


After a restart, the data should be replayed from the most recent checkpoint.
The code above works fine for a job running directly on a VM (with state stored on the local disk); the business data is replayed from the checkpoint.


However, once we switch to the K8s + OSS setup,
writing the business data into the checkpoint works fine,


but after I restart the job,


context.isRestored() is false, so the business data from before the restart cannot be replayed, which leads to data loss.
Could anyone help take a look? Many thanks!
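
A minimal sketch (the class, state name, and buffering logic are assumptions, not the poster's actual code) of the typical CheckpointedFunction pattern in which context.isRestored() decides whether previously checkpointed data is replayed on restart:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private transient ListState<String> checkpointedBuffer;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replaces the operator state with the current buffer; it is persisted to
        // the configured checkpoint storage (OSS in the reported setup).
        checkpointedBuffer.update(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("buffered-records", String.class));
        // isRestored() is only true when the job is actually started from a
        // checkpoint or savepoint; a plain resubmission starts with empty state.
        if (context.isRestored()) {
            for (String record : checkpointedBuffer.get()) {
                buffer.add(record);
            }
        }
    }
}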







Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Hang Ruan
Hi,

Maybe you can write it like this:
builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
"true");

Other additional properties can be found here:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
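
A minimal sketch against the 1.14 KafkaSource API documented at the link above (broker address, topic, and group id are placeholder assumptions) of a source whose consumed offsets are committed back to Kafka on checkpoints, i.e. the KafkaSourceBuilder counterpart of setCommitOffsetsOnCheckpoints(true):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CommitOffsetsOnCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing enabled so that offsets can be committed on checkpoint completion.
        env.enableCheckpointing(60_000);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Explicitly enable committing consumed offsets on checkpoints.
                .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("commit-offsets-on-checkpoint");
    }
}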

Marco Villalobos wrote on Tue, Nov 30, 2021 at 11:08 AM:

> Thank you for the information.  That still does not answer my question
> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
> that consumer should commit offsets back to Kafka on checkpoints?
>
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method.
>
> But now that I am using KafkaSourceBuilder, how do I configure that
> behavior so that offsets get committed on checkpoints?  Or is that the
> default behavior with checkpoints?
>
> -Marco
>
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> Flink 1.14 release note states about this. See [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>
> Marco Villalobos wrote on Tue, Nov 30, 2021 at 7:12 AM:
>>
>>> Hi everybody,
>>>
>>> I am using Flink 1.12 and migrating my code from using
>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>
>>> FlinkKafkaConsumer has the method
>>>
>>> /**
  * Specifies whether or not the consumer should commit offsets back to
 Kafka on checkpoints.
  * This setting will only have effect if checkpointing is enabled for
 the job. If checkpointing isn't
  * enabled, only the "auto.commit.enable" (for 0.8) /
 "enable.auto.commit" (for 0.9+) property
  * settings will be used.
 */
 FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>
>>>
>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>> already have checkpointing configured, is it necessary to setup "commit
>>> offsets on checkpoints"?
>>>
>>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>>> 1.14 documentation says little about it.
>>>
>>>  For example, the Flink 1.14 documentation states:
>>>
>>> Additional Properties
 In addition to properties described above, you can set arbitrary
 properties for KafkaSource and KafkaConsumer by using
 setProperties(Properties) and setProperty(String, String). KafkaSource has
 following options for configuration:
 commit.offsets.on.checkpoint specifies whether to commit consuming
 offsets to Kafka brokers on checkpoint
>>>
>>>
>>> And the 1.12 documentation states:
>>>
>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
 consume records from a topic and periodically checkpoint all its Kafka
 offsets, together with the state of other operations. In case of a job
 failure, Flink will restore the streaming program to the state of the
 latest checkpoint and re-consume the records from Kafka, starting from the
 offsets that were stored in the checkpoint.
 The interval of drawing checkpoints therefore defines how much the
 program may have to go back at most, in case of a failure. To use fault
 tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
 in the job.
 If checkpointing is disabled, the Kafka consumer will periodically
 commit the offsets to Zookeeper.
>>>
>>>
>>> Thank you.
>>>
>>> Marco
>>>
>>>
>>>


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Marco Villalobos
Thank you for the information. That still does not answer my question,
though. How do I configure Flink 1.12 when using the KafkaSourceBuilder so
that the consumer commits offsets back to Kafka on checkpoints?

FlinkKafkaConsumer has the method setCommitOffsetsOnCheckpoints(boolean) for this.

But now that I am using KafkaSourceBuilder, how do I configure that
behavior so that offsets get committed on checkpoints?  Or is that the
default behavior with checkpoints?

-Marco

On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng  wrote:

> Hi!
>
> Flink 1.14 release note states about this. See [1].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>
> Marco Villalobos wrote on Tue, Nov 30, 2021 at 7:12 AM:
>
>> Hi everybody,
>>
>> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer
>> to using the KafkaSourceBuilder.
>>
>> FlinkKafkaConsumer has the method
>>
>> /**
>>>  * Specifies whether or not the consumer should commit offsets back to
>>> Kafka on checkpoints.
>>>  * This setting will only have effect if checkpointing is enabled for
>>> the job. If checkpointing isn't
>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>> "enable.auto.commit" (for 0.9+) property
>>>  * settings will be used.
>>> */
>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>
>>
>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>> already have checkpointing configured, is it necessary to setup "commit
>> offsets on checkpoints"?
>>
>> The Flink 1.12 documentation does not discuss this topic, and the Flink
>> 1.14 documentation says little about it.
>>
>>  For example, the Flink 1.14 documentation states:
>>
>> Additional Properties
>>> In addition to properties described above, you can set arbitrary
>>> properties for KafkaSource and KafkaConsumer by using
>>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>>> following options for configuration:
>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>> offsets to Kafka brokers on checkpoint
>>
>>
>> And the 1.12 documentation states:
>>
>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
>>> records from a topic and periodically checkpoint all its Kafka offsets,
>>> together with the state of other operations. In case of a job failure,
>>> Flink will restore the streaming program to the state of the latest
>>> checkpoint and re-consume the records from Kafka, starting from the offsets
>>> that were stored in the checkpoint.
>>> The interval of drawing checkpoints therefore defines how much the
>>> program may have to go back at most, in case of a failure. To use fault
>>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>>> in the job.
>>> If checkpointing is disabled, the Kafka consumer will periodically
>>> commit the offsets to Zookeeper.
>>
>>
>> Thank you.
>>
>> Marco
>>
>>
>>


Re: Time attribute will be lost after two(or more) table joining

2021-11-29 Thread Caizhi Weng
Hi!

As this mail is written in English I'm also forwarding this to the user
mailing list.

Streaming joins do not retain the row time attribute, and this is the expected
behavior. As you're windowing the results of joins, I guess you're enriching
the records from one stream with that join. Lookup joins [1] and event time
temporal joins [2] retain row time, and their results can be used by
windowing operators later. Do they meet your needs?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#lookup-join
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#event-time-temporal-join
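
A minimal sketch (the table and column names are assumptions) of an event-time temporal join; its result keeps the row time attribute and can therefore still be used by a window operator afterwards:

// Assumed tables: an append-only table orders(order_id, currency, order_time)
// whose order_time is a rowtime attribute, and a versioned table
// currency_rates(currency, rate, update_time) with update_time as its rowtime.
tEnv.executeSql(
    "SELECT o.order_id, o.order_time, o.currency, r.rate " +
    "FROM orders AS o " +
    "LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r " +
    "ON o.currency = r.currency");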

Pinjie Huang wrote on Mon, Nov 29, 2021 at 10:44 AM:

> Hi Friends, However, we found that the time attribute will be lost after
> table joining, which means that we cannot do the joining and aggregation in
> one SQL query statement. There will be no output after the above SQL
> query, because for SQL queries on streaming tables, the time_attr argument of
> the group window function
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
> >
> must refer to a valid time attribute that specifies the processing time or
> event time of rows. In wide_table,  the time_attr of field
> eventInfo_eventTime has been lost.
>
> Any ideas?
>


Re: Operator record counts in the Flink web UI job overview are not updated

2021-11-29 Thread Caizhi Weng
Hi!

The image cannot be seen, and there is no attachment. I suggest uploading it to an external image hosting page and sharing the link.

Regarding "the received and sent record counts of the operator are not updated": does "operator" here refer to one of the boxes on the web UI? If no other box feeds data into that box (i.e. it is a source task), then its received count is 0; if that box does not send data to any other box (i.e. it is a sink task), then its sent count is 0.

zhiyuan su wrote on Tue, Nov 30, 2021 at 10:09 AM:

> Hello, may I ask why, after operators are chained by default, the data keeps updating but the received and sent record counts of the operators do not update and always show 0?
>
> The image I sent cannot be seen; please see the attachment.
>
>


Re: REST API for detached minicluster based integration test

2021-11-29 Thread Caizhi Weng
Hi!

I believe metrics are enabled by default even for a mini cluster. Which
Flink version are you using, and how do you set your watermark strategy?
Could you share the user code that creates the datastream / SQL and
obtains the job graph?

I'm also curious about why you need to extract the output watermarks
just for stopping the source. You can control the records and the watermark
strategy from the source. From my point of view, constructing some test
data with specific row times would be enough.
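
A minimal sketch (element type and timestamps are assumptions) of driving such an event-time test entirely from the source: once the bounded input is exhausted, Flink emits a final Long.MAX_VALUE watermark, which fires any remaining event-time timers, so there is no need to poll the REST API for watermarks just to stop the source.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Bounded test input with explicit event times; pick the last timestamp far
// enough ahead that the timers under test are guaranteed to fire.
DataStream<Tuple2<String, Long>> input = env
    .fromElements(
        Tuple2.of("a", 1_000L),
        Tuple2.of("b", 2_000L),
        Tuple2.of("c", 60_000L))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Tuple2<String, Long>>forMonotonousTimestamps()
            .withTimestampAssigner((element, recordTimestamp) -> element.f1));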

Jin Yi wrote on Tue, Nov 30, 2021 at 3:34 AM:

> bump.  a more general question is what do people do for more end to end,
> full integration tests to test event time based jobs with timers?
>
> On Tue, Nov 23, 2021 at 11:26 AM Jin Yi  wrote:
>
>> i am writing an integration test where i execute a streaming flink job
>> using faked, "unbounded" input where i want to control when the source
>> function(s) complete by triggering them once the job's operator's maximum
>> output watermarks are beyond some job completion watermark that's relative
>> to the maximum input timestamp because the flink job uses event time timers
>> to produce some output.
>>
>> here is the faked, "unbounded" source function class:
>>
>>   private static class WaitingSourceFunction extends
>> FromElementsFunction {
>>
>> private boolean isWaiting;
>>
>> private TypeInformation typeInfo;
>>
>>
>> private WaitingSourceFunction(
>>
>> StreamExecutionEnvironment env, Collection data,
>> TypeInformation typeInfo)
>>
>> throws IOException {
>>
>>   super(typeInfo.createSerializer(env.getConfig()), data);
>>
>>   this.isWaiting = true;
>>
>>   this.typeInfo = typeInfo;
>>
>> }
>>
>>
>> @Override
>>
>> public void cancel() {
>>
>>   super.cancel();
>>
>>   isWaiting = false;
>>
>> }
>>
>>
>> @Override
>>
>> public void run(SourceContext ctx) throws Exception {
>>
>>   super.run(ctx);
>>
>>   while (isWaiting) {
>>
>> TimeUnit.SECONDS.sleep(10);
>>
>>   }
>>
>> }
>>
>>
>> public long getEndWatermark() {
>>
>>   // *TODO*
>>
>>   return 100;
>>
>> }
>>
>>   }
>>
>> and here is function where i want to busy wait (currently hacked up to
>> print info to show my problem):
>>
>>   private void waitForDone(String jobName, WaitingSourceFunction...
>> functions)
>>
>>   throws ConfigurationException, Exception, ExecutionException,
>> IOException, InterruptedException {
>>
>> JobExecutionResult jobResult = env.execute(jobName);
>>
>> RestClient restClient = new RestClient(
>>
>> RestClientConfiguration.fromConfiguration(getClientConfiguration()),
>> scheduledExecutorService);
>>
>> URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();
>>
>>
>> System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());
>>
>>
>> long currentWatermark = 0;
>>
>> long lastInputWatermark = Arrays.stream(functions)
>>
>>   .map(f -> f.getEndWatermark())
>>
>>   .mapToLong(l -> l)
>>
>>   .max().getAsLong();
>>
>> for (int i = 0; i < 3 ; i++) {
>>
>> //while (currentWatermark < (lastInputWatermark + 1000)) {
>>
>>   JobDetailsHeaders getVertexHeaders =
>> JobDetailsHeaders.getInstance();
>>
>>   JobMessageParameters getVertexParams =
>> getVertexHeaders.getUnresolvedMessageParameters();
>>
>>   getVertexParams.jobPathParameter.resolve(jobResult.getJobID());
>>
>>   List vertexIds =
>>
>> restClient.sendRequest(restUri.getHost(), restUri.getPort(),
>> getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
>>
>>   .get().getJobVertexInfos().stream()
>>
>>   .map(v -> v.getJobVertexID())
>>
>>   .collect(Collectors.toUnmodifiableList());
>>
>>
>>   for (JobVertexID vertexId : vertexIds) {
>>
>> JobVertexMetricsHeaders getWatermarkHeader =
>> JobVertexMetricsHeaders.getInstance();
>>
>> JobVertexMetricsMessageParameters getWatermarkParams =
>> getWatermarkHeader.getUnresolvedMessageParameters();
>>
>>
>> getWatermarkParams.jobPathParameter.resolve(jobResult.getJobID());
>>
>> getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);
>>
>> System.out.printf("** LOG VERTEX: %s\n", vertexId);
>>
>> try {
>>
>>   long maxWatermark = restClient.sendRequest(
>>
>>   restUri.getHost(), restUri.getPort(), getWatermarkHeader,
>> getWatermarkParams, EmptyRequestBody.getInstance())
>>
>> .get().getMetrics().stream()
>>
>> .filter(m -> m.getId().endsWith("Watermark"))
>>
>> .map(m -> {
>>
>>   System.out.printf("** LOG METRIC: %s\n", m);
>>
>>   return Long.valueOf(StringUtil.isBlank(m.getValue()) ? "0"
>> : m.getValue());
>>
>> })
>>
>> .mapToLong(v -> v)
>>
>> .max().orElse(0);
>>
>>   currentWatermark = Math.max(currentWatermark, maxWatermark);
>>
>> } catch (Exception 

Operator record counts in the Flink web UI job overview are not updated

2021-11-29 Thread zhiyuan su
Hello, may I ask why, after operators are chained by default, the data keeps updating but the received and sent record counts of the operators do not update and always show 0?

The image I sent cannot be seen; please see the attachment.


Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Caizhi Weng
Hi!

Flink 1.14 release note states about this. See [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer

Marco Villalobos wrote on Tue, Nov 30, 2021 at 7:12 AM:

> Hi everybody,
>
> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer
> to using the KafkaSourceBuilder.
>
> FlinkKafkaConsumer has the method
>
> /**
>>  * Specifies whether or not the consumer should commit offsets back to
>> Kafka on checkpoints.
>>  * This setting will only have effect if checkpointing is enabled for the
>> job. If checkpointing isn't
>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>> "enable.auto.commit" (for 0.9+) property
>>  * settings will be used.
>> */
>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>
>
> How do I setup that parameter when using the KafkaSourceBuilder? If I
> already have checkpointing configured, is it necessary to setup "commit
> offsets on checkpoints"?
>
> The Flink 1.12 documentation does not discuss this topic, and the Flink
> 1.14 documentation says little about it.
>
>  For example, the Flink 1.14 documentation states:
>
> Additional Properties
>> In addition to properties described above, you can set arbitrary
>> properties for KafkaSource and KafkaConsumer by using
>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>> following options for configuration:
>> commit.offsets.on.checkpoint specifies whether to commit consuming
>> offsets to Kafka brokers on checkpoint
>
>
> And the 1.12 documentation states:
>
> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
>> records from a topic and periodically checkpoint all its Kafka offsets,
>> together with the state of other operations. In case of a job failure,
>> Flink will restore the streaming program to the state of the latest
>> checkpoint and re-consume the records from Kafka, starting from the offsets
>> that were stored in the checkpoint.
>> The interval of drawing checkpoints therefore defines how much the
>> program may have to go back at most, in case of a failure. To use fault
>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>> in the job.
>> If checkpointing is disabled, the Kafka consumer will periodically commit
>> the offsets to Zookeeper.
>
>
> Thank you.
>
> Marco
>
>
>


A question about accessing multiple HDFS clusters with Flink on YARN

2021-11-29 Thread chenqizhu
Hi all,
Flink 1.13 supports configuring Hadoop properties in flink-conf.yaml via flink.hadoop.*. We need to write checkpoints to an HDFS cluster backed by SSDs (call it cluster B) to speed up checkpoint writes, but that HDFS cluster is not the Flink client's local default HDFS (call the default one cluster A). The idea is therefore to configure the nameservices of both clusters A and B in flink-conf.yaml, similar to HDFS federation, so that both HDFS clusters can be accessed. The configuration is as follows:


flink.hadoop.dfs.nameservices: ACluster,BCluster
flink.hadoop.fs.defaultFS: hdfs://BCluster


flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
flink.hadoop.dfs.client.failover.proxy.provider.ACluster: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider


flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
flink.hadoop.dfs.client.failover.proxy.provider.BCluster: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider


However, the job fails at startup. It seems the HA nameservice configuration for cluster B is not recognized and is instead treated as a hostname. The error is as follows
(if the option is changed back to the Flink client's local default HDFS cluster A, i.e. flink.hadoop.fs.defaultFS: hdfs://ACluster, the job starts normally):


Caused by: BCluster
java.net.UnknownHostException: BCluster
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
at 
org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:374)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:308)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Is there a solution to this problem? The pain point is getting Flink to access two HDFS clusters, ideally purely through flink-conf.yaml configuration.


My component versions:
Flink: 1.13.3
Hadoop: 3.3.0


Flink1.13.3 on Native K8s ????????????

2021-11-29 Thread ????-??????
Flink1.13.3 Application on Native 
K8s??
1.??UIcancel
2.kubectl delete deployment/cluster_id bin/flink 
run-application??Checkpointck


??

How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Marco Villalobos
Hi everybody,

I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer
to using the KafkaSourceBuilder.

FlinkKafkaConsumer has the method

/**
>  * Specifies whether or not the consumer should commit offsets back to
> Kafka on checkpoints.
>  * This setting will only have effect if checkpointing is enabled for the
> job. If checkpointing isn't
>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit"
> (for 0.9+) property
>  * settings will be used.
> */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)


How do I set up that parameter when using the KafkaSourceBuilder? If I
already have checkpointing configured, is it necessary to set up "commit
offsets on checkpoints"?

The Flink 1.12 documentation does not discuss this topic, and the Flink
1.14 documentation says little about it.

 For example, the Flink 1.14 documentation states:

Additional Properties
> In addition to properties described above, you can set arbitrary
> properties for KafkaSource and KafkaConsumer by using
> setProperties(Properties) and setProperty(String, String). KafkaSource has
> following options for configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets
> to Kafka brokers on checkpoint


And the 1.12 documentation states:

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
> records from a topic and periodically checkpoint all its Kafka offsets,
> together with the state of other operations. In case of a job failure,
> Flink will restore the streaming program to the state of the latest
> checkpoint and re-consume the records from Kafka, starting from the offsets
> that were stored in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program
> may have to go back at most, in case of a failure. To use fault tolerant
> Kafka Consumers, checkpointing of the topology needs to be enabled in the
> job.
> If checkpointing is disabled, the Kafka consumer will periodically commit
> the offsets to Zookeeper.


Thank you.

Marco


Re: REST API for detached minicluster based integration test

2021-11-29 Thread Jin Yi
Bump. A more general question: what do people do for more end-to-end,
full integration tests to test event-time-based jobs with timers?

On Tue, Nov 23, 2021 at 11:26 AM Jin Yi  wrote:

> i am writing an integration test where i execute a streaming flink job
> using faked, "unbounded" input where i want to control when the source
> function(s) complete by triggering them once the job's operator's maximum
> output watermarks are beyond some job completion watermark that's relative
> to the maximum input timestamp because the flink job uses event time timers
> to produce some output.
>
> here is the faked, "unbounded" source function class:
>
>   private static class WaitingSourceFunction extends
> FromElementsFunction {
>
> private boolean isWaiting;
>
> private TypeInformation typeInfo;
>
>
> private WaitingSourceFunction(
>
> StreamExecutionEnvironment env, Collection data,
> TypeInformation typeInfo)
>
> throws IOException {
>
>   super(typeInfo.createSerializer(env.getConfig()), data);
>
>   this.isWaiting = true;
>
>   this.typeInfo = typeInfo;
>
> }
>
>
> @Override
>
> public void cancel() {
>
>   super.cancel();
>
>   isWaiting = false;
>
> }
>
>
> @Override
>
> public void run(SourceContext ctx) throws Exception {
>
>   super.run(ctx);
>
>   while (isWaiting) {
>
> TimeUnit.SECONDS.sleep(10);
>
>   }
>
> }
>
>
> public long getEndWatermark() {
>
>   // *TODO*
>
>   return 100;
>
> }
>
>   }
>
> and here is function where i want to busy wait (currently hacked up to
> print info to show my problem):
>
>   private void waitForDone(String jobName, WaitingSourceFunction...
> functions)
>
>   throws ConfigurationException, Exception, ExecutionException,
> IOException, InterruptedException {
>
> JobExecutionResult jobResult = env.execute(jobName);
>
> RestClient restClient = new RestClient(
>
> RestClientConfiguration.fromConfiguration(getClientConfiguration()),
> scheduledExecutorService);
>
> URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();
>
>
> System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());
>
>
> long currentWatermark = 0;
>
> long lastInputWatermark = Arrays.stream(functions)
>
>   .map(f -> f.getEndWatermark())
>
>   .mapToLong(l -> l)
>
>   .max().getAsLong();
>
> for (int i = 0; i < 3 ; i++) {
>
> //while (currentWatermark < (lastInputWatermark + 1000)) {
>
>   JobDetailsHeaders getVertexHeaders =
> JobDetailsHeaders.getInstance();
>
>   JobMessageParameters getVertexParams =
> getVertexHeaders.getUnresolvedMessageParameters();
>
>   getVertexParams.jobPathParameter.resolve(jobResult.getJobID());
>
>   List vertexIds =
>
> restClient.sendRequest(restUri.getHost(), restUri.getPort(),
> getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
>
>   .get().getJobVertexInfos().stream()
>
>   .map(v -> v.getJobVertexID())
>
>   .collect(Collectors.toUnmodifiableList());
>
>
>   for (JobVertexID vertexId : vertexIds) {
>
> JobVertexMetricsHeaders getWatermarkHeader =
> JobVertexMetricsHeaders.getInstance();
>
> JobVertexMetricsMessageParameters getWatermarkParams =
> getWatermarkHeader.getUnresolvedMessageParameters();
>
> getWatermarkParams.jobPathParameter.resolve(jobResult.getJobID());
>
> getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);
>
> System.out.printf("** LOG VERTEX: %s\n", vertexId);
>
> try {
>
>   long maxWatermark = restClient.sendRequest(
>
>   restUri.getHost(), restUri.getPort(), getWatermarkHeader,
> getWatermarkParams, EmptyRequestBody.getInstance())
>
> .get().getMetrics().stream()
>
> .filter(m -> m.getId().endsWith("Watermark"))
>
> .map(m -> {
>
>   System.out.printf("** LOG METRIC: %s\n", m);
>
>   return Long.valueOf(StringUtil.isBlank(m.getValue()) ? "0"
> : m.getValue());
>
> })
>
> .mapToLong(v -> v)
>
> .max().orElse(0);
>
>   currentWatermark = Math.max(currentWatermark, maxWatermark);
>
> } catch (Exception e) {
>
>   System.out.printf("** LOG ERROR: %s\n", e);
>
> }
>
>   }
>
>   System.out.printf("** SLEEP: %s %s %s\n", i, currentWatermark,
> lastInputWatermark);
>
>   TimeUnit.SECONDS.sleep(1);
>
> }
>
>
> System.out.println("** CANCEL SOURCES");
>
> for (WaitingSourceFunction function : functions) *{*
>
>   function.cancel();
>
> }
>
>   }
>
> THE PROBLEM: the output of the logging in the test clearly shows that the
> watermark metrics are all null throughout the wait loop:
> https://paste-bin.xyz/16195
>
> I also tried using JobVertexWatermarksHeaders instead of
> JobVertexMetricsHeaders, for the REST get on the job vertices to get
> watermark information, but 

Re: REST service for flinkSQL

2021-11-29 Thread Lu Niu
Sure. The requirement is to create a playground so that users can quickly
prototype Flink SQL queries. We have a unified UI for all SQL engines, including
Presto and Spark SQL. We want to integrate Flink SQL into it. Deploying to
prod is not a hard requirement.

Best
Lu

On Tue, Nov 23, 2021 at 11:54 PM Martijn Visser 
wrote:

> Hi Lu,
>
> The only thing there currently is, is indeed
> https://github.com/ververica/flink-sql-gateway/. Could you elaborate on
> why you're looking for a REST service for Flink SQL and what you would
> expect from such a service?
>
> Best regards,
>
> Martijn
>
> On Wed, 24 Nov 2021 at 00:57, Lu Niu  wrote:
>
>> Hi, Flink Users
>>
>> I am wondering whether there is a REST service for submitting flinkSQL,
>> similar like Livy to SparkSQL? I found
>> https://github.com/ververica/flink-sql-gateway/ but I am not sure
>> whether it's still active.
>>
>> Best
>> Lu
>>
>


Re: How to Fan Out to 100s of Sinks

2021-11-29 Thread SHREEKANT ANKALA
Hi,
Here is our scenario:

We have a system that generates data in a jsonl file for all customers 
together. We now need to process this jsonl data and conditionally distribute 
it to individual customers, based on their preferences, as Iceberg tables. 
So for every line in the jsonl file, the data will end up in one of the customers' 
S3 buckets as an Iceberg table row. We were hoping to continue using Flink for this 
use case with just one job doing a conditional sink, but we are not sure whether that 
would be the right usage of Flink.

Thanks,
Shree
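
A minimal sketch (not from the original message; the customer ids and the print() stand-in for the per-customer Iceberg/S3 sink are assumptions) of a single job that conditionally fans records out by attaching one filter-plus-sink branch per customer:

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConditionalFanOutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the parsed jsonl stream: (customerId, payload) pairs.
        DataStream<Tuple2<String, String>> records = env.fromElements(
                Tuple2.of("customerA", "{\"k\":1}"),
                Tuple2.of("customerB", "{\"k\":2}"));

        // One branch per customer; a real job would attach the Iceberg sink for
        // that customer's bucket instead of print().
        List<String> customers = Arrays.asList("customerA", "customerB");
        for (String customer : customers) {
            records.filter(r -> customer.equals(r.f0))
                   .print("sink-" + customer);
        }

        env.execute("conditional-fan-out-sketch");
    }
}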

From: Fabian Paul 
Sent: Monday, November 29, 2021 1:57 AM
To: SHREEKANT ANKALA 
Cc: user@flink.apache.org 
Subject: Re: How to Fan Out to 100s of Sinks

Hi,

What do you mean by "fan out" to 100 different sinks? Do you want to
replicate the data in all buckets or is there some conditional
branching logic?

In general, Flink can easily support 100 different sinks but I am not
sure if this is the right approach for your use case. Can you clarify
your motivation and tell us a bit more about the exact scenario?

Best,
Fabian



On Mon, Nov 29, 2021 at 1:11 AM SHREEKANT ANKALA  wrote:
>
> Hi all, we current have a Flink job that retrieves jsonl data from GCS and 
> writes to Iceberg Tables. We are using Flink 13.2 and things are working fine.
>
> We now have to fan out that same data in to 100 different sinks - Iceberg 
> Tables on s3. There will be 100 buckets and the data needs to be sent to each 
> of these 100 different buckets.
>
> We are planning to add a new Job that will write to 1 sink at a time for each 
> time it is launched. Is there any other optimal approach possible in Flink to 
> support this use case of 100 different sinks?


[DISCUSS] Deprecate Java 8 support

2021-11-29 Thread Chesnay Schepler

Hello,

we recently had a discussion on the dev mailing list for deprecating 
support for Java 8 in 1.15, with a general consensus in favor of it.


I now wanted to check in with you, our users, to see what you have got 
to say about that.



   Why are we interested in deprecating Java 8 support now (and in
   eventually removing it)?

The main reason is that supporting the recently released Java 17 (and 
subsequent versions), while maintaining Java 8 support,
will be more complicated than if Java 11 were the oldest release 
version. Essentially because Java 11/17 have both crossed the Java 9 chasm.


We will still have to bite this bullet in any case (because Java 17 is
out now but we are not dropping Java 8 now), but we would still
like to signal that users should upgrade to Java 11 so that we can
eventually clean this up.


Furthermore, it is currently hard to justify investing time into
benchmarks/performance improvements that are specific to Java 11+, because
they provide no benefit to Java 8.


   What does the deprecation mean exactly?

It will primarily mean that a warning will be logged when you run Flink
on Java 8.
We may change the default Java version of the Docker images to Java 11
(the java8 tags will remain),
and we will put a larger emphasis on Flink's performance on Java 11.


   Does that mean that Java 8 support will be removed in 1.16/1.17?

No. We are not putting a hard-date on the removal of Java 8 support at 
this time.



   Will this mean that at some point we'll surprise you with the
   removal of Java 8 support in the next release?

No. We will announce the removal ahead of time by at least half a year
/ 2+ releases (probably closer to a full year).



   Is the deprecation already decided?

No. The responses in this thread are integral for deciding whether a 
deprecation at this time makes sense.



If you are still using Java 8 at the moment, then we would appreciate it if
you could tell us whether you already have a time frame for
when you intend to upgrade to Java 11. We'd also be interested in
anything that blocks your migration to Java 11.



Please raise concerns you have, and feel free to ask questions.


Re: Query regarding exceptions API(/jobs/:jobid/exceptions)

2021-11-29 Thread Mahima Agarwal
Hi Matthias,

We have created a JIRA ticket for this issue. Please find the jira id below

https://issues.apache.org/jira/browse/FLINK-25096

Thanks
Mahima

On Mon, Nov 29, 2021 at 2:24 PM Matthias Pohl 
wrote:

> Thanks Mahima,
> could you create a Jira ticket and, if possible, add the Flink logs? That
> would make it easier to investigate the problem.
>
> Best,
> Matthias
>
> On Sun, Nov 28, 2021 at 7:29 AM Mahima Agarwal 
> wrote:
>
>> Thanks Matthias
>>
>> But we have observed the below 2 exceptions are coming in root-exceptions
>> but not in exceptionHistory:
>>
>> caused by: java.util.concurrent.CompletionException:
>> java.lang.RuntimeException: java.io.FileNotFoundException: Cannot find
>> checkpoint or savepoint file/directory
>> 'C:\Users\abc\Documents\checkpoints\a737088e21206281db87f6492bcba074' on
>> file system 'file'.
>>
>> Caused by: java.lang.IllegalStateException: Failed to rollback to
>> checkpoint/savepoint
>> file:/mnt/c/Users/abc/Documents/checkpoints/a737088e21206281db87f6492bcba074/chk-144.
>> Thanks and Regards
>> Mahima Agarwal
>>
>>
>> On Fri, Nov 26, 2021, 13:19 Matthias Pohl  wrote:
>>
>>> Just to add a bit of context: The first-level members all-exceptions,
>>> root-exceptions, truncated and timestamp have been around for a longer
>>> time. The exceptionHistory was added in Flink 1.13. As part of this change,
>>> the aforementioned members were deprecated (see [1]). We kept them for
>>> backwards-compatibility reasons.
>>>
>>> That said, root-exception and all-exceptions are also represented in the
>>> exceptionHistory.
>>>
>>> Matthias
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-exceptions
>>>
>>> On Thu, Nov 25, 2021 at 12:14 PM Chesnay Schepler 
>>> wrote:
>>>
 root-exception: The last exception that caused a job to fail.
 all-exceptions: All exceptions that occurred the last time a job
 failed. This is primarily useful for completed jobs.
 exception-history: Exceptions that previously caused a job to fail.

 On 25/11/2021 11:52, Mahima Agarwal wrote:

 Hi Team,

 Please find the query below regarding exceptions
 API(/jobs/:jobid/exceptions)


 In response of above rest api:


 Users are getting 3 types of exceptions:
 1. exceptionHistory
 2. all-exceptions
 3. root-exception


 What is the purpose of the above 3 exceptions?


 Any leads would be appreciated.

 Thanks
 Mahima




Flink web UI job overview does not display the number of records received by operators

2021-11-29 Thread drewfranklin
Hello, I would like to ask why, when the job's operator chaining is set to true, the number of records received by the operators is not displayed at all.


稚远
drewfrank...@163.com



Flink1.13.3 on Native K8s ????????

2021-11-29 Thread ????-??????
Flink1.13.3 Application on Native 
K8s??zkHA??
1.??UIcancel
2.kubectl delete deployment/cluster_id bin/flink 
run-application??Checkpointck


??


November 29 Flink training (today)

2021-11-29 Thread ivan.ros...@agilent.com
Hello,

Sorry to spam everyone, but I am hoping there's a way to get into Ververica's 
upcoming Flink developer training, starting today, November 29, 2021. I got 
last-minute approval at work to register, but registration on the website is 
closed :/

Thanks in advance,

Ivan Rosero
Agilent


Re:How to run SQL Client by Per-Job

2021-11-29 Thread JasonLee
Hi


Just set execution.target: yarn-per-job and it will work.
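
A minimal sketch of the corresponding entry (the file location is whatever your client installation uses):

# flink-conf.yaml read by the SQL Client
execution.target: yarn-per-job

With this in place, start ./bin/sql-client.sh as usual; each job submitted from the client should then be deployed as its own per-job YARN application.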


Best
JasonLee


On 11/29/2021 18:48, wrote:

 

Hi

   Can SQL Client run a job by Per-Job ?

   I saw online that Zeng Qingdong shared "Flink SQL CDC Goes Live! 13 Lessons from Production Practice", which mentions submitting jobs in per-job mode (see the figure below), but the official docs seem to say that only session mode is supported. What is the correct way to do this?

Thx

wangz...@163.com

How to run SQL Client by Per-Job

2021-11-29 Thread wangzy24
 

Hi

   Can SQL Client run a job by Per-Job ? 

   I saw online that Zeng Qingdong shared "Flink SQL CDC Goes Live! 13 Lessons from Production Practice", which mentions submitting jobs in per-job mode (see the figure below), but the official docs seem to say that only session mode is supported. What is the correct way to do this?



Thx

wangz...@163.com



Re: Re: Checkpoints aborted - Job is not in state RUNNING but FINISHED

2021-11-29 Thread Yun Gao
Hi Jonas,

In previous versions, the checkpoint would be aborted as soon as any task
finished, no matter whether the finished tasks are from the same vertex.

As for Kinesis, sorry, I could not find an environment to run a test,
but if the task does indeed finish when there are no shards, I think
it would indeed be a problem for this case.

Best,
Yun



 --Original Mail --
Sender:jonas eyob 
Send Date:Fri Nov 26 22:40:48 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Checkpoints aborted - Job is not in state RUNNING but FINISHED

Hi Yun, thanks for the quick reply!

Great to hear that a fix has been put in place as of Flink 1.14. 

Since we are using Beam on top of Flink, we are currently limited to 
the Flink 1.13 runner, so I would expect the fix not to be available to us yet. 

But to clarify the underlying problem for me: is this caused by having task 
parallelism > 1, but only one of the tasks is RUNNING (the others in FINISHED state)?
Would there be a problem if, say, we have two tasks consuming events from a 
Kinesis source but the stream has only one shard? 
On Fri, Nov 26, 2021 at 03:14, Yun Gao wrote:

Hi Jonas,

Previously Flink indeed did not support checkpoints after some tasks finished. 
In 1.14 we implemented a first version of this feature (namely 
https://issues.apache.org/jira/browse/FLINK-2491),
and it can be enabled by setting 
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
We will also try to enable the flag by default in 1.15.

Best,
Yun



--
Sender:jonas eyob
Date:2021/11/26 01:53:17
Recipient:user
Theme:Checkpoints aborted - Job is not in state RUNNING but FINISHED

Hi all,

I have been struggling with this issue for a couple of days now. Checkpointing 
appears to fail as the Task Source ( kinesis stream in this case) appears to be 
in a FINISHED state. 

Excerpt from Jobmanager logs:

2021-11-25 12:52:00,479 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: 
Source Events/Read(KinesisSource) -> Flat Map -> Source 
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse 
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> 
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (1/2) 
(eb31cbc4e319588ba79a26d26abcd2f3) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,494 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: 
Source Events/Read(KinesisSource) -> Flat Map -> Source 
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse 
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> 
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) 
(1eae72b5680529fbd3b4becadb803910) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,569 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - GroupByKey -> 
ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState) -> Reduce 
state/ParMultiDo(ReduceState) -> Store state/ParMultiDo(StoreState) (1/2) 
(1a77c7ed026ac4e4a59ab66876053102) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,582 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - GroupByKey -> 
ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState) -> Reduce 
state/ParMultiDo(ReduceState) -> Store state/ParMultiDo(StoreState) (2/2) 
(31588d4dad22821d7226ec65687d0edb) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,881 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: 
Source Events/Read(KinesisSource) -> Flat Map -> Source 
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse 
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> 
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) 
(1eae72b5680529fbd3b4becadb803910) switched from RUNNING to FINISHED.
2021-11-25 12:52:06,528 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Checkpoint 
triggering task Source: Source Events/Read(KinesisSource) -> Flat Map -> Source 
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse 
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) -> 
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) of job 
 is not in state RUNNING but FINISHED instead. 
Aborting checkpoint.

For context, here is an excerpt from the flink-conf.yaml file:

flink-conf.yaml: |+
 # TaskManager configurations
 taskmanager.numberOfTaskSlots: 2
 taskmanager.rpc.port: 6122
 taskmanager.memory.process.size: 1728m

 # JobManager configurations
 jobmanager.rpc.address: {{ $fullName }}-jobmanager
 jobmanager.rpc.port: 6123
 jobmanager.memory.process.size: 1600m
 blob.server.port: 6124
 queryable-state.proxy.ports: 6125
 parallelism.default: 1 # default paralleism when not defined elsewhere
 kubernetes.namespace: {{ $fullName }} # The namespace that will be used for 
running the jobmanager and taskmanager pods.
 scheduler-mode: reactive
 # High-availability 

Re: Reply: Environment variable problem when invoking Flink remotely

2021-11-29 Thread RS
Try configuring env.hadoop.conf.dir: /xxx/hadoop in flink-conf.yaml

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/
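
A minimal sketch (the paths are assumptions) of the corresponding flink-conf.yaml entries:

# flink-conf.yaml
env.hadoop.conf.dir: /opt/hadoop/etc/hadoop
env.yarn.conf.dir: /opt/hadoop/etc/hadoop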





On 2021-11-26 11:14:17, "王健" <13166339...@163.com> wrote:
>
>
>Hi, with a remote call over SSH, the configuration in /etc/profile does not take effect.
>
>
>王健
>13166339...@163.com
>On Nov 26, 2021 at 11:12, Mabin wrote:
>It is configured in /etc/profile.
>
>Sent from my iPhone
>
>On Nov 26, 2021 at 11:07 AM, 王健 <13166339...@163.com> wrote:
>
>
>
>Hi everyone,
>When starting a Flink job via a remote call, how do I deal with the Hadoop environment variables? Other environment variables, such as those for Java and HBase, can be configured in flink-conf.yaml, but the Hadoop setting env.hadoop.conf.dir does not take effect.
>It probably requires adding export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop 
>classpath`, but that cannot be configured in flink-conf.yaml, can it?
>
>
>An urgent solution would be greatly appreciated. Many thanks!
>
>
>
>王健
>13166339...@163.com


Re: How to Fan Out to 100s of Sinks

2021-11-29 Thread Fabian Paul
Hi,

What do you mean by "fan out" to 100 different sinks? Do you want to
replicate the data in all buckets or is there some conditional
branching logic?

In general, Flink can easily support 100 different sinks but I am not
sure if this is the right approach for your use case. Can you clarify
your motivation and tell us a bit more about the exact scenario?

Best,
Fabian



On Mon, Nov 29, 2021 at 1:11 AM SHREEKANT ANKALA  wrote:
>
> Hi all, we current have a Flink job that retrieves jsonl data from GCS and 
> writes to Iceberg Tables. We are using Flink 13.2 and things are working fine.
>
> We now have to fan out that same data in to 100 different sinks - Iceberg 
> Tables on s3. There will be 100 buckets and the data needs to be sent to each 
> of these 100 different buckets.
>
> We are planning to add a new Job that will write to 1 sink at a time for each 
> time it is launched. Is there any other optimal approach possible in Flink to 
> support this use case of 100 different sinks?


Re: Query regarding exceptions API(/jobs/:jobid/exceptions)

2021-11-29 Thread Matthias Pohl
Thanks Mahima,
could you create a Jira ticket and, if possible, add the Flink logs? That
would make it easier to investigate the problem.

Best,
Matthias

On Sun, Nov 28, 2021 at 7:29 AM Mahima Agarwal 
wrote:

> Thanks Matthias
>
> But we have observed the below 2 exceptions are coming in root-exceptions
> but not in exceptionHistory:
>
> caused by: java.util.concurrent.CompletionException:
> java.lang.RuntimeException: java.io.FileNotFoundException: Cannot find
> checkpoint or savepoint file/directory
> 'C:\Users\abc\Documents\checkpoints\a737088e21206281db87f6492bcba074' on
> file system 'file'.
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint
> file:/mnt/c/Users/abc/Documents/checkpoints/a737088e21206281db87f6492bcba074/chk-144.
> Thanks and Regards
> Mahima Agarwal
>
>
> On Fri, Nov 26, 2021, 13:19 Matthias Pohl  wrote:
>
>> Just to add a bit of context: The first-level members all-exceptions,
>> root-exceptions, truncated and timestamp have been around for a longer
>> time. The exceptionHistory was added in Flink 1.13. As part of this change,
>> the aforementioned members were deprecated (see [1]). We kept them for
>> backwards-compatibility reasons.
>>
>> That said, root-exception and all-exceptions are also represented in the
>> exceptionHistory.
>>
>> Matthias
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-exceptions
>>
>> On Thu, Nov 25, 2021 at 12:14 PM Chesnay Schepler 
>> wrote:
>>
>>> root-exception: The last exception that caused a job to fail.
>>> all-exceptions: All exceptions that occurred the last time a job failed.
>>> This is primarily useful for completed jobs.
>>> exception-history: Exceptions that previously caused a job to fail.
>>>
>>> On 25/11/2021 11:52, Mahima Agarwal wrote:
>>>
>>> Hi Team,
>>>
>>> Please find the query below regarding exceptions
>>> API(/jobs/:jobid/exceptions)
>>>
>>>
>>> In response of above rest api:
>>>
>>>
>>> Users are getting 3 types of exceptions:
>>> 1. exceptionHistory
>>> 2. all-exceptions
>>> 3. root-exception
>>>
>>>
>>> What is the purpose of the above 3 exceptions?
>>>
>>>
>>> Any leads would be appreciated.
>>>
>>> Thanks
>>> Mahima
>>>
>>>