Hi Team,
I have a Flink job that reads transaction data from a single source and creates
Iceberg tables. I need to create a separate dataset for each account that
appears in the stream of transactions. IcebergTableSink accepts the TableLoader
in its constructor, but my table name is derived from the account Id on each
transaction, and the account Ids are not a predefined list. This means I have
to decide which table to load data into while writing the code, whereas I want
it chosen dynamically based on the account Id. Is there a better way to handle
this? Here's the job I have, which writes a single Iceberg table. Appreciate
any help here.
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
DataStream<RowData> rows = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
TableSchema ts = TableSchema.builder()
.field("AccountId", DataTypes.BIGINT())
.field("Timestamp", DataTypes.BIGINT())
.field("Amount", DataTypes.DOUBLE())
.build();
// TODO: need to write to multiple tables here based on Transaction::getAccountId
String tableLocation = "./data/flinklocal/transactions5";
// org.apache.hadoop.conf.Configuration, required by TableLoader.fromHadoopTable
Configuration hadoopConf = new Configuration();
TableLoader tl = TableLoader.fromHadoopTable(tableLocation, hadoopConf);
IcebergTableSink sink = new IcebergTableSink(false, tl, ts);
sink.consumeDataStream(rows);
env.execute("Multiple transactional datasets");
}
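To make the routing I'm after more concrete, here is a plain-Java sketch of the behavior I want from the sink, with Flink and Iceberg left out entirely. `TableRouter`, the base path, and the `transactions_<accountId>` naming scheme are all made up by me for illustration; they are not real Flink or Iceberg API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: lazily derive one table path per account Id the
// first time that Id is seen, and reuse it on every later transaction.
public class TableRouter {
    private final String basePath;
    private final Map<Long, String> tablePaths = new ConcurrentHashMap<>();

    public TableRouter(String basePath) {
        this.basePath = basePath;
    }

    // First sighting of an account Id registers a table path for it;
    // subsequent calls return the same path.
    public String tablePathFor(long accountId) {
        return tablePaths.computeIfAbsent(accountId,
                id -> basePath + "/transactions_" + id);
    }

    public int knownTableCount() {
        return tablePaths.size();
    }

    public static void main(String[] args) {
        TableRouter router = new TableRouter("./data/flinklocal");
        System.out.println(router.tablePathFor(42L)); // new table for account 42
        System.out.println(router.tablePathFor(42L)); // same path, reused
        System.out.println(router.tablePathFor(7L));  // new table for account 7
        System.out.println(router.knownTableCount());
    }
}
```

In the actual job I imagine keying by account Id and constructing a TableLoader per derived path on demand, but I don't know whether IcebergTableSink can be instantiated per key like that inside a running job, which is why I'm asking.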
Thanks,
Asha Desu