Re: Question around manually setting Flink jobId

2024-03-13 Thread Junrui Lee
Hi Allison,

The PIPELINE_FIXED_JOB_ID configuration option is not intended for public
use. IIUC, the only way to manually specify the jobId is to submit the job
through the JAR RUN REST API, where you can provide the jobId in the
request body (
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
).
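
For example, a minimal sketch using Java 11's HttpClient (the jar id and
the 32-hex-character jobId below are placeholders; the "jobId" field name
is from the REST docs linked above):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JarRunWithJobId {
    public static void main(String[] args) throws Exception {
        // Jar id as returned by POST /jars/upload (placeholder value).
        String jarId = "my-uploaded-job.jar";
        // A Flink JobID is a 32-character hex string.
        String body = "{\"jobId\": \"00000000000000000000000000000001\"}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // {"jobid":"..."} on success
    }
}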

Best,
Junrui

On Thu, Mar 14, 2024 at 08:16, Allison Chang via user wrote:

> Hi,
>
> I was wondering if there is any way to manually set the jobID for the
> jobGraph. I noticed that there is a configuration for
> PIPELINE_FIXED_JOB_ID, but there doesn't seem to be a way to set it via
> config with the StreamingJobGraphGenerator.java. Would appreciate any
> assistance if anyone has done something similar.
>
> Best,
>
> Allison Chang


Question around manually setting Flink jobId

2024-03-13 Thread Allison Chang via user
Hi,

I was wondering if there is any way to manually set the jobID for the jobGraph. 
I noticed that there is a configuration for PIPELINE_FIXED_JOB_ID, but there 
doesn't seem to be a way to set it via config with the 
StreamingJobGraphGenerator.java. Would appreciate any assistance if anyone has 
done something similar.

Best,

Allison Chang




FlinkSource to read iceberg table in Batch mode

2024-03-13 Thread Chetas Joshi
Hello,

I am using the iceberg-flink-runtime library (1.17-1.4.0) and running the
following code to read an Iceberg table in BATCH mode.

var source = FlinkSource
        .forRowData()
        .streaming(false)
        .env(execEnv)
        .tableLoader(tableLoader)
        .limit((long) operation.getLimit())
        .filters(buildFilterExpression(operation))
        .build();

var stream = source.map(rowDataMapper).name(operation.getName());

I am running into the following exception even though streaming = false:

Detected an UNBOUNDED source with the 'execution.runtime-mode' set to
'BATCH'. This combination is not allowed, please set the
'execution.runtime-mode' to STREAMING or AUTOMATIC

Would appreciate any pointers here.
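
For reference, this is the FLIP-27 style IcebergSource variant I am
considering as a workaround, since it reports its boundedness explicitly.
A sketch based on the iceberg-flink 1.4.x docs (untested):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;

// streaming(false) makes the source report Boundedness.BOUNDED to the
// runtime, so it can run with 'execution.runtime-mode' = 'BATCH'.
IcebergSource<RowData> source = IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .assignerFactory(new SimpleSplitAssignerFactory())
        .streaming(false)
        .build();

DataStream<RowData> stream = execEnv.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "iceberg-batch-source",
        TypeInformation.of(RowData.class));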

Thank you
Chetas


Unsubscribe

2024-03-13 Thread 李一飞
Unsubscribe




Re: How to view the full definition of a CREATE TABLE statement

2024-03-13 Thread Yubin Li
The screenshot I just sent was incomplete; resending it here:
[image: Screenshot 2024-03-13 103802.png]
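
In case the image does not come through, a minimal sketch (SHOW CREATE
TABLE is supported since Flink 1.14; tEnv is the TableEnvironment):

// The LIKE clause is resolved when the table is created, so this prints
// the merged schema and options of Orders_in_kafka.
TableResult result = tEnv.executeSql("SHOW CREATE TABLE Orders_in_kafka");
result.print();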

On Wed, Mar 13, 2024 at 17:44, Yubin Li wrote:

> Use the SHOW CREATE TABLE statement.
> [image: Screenshot 2024-03-13 103802.png]
>
> On Tue, Mar 12, 2024 at 15:37, ha.fen...@aisino.com wrote:
>
>> For example:
>> CREATE TABLE Orders_in_kafka (
>>     -- add a watermark definition
>>     WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
>> ) WITH (
>>     'connector' = 'kafka',
>>     ...
>> )
>> LIKE Orders_in_file (
>>     EXCLUDING ALL
>>     INCLUDING GENERATED
>> );
>>
>> For a table derived with LIKE, how can I view the complete CREATE TABLE
>> definition of Orders_in_kafka?


Re: How to view the full definition of a CREATE TABLE statement

2024-03-13 Thread Yubin Li
Use the SHOW CREATE TABLE statement.
[image: Screenshot 2024-03-13 103802.png]

On Tue, Mar 12, 2024 at 15:37, ha.fen...@aisino.com wrote:

> For example:
> CREATE TABLE Orders_in_kafka (
>     -- add a watermark definition
>     WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     ...
> )
> LIKE Orders_in_file (
>     EXCLUDING ALL
>     INCLUDING GENERATED
> );
>
> For a table derived with LIKE, how can I view the complete CREATE TABLE
> definition of Orders_in_kafka?


Re: Parallelism and partition count when Flink writes to Kafka

2024-03-13 Thread Zhanghao Chen
Hi,

Which Kafka partition a record goes to depends on the Partitioner of the
Kafka sink you use [1]. The default is Kafka's DefaultPartitioner, which
internally applies a strategy called sticky partitioning: records with a
key are assigned to a partition by hashing the key, while records without
a key are written to one randomly chosen partition that the producer
"sticks to" for a while to improve batching, switching to another random
partition after the batch is written out. This strikes a balance between
batching efficiency and an even spread across partitions. See [2] for
details.

So under the default configuration, the traversal overhead you describe
does not exist. Until you hit Kafka's per-partition write bottleneck,
simply increasing the sink parallelism is an effective way to raise write
throughput. In some corner cases, however (for example, very high
parallelism combined with a very low per-subtask write QPS, so that a
batching window contains only one or two records), batching becomes
ineffective and you hit Kafka's write bottleneck; lowering the parallelism
may then actually raise throughput by improving batching and, together
with write compression, reducing the traffic sent to Kafka.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#sink-partitioning
[2] https://www.cnblogs.com/huxi2b/p/12540092.html
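
To make this concrete, a minimal sketch of picking the strategy via the
table connector's 'sink.partitioner' option described in [1] (the schema,
topic, and broker address are made up for illustration):

// 'default' delegates to Kafka's own partitioner (sticky partitioning
// for key-less records); 'fixed' pins each Flink subtask to one Kafka
// partition; 'round-robin' cycles key-less records over all partitions.
tEnv.executeSql(
    "CREATE TABLE kafka_sink (\n"
        + "  product STRING,\n"
        + "  amount BIGINT\n"
        + ") WITH (\n"
        + "  'connector' = 'kafka',\n"
        + "  'topic' = 'orders',\n"
        + "  'properties.bootstrap.servers' = 'broker:9092',\n"
        + "  'format' = 'json',\n"
        + "  'sink.partitioner' = 'default'\n"
        + ")");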



From: chenyu_opensource 
Sent: Wednesday, March 13, 2024 15:25
To: user-zh@flink.apache.org 
Subject: Parallelism and partition count when Flink writes to Kafka

Hello,
When Flink writes data to Kafka (Kafka as the sink) and the topic's
partition count (set to 60) is smaller than the job parallelism (set to
300), do the tasks write to the partitions in round-robin fashion, and
does that hurt write efficiency (i.e., is there traversal overhead)?
If the topic's partition count is then increased (to 200, or directly to
300), will write throughput improve noticeably?

Is there relevant source code I could look at?
Looking forward to a reply. Thanks!





Re: Flink Batch Execution Mode

2024-03-13 Thread irakli.keshel...@sony.com
Hi Feng,

I'm using flink-connector-kafka 3.0.1-1.17. I see that 1.17 is affected, but 
the ticket is marked as fixed so I'm not sure if that is actually the issue.

Best,
Irakli

From: Feng Jin 
Sent: 12 March 2024 18:28
To: Keshelava, Irakli 
Cc: user@flink.apache.org 
Subject: Re: Flink Batch Execution Mode

Hi Irakli

What version of flink-connector-kafka are you using?
You may have encountered a bug [1] in the old version that prevents the source 
task from entering the finished state.


[1] https://issues.apache.org/jira/browse/FLINK-31319

Best,
Feng


On Tue, Mar 12, 2024 at 7:21 PM irakli.keshel...@sony.com wrote:
Hello,

I have a Flink job that is running in Batch mode. The source for the job
is a Kafka topic with a limited number of events. I can see that the job
starts up fine and consumes the events, but it never makes it past the
first task and becomes idle. The Kafka source is defined as bounded with
"KafkaSource.builder().setBounded(OffsetsInitializer.latest())".
I expect the job to consume all the events in the Kafka topic and then
move on to the next task, but I'm not sure whether
"OffsetsInitializer.latest()" is the right OffsetsInitializer. Can anyone
help me out here? Thanks!
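
For completeness, a simplified sketch of how the source is built (broker
and topic names are made up):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Reads from the earliest offsets up to the latest offsets observed at
// job start; setBounded(...) makes the source report itself as BOUNDED,
// which is what 'execution.runtime-mode' = 'BATCH' requires.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("batch-job")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();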

Cheers,
Irakli


Re: Flink 1.18 with Java 17 production version release

2024-03-13 Thread Xuyang
Hi, Meng.
I think you can follow this JIRA ticket [1] and ping its creator for the
latest progress.


[1] https://issues.apache.org/jira/browse/FLINK-34491



--

Best!
Xuyang




At 2024-03-13 04:02:09, "Meng, Ping via user"  wrote:

Hi,

Java 17 support in the latest Flink 1.18.1 is in beta, so users can report
issues. Is there a planned release date for a production-ready version?
Is there a roadmap for it?

Thank you,

Angela Meng

Re: How can a Flink cluster write its logs directly into Elasticsearch?

2024-03-13 Thread Jiabao Sun
A fairly simple approach is to run a Filebeat process that tails
jobmanager.log and taskmanager.log.

Best,
Jiabao

On Wed, Mar 13, 2024 at 15:30, kellygeorg...@163.com wrote:

> Is there a relatively convenient and quick solution?
>
>
>


Re: Difference between executing one INSERT statement at a time and executing several at once

2024-03-13 Thread Xuyang
Hi, fengqi.
With the StatementSet approach above, only one job is produced, and that
job can reuse the source (topology: one source feeding two calc/sink
branches), so the source is read only once.
With the two executeSql calls below, two jobs are produced, and they are
completely independent.
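
You can verify this by printing the StatementSet's plan; for the single
job it shows one scan of Orders feeding both sinks. A minimal sketch,
reusing the tables from your example:

StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(
    "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
stmtSet.addInsertSql(
    "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
// The optimized plan shows a single scan of Orders shared by both sinks.
System.out.println(stmtSet.explain());
// Submits exactly one job containing both INSERTs.
stmtSet.execute();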



--

Best!
Xuyang





At 2024-03-13 12:26:05, "ha.fen...@aisino.com"  wrote:
>StatementSet stmtSet = tEnv.createStatementSet();
>stmtSet.addInsertSql(
>  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
>stmtSet.addInsertSql(
>  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
>TableResult tableResult2 = stmtSet.execute();
>What is the difference compared with the following?
>tEnv.executeSql(
>  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
>tEnv.executeSql(
>  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");


How can a Flink cluster write its logs directly into Elasticsearch?

2024-03-13 Thread kellygeorg...@163.com
Is there a relatively convenient and quick solution?




Parallelism and partition count when Flink writes to Kafka

2024-03-13 Thread chenyu_opensource
Hello,
When Flink writes data to Kafka (Kafka as the sink) and the topic's
partition count (set to 60) is smaller than the job parallelism (set to
300), do the tasks write to the partitions in round-robin fashion, and
does that hurt write efficiency (i.e., is there traversal overhead)?
If the topic's partition count is then increased (to 200, or directly to
300), will write throughput improve noticeably?

Is there relevant source code I could look at?
Looking forward to a reply. Thanks!