Hi, JING ZHANG!

Thanks for all your help.


I have already tried COLLECT(ROW(id, name)) and stored the result as a 
String (for a POC test). To do that I defined a UDF, and the parameter of its 
eval function must be annotated with "MULTISET<ROW<...>>" as below; otherwise 
the exception "..RAW/MAP expected. but MULTISET<ROW<`EXPR$0` STRING NOT NULL, 
`EXPR$1` STRING, `EXPR$2` STRING, `EXPR$3` STRING, `EXPR$4` STRING> NOT NULL> 
NOT NULL passed" is thrown.


I think annotating the UDF this way is not convenient for the general case: 
I can't define a separate UDF for every RowData structure, such as Row<id>, 
Row<id, name>, Row<.....>


Is there any way to define the annotation for a dynamic RowData structure? 
Thanks again for your suggestions.


Best Regards!


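```
// Assumes: import java.util.{Map => JMAP}, org.apache.flink.types.Row,
// and org.apache.flink.table.annotation.DataTypeHint
def eval(@DataTypeHint("MULTISET<ROW<" +
    "vital_sign_id STRING, cdt_vital_sign_index STRING, " +
    "unit_name STRING, parameter_value STRING, measure_datetime STRING>" +
    ">") data: JMAP[Row, Integer]): String = {
  // The multiset arrives as a map from row to occurrence count.
  if (data == null || data.isEmpty()) {
    return ""
  }
  // Join the distinct rows (the map keys) with commas.
  data.keySet().toArray().mkString(",")
}
```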






On Nov 8, 2021 at 21:26, JING ZHANG <beyond1...@gmail.com> wrote:


Hi Vtygoss,
You could try the following SQL:
```
select COLLECT(ROW(id, name)) as info
from table 
group by ...;
```
In the above SQL, the result type of `COLLECT(ROW(id, name))` is MULTISET<ROW>. 
`CollectAggFunction` stores the data in a MapState. The key has the element 
type and holds the row value; the value is an Integer and holds the number of 
times that row occurs.
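
To make that representation concrete, here is a minimal sketch in plain Scala 
(no Flink dependency) of a multiset held as a map from element to occurrence 
count. `MultisetSketch`, `toCountMap` and `expand` are hypothetical names chosen 
for illustration; they are not Flink APIs, and this is not a copy of how 
`CollectAggFunction` is implemented.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._

// Hypothetical illustration only; these helpers are not part of Flink.
object MultisetSketch {

  // Build the element -> count map for a sequence of elements.
  def toCountMap[T](elements: Seq[T]): JMap[T, Integer] = {
    val counts = new JHashMap[T, Integer]()
    elements.foreach { e =>
      val prev = counts.get(e) // null when the element has not been seen yet
      val next = if (prev == null) 1 else prev.intValue() + 1
      counts.put(e, Integer.valueOf(next))
    }
    counts
  }

  // Expand the map back into a flat list, repeating each element `count` times.
  // Element order is not preserved, because a multiset is unordered.
  def expand[T](counts: JMap[T, Integer]): Seq[T] =
    counts.asScala.toSeq.flatMap { case (e, n) => Seq.fill(n.intValue())(e) }
}
```

For example, `MultisetSketch.toCountMap(Seq(("1", "a"), ("2", "b"), ("1", "a")))` 
gives a map with two keys, in which `("1", "a")` has count 2. A UDF that only 
reads `keySet()` therefore sees each distinct row once, and duplicates are 
visible only through the counts.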


If you need to define a UDF which handles the result from `COLLECT(ROW(id, 
name))`, you could use Map<Row, Integer> as the input parameter type.
The following code is a demo. Hope it helps.
import scala.collection.JavaConverters._

tEnv.registerFunction("TestFunc", TestFunc)
tEnv.sqlQuery(
  "select TestFunc(COLLECT(ROW(id, name))) as info from table group by ...")
....
@SerialVersionUID(1L)
object TestFunc extends ScalarFunction {
  // The multiset arrives as a map from row to occurrence count.
  def eval(s: java.util.Map[Row, Integer]): String =
    s.keySet().asScala.mkString("\n")
}
Best regards,
JING ZHANG


vtygoss <vtyg...@126.com> wrote on Mon, Nov 8, 2021 at 7:00 PM:

Hi, Flink community!


I am working on migrating a data production pipeline from SparkSQL to 
FlinkSQL (1.12.0), and I have run into a problem with MULTISET<STRUCT<....>>.


```
-- Spark SQL
select COLLECT_LIST(named_struct('id', id, 'name', name)) as info
from table
group by ...;
```


- 1. How can this data structure be expressed and stored in Flink? 
I tried to express it as MULTISET<ROW<id long, name string>> in FlinkSQL, but it 
seems that the ORC / JSON / AVRO formats can't store this type. 
- 2. How can MULTISET<ROW<id long, name string>> be read in FlinkSQL? 
If I need to define a function, which type should 
MULTISET<ROW<id long, name string>> be bridged to? 


Is there a more convenient way to solve this problem? 
Thanks very much for any suggestions or replies. 


Best Regards!
