0xNacho opened a new issue, #5958:
URL: https://github.com/apache/iceberg/issues/5958
### Query engine
Flink
### Question
Hello!
I have a Flink application that reads arbitrary Avro data, maps it to RowData,
and uses several FlinkSink instances to write the data into Iceberg tables. By
arbitrary data I mean that I have 100 types of Avro message, all of them with a
common property "tableName" but containing different columns. I would like to
write each of these message types into a separate Iceberg table.
To do this I'm using side outputs: once my data is mapped to RowData, I use a
ProcessFunction that writes each message to a specific OutputTag.
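For reference, the routing step can be sketched roughly like this. This is only a sketch of the idea, not the actual implementation: the shape of `ProcessRecordFunction`, the `resolveTableName` helper, and the assumption that the table name can be read from the record are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical sketch: keeps one OutputTag per table name and routes
// every incoming record to the matching side output.
public class ProcessRecordFunction extends ProcessFunction<RowData, RowData> {

    // One OutputTag per target table, keyed by the tag id (the table name).
    private final Map<String, OutputTag<RowData>> tagsByTable;

    public ProcessRecordFunction(List<OutputTag<RowData>> tags) {
        this.tagsByTable = tags.stream()
            .collect(Collectors.toMap(OutputTag::getId, tag -> tag));
    }

    @Override
    public void processElement(RowData row, Context ctx, Collector<RowData> out) {
        OutputTag<RowData> tag = tagsByTable.get(resolveTableName(row));
        if (tag != null) {
            ctx.output(tag, row); // emit to the table-specific side output
        }
    }

    private String resolveTableName(RowData row) {
        // Placeholder (assumption): in the real job the table name would come
        // from the record itself or from metadata attached upstream.
        return row.getString(0).toString();
    }
}
```

Note that for the side outputs to keep their type information, each tag would typically be created as an anonymous subclass, e.g. `new OutputTag<RowData>(tableName) {}`.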
Later on, with the datastream already processed, I loop over the different
output tags, get the records using getSideOutput, and create a specific
IcebergSink for each of them. Something like:
```java
// List of all possible output tags, one per target table
final List<OutputTag<RowData>> tags = ...

final SingleOutputStreamOperator<RowData> rowData = stream
        .map(new ToRowDataMap())                  // map custom Avro POJO into RowData
        .uid("map-row-data")
        .name("Map to RowData")
        .process(new ProcessRecordFunction(tags)) // route each element to its OutputTag
        .uid("id-process-record")
        .name("Process Input records");

CatalogLoader catalogLoader = ...
Catalog catalog = catalogLoader.loadCatalog();
String upsertField = ...

tags.forEach(tag -> {
    DataStream<RowData> outputStream = rowData.getSideOutput(tag);
    TableIdentifier identifier = TableIdentifier.of("myDBName", tag.getId());
    FlinkSink.Builder builder = FlinkSink
            .forRowData(outputStream)
            .table(catalog.loadTable(identifier))
            .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier))
            .set("upsert-enabled", "true")
            .uidPrefix("committer-sink-" + tag.getId())
            .equalityFieldColumns(Collections.singletonList(upsertField));
    builder.append();
});
```
It works very well when I'm dealing with a few tables. But when the number
of tables scales up, Flink cannot acquire enough task resources to keep the
application alive, and it generates a job graph so large that the Flink UI
cannot even render it.
Is there a more efficient way of doing this, or any way of optimizing it?
Thanks in advance! :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]