waytoharish commented on issue #10895: URL: https://github.com/apache/hudi/issues/10895#issuecomment-2011376344
Thanks @danny0405. I am able to map the array type using the code below, but I am not sure whether I am doing it correctly:

    private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
        return HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(256)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("data ARRAY<ROW<measure_name VARCHAR(256), type VARCHAR(10)>>")
            .column("`partition` VARCHAR(20)")
            .pk("uuid")
            .partition("partition")
            .options(options);
    }

    // Define the schema for Hudi records
    public static final DataType ROW_ARRAY_DATA_TYPE = ROW(
        DataTypes.FIELD("measure_name", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("type", DataTypes.VARCHAR(10)))
        .notNull();

    public static final DataType ROW_DATA_TYPE = ROW(
        DataTypes.FIELD("uuid", DataTypes.VARCHAR(256)), // record key
        DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("age", DataTypes.INT()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
        DataTypes.FIELD("partition", DataTypes.VARCHAR(10)),
        DataTypes.FIELD("data", DataTypes.ARRAY(ROW_ARRAY_DATA_TYPE)))
        .notNull();

Now I am trying to figure out how to map it to RowData. I tried the following, but it is failing:

    static class HudiDataSource implements MapFunction<Telemetry, RowData> {
        @Override
        public RowData map(Telemetry kafkaRecord) throws Exception {
            return insertRow(
                StringData.fromString(kafkaRecord.getCampaignName()),
                StringData.fromString("Danny"),
                23,
                TimestampData.fromEpochMillis(1),
                StringData.fromString("par1"),
                insertRow(
                    StringData.fromString(kafkaRecord.getCampaignName()),
                    StringData.fromString("Danny")));
        }
    }

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org