Thanks Timo,
the custom function worked for me with no further exceptions.
Thanks.
---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com
*iProgrammer Solutions Pvt. Ltd.*
*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
I tried to reproduce your error but everything worked fine. Which Flink
version are you using?
Inner joins are a Flink 1.5 feature.
On 27.07.18 at 13:28, Amol S - iProgrammer wrote:
Table master = table1.filter("ns === 'Master'").select("o as master,
'accessBasicDBObject(applicationId,o)'
Hello Timo,
I have implemented my own scalar function as below
public class AccessBasicDBObject extends ScalarFunction {
    public String eval(String key, BasicDBObject basicDBObject) {
        if (basicDBObject.getString(key) != null)
            return basicDBObject.getString(key);
Hi,
I think the exception is self-explanatory. BasicDBObject is not
recognized as a POJO by Flink. A POJO is required so that the Table
API knows the field types for subsequent operations.
The easiest way is to implement your own scalar function, e.g. an
`accessBasicDBObject(obj, key)`.
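A minimal sketch of the null-safe lookup that the `eval` method of such a function could perform. To stay free of Flink and MongoDB dependencies, it takes a plain `Map<String, Object>` as a stand-in for `BasicDBObject` (which is itself a `LinkedHashMap<String, Object>`), and the class name `AccessFieldSketch` is hypothetical. In an actual job the class would presumably extend Flink's `ScalarFunction`, `eval` would be an instance method, and the function would be registered with the table environment before being used in an expression.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free stand-in for a Flink scalar function.
public class AccessFieldSketch {

    // Follows the accessBasicDBObject(obj, key) shape suggested above.
    // Returns the field as a String, or null if the document, key, or field is missing.
    public static String eval(Map<String, Object> document, String key) {
        if (document == null || key == null) {
            return null;
        }
        Object value = document.get(key);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        // Assumption: a document with a single "applicationId" field, for illustration only.
        Map<String, Object> o = new LinkedHashMap<>();
        o.put("applicationId", "app-1");

        System.out.println(eval(o, "applicationId"));
        System.out.println(eval(o, "missing"));
    }
}
```

Returning a plain `String` gives the Table API a concrete result type to work with, which is the point of wrapping the access in a function instead of reading fields from the generic object directly.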
Hello Timo,
Thanks for the quick reply. With your suggestion the previous exception is
gone, but I now get the following exception:
Expression 'o.get(_id) failed on input check: Cannot access field of
non-composite type 'GenericType'.
Hi Amol,
the dot operation is reserved for calling functions on fields. If you
want to get a nested field in the Table API, use the
`.get("applicationId")` operation. See also [1] under "Value access
functions".
Regards,
Timo
[1]
Hello Fabian,
I am streaming my MongoDB oplog using Flink and want to use the Flink Table API
to join multiple tables. My code looks like this:
DataStream streamSource = env
    .addSource(kafkaConsumer)
    .setParallelism(4);
StreamTableEnvironment tableEnv =