Re: Default Flink S3 FileSource timeout due to large file listing

2023-09-25 文章
退订

 Replied Message 
| From | Eleanore Jin |
| Date | 09/26/2023 01:50 |
| To | user-zh  |
| Subject | Default Flink S3 FileSource timeout due to large file listing |
Hello Flink Community,
Flink Version: 1.16.1, Zookeeper for HA.
My Flink Applications reads raw parquet files hosted in S3, applies
transformations and re-writes them to S3, under a different location.
Below is my code to read from parquets from S3:
```
   final Configuration configuration = new Configuration();
   configuration.set("fs.s3.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
   final ParquetColumnarRowInputFormat format =
 new ParquetColumnarRowInputFormat<>(
   configuration,
   ,
   InternalTypeInfo.of(),
   100,
   true,
   true
 );
   final FileSource source = FileSource
 .forBulkFileFormat(format, new Path("s3/"))
 .build();
stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
"parquet-source");
```
I noticed the following:
1. My S3 directory, "s3//", can have more than 1M+ files. The
parquets in this directory are partitioned by date and time. This makes the
folder structure of this directory deterministic. e.g
"s3//partiton_column_a/partition_columb_b/2023-09-25--13/{1,2...N}.parquet".
I believe the Flink Default FileSource is doing a list on this large
directory and gets stuck waiting for the operation to complete. The Akka
connect timeout error messages in the Task Manager logs support this.
Additionally, the job runs successfully when I restrict the input to a
subfolder, looking at only an hour's data, based on the mentioned
partitioning scheme. In my local machine, I also tried using S3 CLI to
recursively list this directory and the operation did not complete in 1
hour.

*Is this behavior expected based on Flink's S3 source implementation? *Looking
at the docs
,
one way to solve this is to implement the Split Enumerator by incrementally
processing the subfolders in "s3//", based on the mentioned
partitioning scheme.

*Are there any other approaches available?*
2. Following the code above, when I deserialize records from S3 I get
records of type BinaryRowData
.
However, when I use the same code in Unit Testing, with
MiniClusterWithClientResource
,
to read from a local parquet file (not S3), I get records of type
GenericRowData

.

*What is the reason for this discrepancy and is it possible to force
deserialization to output type GenericRowData? *Currently, I have written
code to convert BinaryRowData to GenericRowData as our downstream
ProcessFunctions expect this type.
I*s there a better solution to transform BinaryRowData to GenericRowData?*

Thanks!
Eleanore


Re:Re: Flink1.14 需求超大内存

2023-06-20 文章
退订










在 2023-06-20 11:16:18,"Yanfei Lei"  写道:
>Hi,
>
>从 ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes),
>taskOffHeapMemory=1024.000gb (1099511627776 bytes),
>managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb
>(67108864 bytes)}, numberOfRequiredSlots=1}] 来看,sink节点想申请 1T的 heap
>memory 和 1T的 off heap memory,可以再额外检查一下代码或者flink-conf里 是否配置了 memory
>size相关的参数[1].
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-process-size
>
>Best,
>Yanfei
>
>Shammon FY  于2023年6月20日周二 08:45写道:
>>
>> Hi,
>>
>> 这个doris的sink是你自己实现的还是flink或者doris官方提供的?从错误来看,像是sink节点申请了超大的内存资源,你可以确认一下是否有问题,或者是否有配置项可以配置
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞  wrote:
>>
>> > 我在ide里测试一个任务的时候,任务一直处于created状态,过了很久之后报了以下的错
>> >
>> > DeclarativeSlotPoolBridge.java:351  - Could not acquire the minimum
>> > required resources, failing slot requests. Acquired:
>> > [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
>> > (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes),
>> > managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864
>> > bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered
>> > TMs: 1, registered slots: 1 free slots: 0
>> > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> > Could not acquire the minimum required resources.
>> >
>> > 我排查了一下发现最可疑的是用了一个doris的rowdata streamload sink,将其注释换了一个写入本地文件的sink就可以正常运行了
>> > 这是我doris sink的代码,flink doris connector版本是1.1.1
>> > DorisSink.Builder builder = DorisSink.builder();
>> > DorisOptions.Builder dorisBuilder = DorisOptions.builder();
>> > dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
>> >
>> > .setTableIdentifier(parameterTool.get("doris.sfinx_database")+"."+parameterTool.get("doris.table.asset_tag_data","asset_tag_data"))
>> > .setUsername(parameterTool.get("doris.user"))
>> > .setPassword(parameterTool.get("doris.password"));
>> >
>> > Properties pro = new Properties();
>> > pro.setProperty("format", "json");
>> > pro.setProperty("read_json_by_line", "true");
>> >
>> > Date date = new Date();
>> > DorisExecutionOptions.Builder executionBuilder =
>> > DorisExecutionOptions.builder();
>> >
>> > executionBuilder.setLabelPrefix("FundCategoryFilter-doris"+date.getTime()).setStreamLoadProp(pro);
>> >
>> > String[] fields =
>> > {"uid","subject","trade_date","update_time","value"};
>> > DataType[] types =
>> > {DataTypes.VARCHAR(36),DataTypes.VARCHAR(20),DataTypes.DATE(),DataTypes.TIMESTAMP(),DataTypes.DOUBLE()};
>> >
>> > builder.setDorisReadOptions(DorisReadOptions.builder().build())
>> > .setDorisExecutionOptions(executionBuilder.build())
>> >
>> > .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build())
>> > .setDorisOptions(dorisBuilder.build());
>> > fundCategoryDataStream.sinkTo(builder.build())
>> >
>> > .slotSharingGroup(parameterTool.get("fund_category_data_sink_group",
>> > "fund_category_sink"))
>> >
>> > .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
>> >
>> > .uid(parameterTool.get("fundCategroyDataSinkID","fundCategroyDataSinkID_1"))
>> > .name("fundCategorySinkName”);
>> >
>> >
>> >


回复:python 自定义sink

2023-06-05 文章
退订
 回复的原邮件 
| 发件人 | smq<374060...@qq.com.invalid> |
| 发送日期 | 2023年05月30日 12:22 |
| 收件人 | user-zh  |
| 主题 | python 自定义sink |
java中可以继承richainkfunction和checkpointedfunction 两个类实现自定义sink。在python中如何实现这种功能呢

Re:Re: Re: sink mysql id自增表数据会丢失

2023-04-18 文章
退订











在 2023-04-19 09:15:09,"Shammon FY"  写道:
>如果想让mysql生成自增主键,可以在flink ddl的table里不增加主键字段,然后flink作业直接写入数据到table就可以了
>
>On Tue, Apr 18, 2023 at 5:38 PM Jeff  wrote:
>
>> 在sink时指定字段不可以不包括自增主键的列。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2023-04-17 07:29:16,"Shammon FY"  写道:
>> >Hi
>> >
>> >如果想使用mysql的自增主键,应该是在插入的时候不要写自增主键的列吧,可以在insert的时候直接指定需要插入的列试试?
>> >
>> >On Sun, Apr 16, 2023 at 7:58 PM Jeff  wrote:
>> >
>> >> sink数据到mysql catalog内的表时,当表只一个自增主键id无其唯一索引时,同一批写入的数据只会保存一条,其它数据会丢失。
>> >>
>> >>
>> >>  mysql内表ddl:
>> >>
>> >> create table test (id bigint primary key auto_increment , passport
>> >> varchar);
>> >>
>> >>
>> >> flink sql:
>> >> insert into mysql_catalog.test select 0, passport from source_table;
>> >>
>> >> 之所以select 0是表示使用物理表的自增值。
>>