Re: Access generic pojo fields

2018-07-27 Thread Amol S - iProgrammer
Thanks Timo, the custom function worked for me with no further exceptions.

Re: Access generic pojo fields

2018-07-27 Thread Timo Walther
I tried to reproduce your error but everything worked fine. Which Flink version are you using? Inner joins are a Flink 1.5 feature.
On 27.07.18 at 13:28, Amol S - iProgrammer wrote:
    Table master = table1.filter("ns === 'Master'").select("o as master, 'accessBasicDBObject(applicationId,o)'

Re: Access generic pojo fields

2018-07-27 Thread Amol S - iProgrammer
Hello Timo, I have implemented my own scalar function as below:
    public class AccessBasicDBObject extends ScalarFunction {
        public String eval(String key, BasicDBObject basicDBObject) {
            if (basicDBObject.getString(key) != null)
                return basicDBObject.getString(key);

Re: Access generic pojo fields

2018-07-27 Thread Timo Walther
Hi, I think the exception is self-explanatory. BasicDBObject is not recognized as a POJO by Flink. A POJO is required so that the Table API knows the field types for subsequent operations. The easiest way is to implement your own scalar function, e.g. an `accessBasicDBObject(obj, key)`.
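A minimal sketch of the lookup logic such a scalar function might contain. To keep it runnable without Flink or the MongoDB driver on the classpath, a plain `Map<String, Object>` stands in for `BasicDBObject`, and the class name `AccessFieldSketch` is hypothetical. In an actual job this body would presumably sit in the `eval` method of a class extending `ScalarFunction`, as in the implementation quoted elsewhere in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in: shows only the null-safe field lookup,
// not the Flink ScalarFunction wiring.
public class AccessFieldSketch {

    // Assumption: the document is modelled as a Map instead of BasicDBObject.
    // Returns the field value as a String, or null if the key is absent.
    public static String eval(String key, Map<String, Object> document) {
        if (key == null || document == null) {
            return null;
        }
        Object value = document.get(key);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("applicationId", "app-42");
        System.out.println(eval("applicationId", doc));
        System.out.println(eval("missing", doc));
    }
}
```

Declaring a concrete return type such as `String` is what gives the Table API a known field type to work with, which is the point of the suggestion above.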

Re: Access generic pojo fields

2018-07-27 Thread Amol S - iProgrammer
Hello Timo, thanks for the quick reply. With your suggestion the previous exception is gone, but I now get the following exception: Expression 'o.get(_id) failed on input check: Cannot access field of non-composite type 'GenericType'.

Re: Access generic pojo fields

2018-07-27 Thread Timo Walther
Hi Amol, the dot operation is reserved for calling functions on fields. If you want to get a nested field in the Table API, use the `.get("applicationId")` operation. See also [1] under "Value access functions". Regards, Timo [1]

Access generic pojo fields

2018-07-27 Thread Amol S - iProgrammer
Hello Fabian, I am streaming my MongoDB oplog using Flink and want to use the Flink Table API to join multiple tables. My code looks like this:
    DataStream streamSource = env
        .addSource(kafkaConsumer)
        .setParallelism(4);
    StreamTableEnvironment tableEnv =