Hi Ronak,

You shared a screenshot of the JM. Do you mean that the exception also happens on the JM? (I'd rather assume the TM.)
Could you explain the join clause:

    left join ccmversionsumapTable cvsm ON (cdr.version = cvsm.ccmversion)

"version" doesn't sound very selective, so maybe you end up with an (almost) Cartesian product?

Regards,
Roman

On Wed, Jan 12, 2022 at 11:06 AM Ronak Beejawat (rbeejawa) <rbeej...@cisco.com.invalid> wrote:

> Hi Team,
>
> I was trying to implement a Flink SQL API join with 2 tables and it is
> throwing OutOfMemoryError: Java heap space. PFB a screenshot of the Flink
> cluster memory details.
>
> [Flink Memory Model][1]
>
> [1]: https://i.stack.imgur.com/AOnQI.png
>
> **PFB the code snippet I was trying:**
> ```
> EnvironmentSettings settings =
>     EnvironmentSettings.newInstance().inStreamingMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
> tableEnv.getConfig().getConfiguration()
>     .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
> tableEnv.getConfig().getConfiguration()
>     .setString("table.optimizer.join-reorder-enabled", "true");
> tableEnv.getConfig().getConfiguration()
>     .setString("table.exec.resource.default-parallelism", "16");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE ccmversionsumapTable (\r\n"
>     + "  suname STRING\r\n"
>     + "  ,ccmversion STRING\r\n"
>     + ")\r\n"
>     + "WITH (\r\n"
>     + "  'connector' = 'jdbc'\r\n"
>     + "  ,'url' = 'jdbc:mysql://****:3306/ccucdb'\r\n"
>     + "  ,'table-name' = 'ccmversionsumap'\r\n"
>     + "  ,'username' = '*****'\r\n"
>     + "  ,'password' = '****'\r\n"
>     + ")");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE cdrTable (\r\n"
>     + "  org_id STRING\r\n"
>     + "  ,cluster_id STRING\r\n"
>     + "  ,cluster_name STRING\r\n"
>     + "  ,version STRING\r\n"
>     + "  ,ip_address STRING\r\n"
>     + "  ,pkid STRING\r\n"
>     + "  ,globalcallid_callid INT\r\n"
>     ... // multiple columns can be added
>     + ")\r\n"
>     + "WITH (\r\n"
>     + "  'connector' = 'kafka'\r\n"
>     + "  ,'topic' = 'cdr'\r\n"
>     + "  ,'properties.bootstrap.servers' = '****:9092'\r\n"
>     + "  ,'scan.startup.mode' = 'earliest-offset'\r\n"
>     //+ "  ,'value.fields-include' = 'EXCEPT_KEY'\r\n"
>     + "  ,'format' = 'json'\r\n"
>     + ")");
>
> String sql = "SELECT cdr.org_id orgid,\r\n"
>     + "  cdr.cluster_name clustername,\r\n"
>     + "  cdr.cluster_id clusterid,\r\n"
>     + "  cdr.ip_address clusteripaddr,\r\n"
>     + "  cdr.version clusterversion,\r\n"
>     + "  cvsm.suname clustersuname,\r\n"
>     + "  cdr.pkid cdrpkid,\r\n"
>     ... // multiple columns can be added
>     + "from cdrTable cdr\r\n"
>     + "left join ccmversionsumapTable cvsm ON (cdr.version = cvsm.ccmversion)\r\n"
>     + "group by TUMBLE(PROCTIME(), INTERVAL '1' MINUTE), cdr.org_id,\r\n"
>     + "  cdr.cluster_name, cdr.cluster_id, cdr.ip_address, cdr.version,\r\n"
>     + "  cdr.pkid, cdr.globalcallid_callid, ...";
>
> Table order20 = tableEnv.sqlQuery(sql);
> order20.executeInsert("outputCdrTable");
> ```
>
> **Scenario / use case:**
>
> We are pushing 2.5 million JSON records into a Kafka topic and reading them
> via the Kafka connector as the temporary cdrTable shown in the code above.
> We are also reading 23 records from a static/reference table via the JDBC
> connector as the temporary ccmversionsumapTable shown above, and doing a
> left join over a 1-minute tumbling window.
>
> While doing this join we are getting an OutOfMemoryError: Java heap space
> while processing it.
>
> However, in a similar use case we did a left join of two tables, cdr (2.5m
> records) and cmr (5m records), with the same tumbling window, and we were
> able to process it without any issue; both were read from Kafka as shown in
> the code snippet above for cdrTable.
>
> Thanks
> Ronak Beejawat
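Roman's concern about the non-selective join key can be illustrated with a quick back-of-the-envelope sketch (a hypothetical helper with made-up numbers, not Flink API): in a left join, each input row is emitted once per matching reference row, so if many of the 23 reference rows share the same ccmversion value, the join output multiplies accordingly.

```java
import java.util.Map;

public class JoinFanOut {
    // Estimates left-join output size: each left row is emitted once per
    // matching reference row, or once if there is no match at all.
    static long joinOutputRows(Map<String, Long> leftRowsPerKey,
                               Map<String, Long> refRowsPerKey) {
        return leftRowsPerKey.entrySet().stream()
            .mapToLong(e -> e.getValue()
                * Math.max(1, refRowsPerKey.getOrDefault(e.getKey(), 0L)))
            .sum();
    }

    public static void main(String[] args) {
        // Worst case: all 2.5M cdr rows carry the same version value.
        Map<String, Long> cdr = Map.of("12.5", 2_500_000L);

        // Unique key: one reference row per version -> output stays at 2.5M.
        System.out.println(joinOutputRows(cdr, Map.of("12.5", 1L)));
        // prints 2500000

        // Duplicated key: 23 reference rows share that version -> 57.5M rows.
        System.out.println(joinOutputRows(cdr, Map.of("12.5", 23L)));
        // prints 57500000
    }
}
```

If the key really is that skewed, the join output (and the state the windowed aggregation must hold) grows by the duplication factor, which would explain heap exhaustion even though the inputs themselves fit in memory.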