Hi,
I think the exception is self-explaining. BasicDBObject is not
recognized as a POJO by Flink. A POJO is required such that the Table
API knows the types of fields for following operations.
The easiest way is to implement your own scalar function. E.g. a
`accessBasicDBObject(obj, key)`.
Regards,
Timo
Am 27.07.18 um 11:25 schrieb Amol S - iProgrammer:
Hello Timo,
Thanks for quick reply. By using your suggestion Previous exception gone
but it is giving me following exception
Expression 'o.get(_id) failed on input check: Cannot access field of
non-composite type 'GenericType<com.mongodb.BasicDBObject>'.
-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com
*iProgrammer Solutions Pvt. Ltd.*
*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------
On Fri, Jul 27, 2018 at 1:08 PM, Timo Walther <twal...@apache.org> wrote:
Hi Amol,
the dot operation is reserved for calling functions on fields. If you want
to get a nested field in the Table API, use the `.get("applicationId")`
operation. See also [1] under "Value access functions".
Regards,
Timo
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/
dev/table/tableApi.html#built-in-functions
Am 27.07.18 um 09:10 schrieb Amol S - iProgrammer:
Hello Fabian,
I am streaming my mongodb oplog using flink and want to use flink table
API
to join multiple tables. My code looks like
DataStream<Oplog> streamSource = env
.addSource(kafkaConsumer)
.setParallelism(4);
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvir
onment(env);
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(streamSource);
Table master = table1.filter("ns === 'Master'").select("o as master,
o.applicationId as primaryKey");
Table child1 = table1.filter("ns === 'Child1'").select("o as child1,
o.applicationId as foreignKey");
Table child2 = table1.filter("ns === 'Child2'").select("o as child2,
o.applicationId as foreignKey2");
Table result = master.join(child1).where("pri
maryKey==foreignKey").join(child2).where("primaryKey==foreignKey2");
it is throwing error "Method threw
'org.apache.flink.table.api.ValidationException' exception. Undefined
function: APPLICATIONID"
public class Oplog implements Serializable{
private BasicDBObject o;
}
Where o is generic java type for fetching mongodb oplog and I can not
replace this generic type with static pojo's. please tell me any work
around on this.
BasicDBObject suffice following two rules
-
The class must be public.
-
It must have a public constructor without arguments (default
constructor)
and we can access class members through basicDBObject.getString("abc")
-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com
*iProgrammer Solutions Pvt. Ltd.*
*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune -
411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------