Re: Flink 1.14 requests an enormous amount of memory

2023-06-20
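I looked into it. The job was actually running in local mode, and I had never configured the number of slots for local mode, so there were not enough slots. The 1024G is just a default value, which is why the error oddly claimed the job needed 1 TB of memory.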
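
The byte counts in the quoted log fit the "default value" explanation: the heap and off-heap figures are exactly 2^40 bytes (1 TiB), a round power of two rather than a number derived from the job. The small stdlib-only program below is a hypothetical helper, not Flink code; it only does unit arithmetic on the four values copied from the log and does not read any Flink configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Flink): decodes the byte counts printed in the
// ResourceProfile of the NoResourceAvailableException log line.
public class ResourceProfileBytes {

    private static final long MIB = 1024L * 1024L;
    private static final long GIB = MIB * 1024L;

    /** Formats a byte count as whole GiB or MiB when it divides evenly. */
    static String describe(long bytes) {
        if (bytes % GIB == 0) {
            return (bytes / GIB) + " GiB";
        }
        if (bytes % MIB == 0) {
            return (bytes / MIB) + " MiB";
        }
        return bytes + " bytes";
    }

    public static void main(String[] args) {
        // Byte counts copied from the log quoted in this thread.
        Map<String, Long> profile = new LinkedHashMap<>();
        profile.put("taskHeapMemory", 1099511627776L);
        profile.put("taskOffHeapMemory", 1099511627776L);
        profile.put("managedMemory", 134217728L);
        profile.put("networkMemory", 67108864L);

        profile.forEach((name, bytes) -> System.out.println(name + " = " + describe(bytes)));

        // 1099511627776 is exactly 2^40 bytes: a round power of two, which looks
        // more like a built-in default than a requirement computed for this job.
        System.out.println("2^40 = " + (1L << 40));
    }
}
```

Running it prints 1024 GiB for both heap values, 128 MiB for managed memory and 64 MiB for network memory.
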
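This never happened before because local mode used to allocate enough slots automatically, but for some unknown reason the flink doris connector was not counted in the slot requirement, so the job was one slot short of what it needed.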
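
A rough way to check the shortfall, under stated assumptions: in Flink's slot-sharing model, a job needs, for each slot sharing group, as many slots as the highest parallelism in that group, and the groups' needs add up. When `taskmanager.numberOfTaskSlots` is not set, local execution appears to size its single TaskManager from the job's maximum parallelism (an assumption about Flink 1.14 behavior, not confirmed in this thread). One plausible reason the Doris sink was not covered, also not confirmed here: the quoted code places it in its own slot sharing group (`fund_category_sink` by default), and operators in different groups cannot share a slot. The sketch below models this with hypothetical operator names and parallelism 1, which matches the single registered slot in the log.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of slot demand under slot sharing (an assumption for illustration,
// not Flink's scheduler code): each slot sharing group needs as many slots as its
// highest operator parallelism, and the groups' needs are added together.
public class SlotDemand {

    /** Hypothetical description of one operator in the job. */
    static final class Operator {
        final String name;
        final String slotSharingGroup;
        final int parallelism;

        Operator(String name, String slotSharingGroup, int parallelism) {
            this.name = name;
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
        }
    }

    /** Slots needed: the max parallelism per group, summed over all groups. */
    static int requiredSlots(List<Operator> operators) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Operator op : operators) {
            maxPerGroup.merge(op.slotSharingGroup, op.parallelism, Integer::max);
        }
        int total = 0;
        for (int slots : maxPerGroup.values()) {
            total += slots;
        }
        return total;
    }

    /** Slots a local run is assumed to offer when no slot count is configured. */
    static int defaultLocalSlots(List<Operator> operators) {
        int max = 0;
        for (Operator op : operators) {
            max = Math.max(max, op.parallelism);
        }
        return max;
    }

    public static void main(String[] args) {
        // Only the sink name comes from the thread; the other operators are hypothetical.
        List<Operator> job = Arrays.asList(
                new Operator("source", "default", 1),
                new Operator("fundCategoryFilter", "default", 1),
                new Operator("fundCategorySinkName", "fund_category_sink", 1));

        int offered = defaultLocalSlots(job);
        int needed = requiredSlots(job);
        System.out.println("slots offered: " + offered + ", slots needed: " + needed);
        // prints "slots offered: 1, slots needed: 2"
    }
}
```

If this is the cause, setting `TaskManagerOptions.NUM_TASK_SLOTS` in a `Configuration` passed to `StreamExecutionEnvironment.createLocalEnvironment(int, Configuration)`, or leaving the sink in the default slot sharing group for local tests, should supply the missing slot.
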

> On June 19, 2023, at 16:18, 郭欣瑞 wrote:
> 
> While testing a job in the IDE, the job stayed in the CREATED state, and after a long time it failed with the following error:
> 
> DeclarativeSlotPoolBridge.java:351  - Could not acquire the minimum required resources, failing slot requests. Acquired: [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered TMs: 1, registered slots: 1 free slots: 0
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
> 
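> After some digging, the most suspicious part turned out to be a Doris RowData stream load sink: when I commented it out and swapped in a sink that writes to a local file, the job ran normally.
> Here is my Doris sink code; the flink doris connector version is 1.1.1: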
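> import java.util.Date;
> import java.util.Properties;
> 
> // Doris classes as packaged in flink-doris-connector 1.1.x
> import org.apache.doris.flink.cfg.DorisExecutionOptions;
> import org.apache.doris.flink.cfg.DorisOptions;
> import org.apache.doris.flink.cfg.DorisReadOptions;
> import org.apache.doris.flink.sink.DorisSink;
> import org.apache.doris.flink.sink.writer.RowDataSerializer;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.data.RowData;
> import org.apache.flink.table.types.DataType;
> 
> // parameterTool (ParameterTool) and fundCategoryDataStream (DataStream<RowData>) are defined elsewhere in the job.
> // Connection settings for the Doris FE and the target table
> DorisSink.Builder<RowData> builder = DorisSink.builder();
> DorisOptions.Builder dorisBuilder = DorisOptions.builder();
> dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
>         .setTableIdentifier(parameterTool.get("doris.sfinx_database") + "."
>                 + parameterTool.get("doris.table.asset_tag_data", "asset_tag_data"))
>         .setUsername(parameterTool.get("doris.user"))
>         .setPassword(parameterTool.get("doris.password"));
> 
> // Stream load properties: send rows as JSON, one object per line
> Properties pro = new Properties();
> pro.setProperty("format", "json");
> pro.setProperty("read_json_by_line", "true");
> 
> // Timestamp suffix keeps the stream load label prefix unique per run
> Date date = new Date();
> DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
> executionBuilder.setLabelPrefix("FundCategoryFilter-doris" + date.getTime())
>         .setStreamLoadProp(pro);
> 
> // Column names and types used to serialize each RowData record to JSON
> String[] fields = {"uid", "subject", "trade_date", "update_time", "value"};
> DataType[] types = {DataTypes.VARCHAR(36), DataTypes.VARCHAR(20), DataTypes.DATE(),
>         DataTypes.TIMESTAMP(), DataTypes.DOUBLE()};
> 
> builder.setDorisReadOptions(DorisReadOptions.builder().build())
>         .setDorisExecutionOptions(executionBuilder.build())
>         .setSerializer(RowDataSerializer.builder()
>                 .setFieldNames(fields)
>                 .setType("json")
>                 .setFieldType(types)
>                 .build())
>         .setDorisOptions(dorisBuilder.build());
> 
> // The sink gets its own slot sharing group and its parallelism from job parameters
> fundCategoryDataStream.sinkTo(builder.build())
>         .slotSharingGroup(parameterTool.get("fund_category_data_sink_group", "fund_category_sink"))
>         .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
>         .uid(parameterTool.get("fundCategroyDataSinkID", "fundCategroyDataSinkID_1"))
>         .name("fundCategorySinkName");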
> 


