Hi, 从 ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, numberOfRequiredSlots=1}] 来看,sink节点想申请 1T的 heap memory 和 1T的 off heap memory,可以再额外检查一下代码或者flink-conf里 是否配置了 memory size相关的参数[1].
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-process-size Best, Yanfei Shammon FY <zjur...@gmail.com> 于2023年6月20日周二 08:45写道: > > Hi, > > 这个doris的sink是你自己实现的还是flink或者doris官方提供的?从错误来看,像是sink节点申请了超大的内存资源,你可以确认一下是否有问题,或者是否有配置项可以配置 > > Best, > Shammon FY > > On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞 <guoxin...@betalpha.com.invalid> wrote: > > > 我在ide里测试一个任务的时候,任务一直处于created状态,过了很久之后报了以下的错 > > > > DeclarativeSlotPoolBridge.java:351 - Could not acquire the minimum > > required resources, failing slot requests. Acquired: > > [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb > > (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), > > managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 > > bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered > > TMs: 1, registered slots: 1 free slots: 0 > > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > > Could not acquire the minimum required resources. > > > > 我排查了一下发现最可疑的是用了一个doris的rowdata streamload sink,将其注释换了一个写入本地文件的sink就可以正常运行了 > > 这是我doris sink的代码,flink doris connector版本是1.1.1 > > DorisSink.Builder<RowData> builder = DorisSink.builder(); > > DorisOptions.Builder dorisBuilder = DorisOptions.builder(); > > dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP")) > > > > .setTableIdentifier(parameterTool.get("doris.sfinx_database")+"."+parameterTool.get("doris.table.asset_tag_data","asset_tag_data")) > > .setUsername(parameterTool.get("doris.user")) > > .setPassword(parameterTool.get("doris.password")); > > > > Properties pro = new Properties(); > > pro.setProperty("format", "json"); > > pro.setProperty("read_json_by_line", "true"); > > > > Date date = new Date(); > > DorisExecutionOptions.Builder executionBuilder = > > DorisExecutionOptions.builder(); > > > > executionBuilder.setLabelPrefix("FundCategoryFilter-doris"+date.getTime()).setStreamLoadProp(pro); > > > > String[] fields = > > {"uid","subject","trade_date","update_time","value"}; > > DataType[] types = > > {DataTypes.VARCHAR(36),DataTypes.VARCHAR(20),DataTypes.DATE(),DataTypes.TIMESTAMP(),DataTypes.DOUBLE()}; > > > > builder.setDorisReadOptions(DorisReadOptions.builder().build()) > > .setDorisExecutionOptions(executionBuilder.build()) > > > > .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build()) > > .setDorisOptions(dorisBuilder.build()); > > fundCategoryDataStream.sinkTo(builder.build()) > > > > .slotSharingGroup(parameterTool.get("fund_category_data_sink_group", > > "fund_category_sink")) > > > > .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1)) > > > > .uid(parameterTool.get("fundCategroyDataSinkID","fundCategroyDataSinkID_1")) > > .name("fundCategorySinkName”); > > > > > >