0xNacho opened a new issue, #5958:
URL: https://github.com/apache/iceberg/issues/5958

   ### Query engine
   
   Flink
   
   ### Question
   
   Hello!
   
   I have a Flink application that reads arbitrary Avro data, maps it to RowData, 
and uses several FlinkSink instances to write the data into Iceberg tables. By 
arbitrary data I mean that I have 100 types of Avro messages, all of them with a 
common property "tableName" but with different columns. I would like to 
write each message type into a separate Iceberg table.
   
   To do this I'm using side outputs: once my data is mapped to 
RowData, I use a ProcessFunction that writes each message to a specific 
OutputTag.
   
   Later on, with the datastream already processed, I loop over the different 
output tags, get the records using getSideOutput, and create a dedicated 
FlinkSink for each of them. Something like:
   
   
   
   ```java
   
           // List of all possible output tags, one per target table
           final List<OutputTag<RowData>> tags = ...
   
           // process() returns a SingleOutputStreamOperator, which exposes getSideOutput()
           final SingleOutputStreamOperator<RowData> rowdata = stream
                   .map(new ToRowDataMap()) // map the custom Avro POJO to RowData
                   .uid("map-row-data")
                   .name("Map to RowData")
                   // route each element to the OutputTag of its table
                   .process(new ProcessRecordFunction(tags))
                   .uid("id-process-record")
                   .name("Process Input records");
   
           CatalogLoader catalogLoader = ...
           String upsertField = ...
           Catalog catalog = catalogLoader.loadCatalog();
   
           tags.forEach(tag -> {
               // getSideOutput() yields a DataStream view of the side output;
               // uid()/name() belong to operators and are not available on it
               DataStream<RowData> outputStream = rowdata.getSideOutput(tag);
   
               TableIdentifier identifier = TableIdentifier.of("myDBName", tag.getId());
   
               FlinkSink
                       .forRowData(outputStream)
                       .table(catalog.loadTable(identifier))
                       .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier))
                       .set("upsert-enabled", "true")
                       .uidPrefix("commiter-sink-" + tag.getId())
                       .equalityFieldColumns(Collections.singletonList(upsertField))
                       .append();
           });
   
   ```
   
   It works very well when I'm dealing with a few tables. But when the number 
of tables scales up, Flink cannot acquire enough task resources to keep the 
application alive, and it generates a job graph so large that the Flink UI 
cannot even display it.
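
   To put rough numbers on the growth, here is a back-of-the-envelope sketch. 
It assumes (this is an assumption, not something confirmed in this thread) that 
each FlinkSink adds three operators to the graph: a writer running at the job 
parallelism, plus a committer and a terminal sink running at parallelism 1. The 
class name, the helper names, and the parallelism of 4 are made up for 
illustration only.

```java
// Back-of-the-envelope estimate of job-graph size for one FlinkSink per table.
// ASSUMPTION (not confirmed in this thread): every FlinkSink adds three operators:
// a writer at the job parallelism, plus a committer and a terminal sink at parallelism 1.
class SinkFanOutEstimate {

    // Operators added to the job graph by the sinks alone (hypothetical helper).
    static int sinkOperators(int tables) {
        return tables * 3;
    }

    // Subtasks that must be scheduled for the sinks alone (hypothetical helper).
    static int sinkSubtasks(int tables, int writerParallelism) {
        return tables * (writerParallelism + 2);
    }

    public static void main(String[] args) {
        int writerParallelism = 4; // illustrative value, not taken from the real job
        for (int tables : new int[] {5, 50, 100}) {
            System.out.printf("tables=%d operators=%d subtasks=%d%n",
                    tables, sinkOperators(tables), sinkSubtasks(tables, writerParallelism));
        }
    }
}
```

   Under those assumptions the cost grows linearly with the number of tables: 
100 tables at parallelism 4 would mean about 300 extra operators and 600 extra 
subtasks, before counting the source, the map, and the process function.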
   
   Is there a more efficient way of doing this, or a way to 
optimize it?
   
   Thanks in advance ! :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

