I think this is related to the Iceberg Flink connector implementation and the 
connector's memory configuration.
I'm cc'ing @Jingsong, who is more familiar with this part.

Best,
Leonard

> On Aug 2, 2021, at 12:20, Ayush Chauhan <[email protected]> wrote:
> 
> Hi Leonard,
> 
> I am using Flink 1.11.2 with the debezium-json format to read CDC data 
> generated by Debezium.
> 
> For each table, I convert the Kafka dynamic table to a retract stream and 
> finally convert that stream to a DataStream<RowData>. Here's the sample 
> function:
> private DataStream<RowData> getDataStream(String sql) {
>     LOGGER.debug(sql);
>     Table out = tEnv.sqlQuery(sql);
>     DataStream<Tuple2<Boolean, Row>> dsRow = tEnv.toRetractStream(out, Row.class);
>     return dsRow.map((MapFunction<Tuple2<Boolean, Row>, RowData>) t2 -> {
>         RowKind rowKind = t2.f1.getKind();
>         GenericRowData genericRowData = new GenericRowData(rowKind, t2.f1.getArity());
>         for (int pos = 0; pos < t2.f1.getArity(); pos++) {
>             Object object = t2.f1.getField(pos);
>             Object convertedType;
>             if (object instanceof String) {
>                 convertedType = RowDataUtil.convertConstant(Types.StringType.get(), object);
>             } else if (object instanceof LocalDateTime) {
>                 convertedType = TimestampData.fromLocalDateTime((LocalDateTime) object);
>             } else {
>                 convertedType = object;
>             }
>             genericRowData.setField(pos, convertedType);
>         }
>         return genericRowData;
>     });
> }
> 
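> For context, a minimal sketch of how such a Kafka dynamic table can be 
> registered with the debezium-json format (the table name, columns, topic, and 
> broker address below are placeholders, not our actual setup):
> 
> // Rough sketch: register a Kafka-backed dynamic table that decodes Debezium
> // CDC records via the debezium-json format (all names are placeholders).
> tEnv.executeSql(
>     "CREATE TABLE my_cdc_table ("
>         + "  id BIGINT,"
>         + "  name STRING,"
>         + "  updated_at TIMESTAMP(3)"
>         + ") WITH ("
>         + "  'connector' = 'kafka',"
>         + "  'topic' = 'dbserver.mydb.my_cdc_table',"
>         + "  'properties.bootstrap.servers' = 'kafka:9092',"
>         + "  'properties.group.id' = 'cdc-ingestion',"
>         + "  'scan.startup.mode' = 'earliest-offset',"
>         + "  'format' = 'debezium-json'"
>         + ")");
> 
> DataStream<RowData> rowDataDataStream = getDataStream("SELECT * FROM my_cdc_table");
> 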
> I then pass this datastream to the Flink sink.
> FlinkSink.forRowData(rowDataDataStream)
>         .table(icebergTable)
>         .tableSchema(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTable.schema())))
>         .tableLoader(tableLoader)
>         .equalityFieldColumns(tableConfig.getEqualityColumns())
>         .build();
> 
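> For completeness, a rough sketch of how the per-table pipelines might be 
> assembled into the single job mentioned earlier in the thread 
> (IngestionTableConfig and its getters are hypothetical placeholders; the 
> FlinkSink builder calls are the same ones shown above):
> 
> // Hypothetical per-table config objects; one source + Iceberg sink per table,
> // all added to the same StreamExecutionEnvironment.
> for (IngestionTableConfig tableConfig : tableConfigs) {
>     DataStream<RowData> rowDataDataStream = getDataStream(tableConfig.getSql());
>     org.apache.iceberg.Table icebergTable = tableConfig.getIcebergTable();
>     FlinkSink.forRowData(rowDataDataStream)
>             .table(icebergTable)
>             .tableSchema(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTable.schema())))
>             .tableLoader(tableConfig.getTableLoader())
>             .equalityFieldColumns(tableConfig.getEqualityColumns())
>             .build();
> }
> env.execute("cdc-ingestion");  // all table pipelines run in one job graph
> 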
> Please let me know if you need any other information.
> 
> 
> On Mon, Aug 2, 2021 at 7:48 AM Leonard Xu <[email protected]> wrote:
> Hi, Ayush
> 
> Thanks for the detailed description.
> 
> Before analyzing the issue, I have two questions: which Flink and Flink CDC 
> versions are you using? And are you using Flink CDC through SQL or the 
> DataStream API?
> It would also help if you could post your Flink CDC connector parameters.
> 
> Best,
> Leonard
> 
>> On Jul 29, 2021, at 18:57, Ayush Chauhan <[email protected]> wrote:
>> 
>> Hi all,
>> 
>> We are using Flink + Iceberg to consume CDC data, and we have combined all 
>> the tables of a single DB into one job. The job frequently runs into GC 
>> issues. Earlier it was running on the default Parallel GC; I have since 
>> switched to G1GC, which brought some improvement, but I am still facing the 
>> same problem.
>> 
>> Following are the params for my job:
>> -ytm 5120m -yjm 1024m -yD env.java.opts="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35"
>> 
>> This job is running CDC ingestion for 17 tables with a parallelism of 1, and 
>> throughput is around ~10k messages per 10-minute checkpointing interval.
>> 
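>> For reference, a minimal sketch of how the checkpoint interval and timeout 
>> are typically configured on the StreamExecutionEnvironment (the timeout 
>> below is Flink's default of 10 minutes, not necessarily the exact value 
>> used here):
>> 
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(10 * 60 * 1000L);                         // 10-minute checkpoint interval
>> env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);  // timeout a stalled checkpoint eventually hits
>> 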
>> I am attaching a part of the thread dump in this email.
>> 
>> During old-generation GC pauses, the job gets stuck and its checkpoint 
>> duration, which is normally under 1 second, grows until it hits the timeout 
>> threshold. The job then fails either because of the checkpoint timeout or 
>> because the heartbeat of the task manager is lost.
>> 
>> <Screenshot 2021-07-29 at 16.09.19.png>
>> <Screenshot 2021-07-29 at 16.08.58.png>
>> 
>> 
>> -- 
>>  Ayush Chauhan
>> 
>> 
>> 
>> <thread_dump.txt>
> 
> 
> 
> -- 
>  Ayush Chauhan
>  Data Platform 
>    +91 9990747111 
> 
> 
> 
