Hi Ronak,

You shared a screenshot of the JobManager (JM). Do you mean that the
exception also happens on the JM? (I'd rather expect it on a TaskManager, TM.)

Could you explain the join clause:
left join ccmversionsumapTable cvsm ON (cdr.version = cvsm.ccmversion)
"version" doesn't sound very selective, so maybe you end up with an
(almost) Cartesian product?
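
To illustrate what I mean by selectivity, here is a minimal sketch in plain
Java (not Flink API). The class `JoinFanOut`, its methods, and the sample
version values are made up for illustration; the real table contents are not
shown in this thread. It counts how many rows a left equi-join emits when the
join key repeats on the reference side.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: estimates how many rows a
// left equi-join emits for the given join-key values on each side.
final class JoinFanOut {

    // Counts how often each join key occurs on the reference (right) side.
    static Map<String, Long> countKeys(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Each left row yields one output row per matching right row,
    // or exactly one (null-padded) row when nothing matches.
    static long leftJoinOutputRows(List<String> leftKeys, List<String> rightKeys) {
        Map<String, Long> rightCounts = countKeys(rightKeys);
        long rows = 0;
        for (String key : leftKeys) {
            rows += Math.max(1L, rightCounts.getOrDefault(key, 0L));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample keys standing in for cdr.version and cvsm.ccmversion.
        List<String> cdrVersions = List.of("11.5", "11.5", "12.0", "14.0");
        List<String> mapVersions = List.of("11.5", "11.5", "11.5", "12.0");
        System.out.println("matches per key: " + countKeys(mapVersions));
        System.out.println("output rows: "
                + leftJoinOutputRows(cdrVersions, mapVersions));
    }
}
```

If every ccmversion value occurs only once in your reference table, the join
itself does not multiply rows, and the cause is probably somewhere else.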

Regards,
Roman

On Wed, Jan 12, 2022 at 11:06 AM Ronak Beejawat (rbeejawa)
<rbeej...@cisco.com.invalid> wrote:
>
> Hi Team,
>
> I am trying to implement a join of two tables with the Flink SQL API, but it
> throws OutOfMemoryError: Java heap space. Please find below a screenshot of
> the Flink cluster memory details.
> [Flink Memory Model][1]
>
>
>   [1]: https://i.stack.imgur.com/AOnQI.png
>
> **Please find below the code snippet I was trying:**
> ```
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inStreamingMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
>
> tableEnv.getConfig().getConfiguration()
>         .setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
> tableEnv.getConfig().getConfiguration()
>         .setString("table.optimizer.join-reorder-enabled", "true");
> tableEnv.getConfig().getConfiguration()
>         .setString("table.exec.resource.default-parallelism", "16");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE ccmversionsumapTable (\r\n"
>         + "  suname STRING\r\n"
>         + "  ,ccmversion STRING\r\n"
>         + "       )\r\n"
>         + "       WITH (\r\n"
>         + "       'connector' = 'jdbc'\r\n"
>         + "       ,'url' = 'jdbc:mysql://****:3306/ccucdb'\r\n"
>         + "       ,'table-name' = 'ccmversionsumap'\r\n"
>         + "       ,'username' = '*****'\r\n"
>         + "       ,'password' = '****'\r\n"
>         + "       )");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE cdrTable (\r\n"
>         + "       org_id STRING\r\n"
>         + "       ,cluster_id STRING\r\n"
>         + "       ,cluster_name STRING\r\n"
>         + "       ,version STRING\r\n"
>         + "       ,ip_address STRING\r\n"
>         + "       ,pkid STRING\r\n"
>         + "       ,globalcallid_callid INT\r\n"
>         ... --- multiple columns can be added
>         + "       )\r\n"
>         + "       WITH (\r\n"
>         + "       'connector' = 'kafka'\r\n"
>         + "       ,'topic' = 'cdr'\r\n"
>         + "       ,'properties.bootstrap.servers' = '****:9092'\r\n"
>         + "       ,'scan.startup.mode' = 'earliest-offset'\r\n"
>         //+ "    ,'value.fields-include' = 'EXCEPT_KEY'\r\n"
>         + "       ,'format' = 'json'\r\n"
>         + "       )");
>
>
> String sql = "SELECT cdr.org_id orgid,\r\n"
>         + "         cdr.cluster_name clustername,\r\n"
>         + "         cdr.cluster_id clusterid,\r\n"
>         + "         cdr.ip_address clusteripaddr,\r\n"
>         + "         cdr.version clusterversion,\r\n"
>         + "         cvsm.suname clustersuname,\r\n"
>         + "         cdr.pkid cdrpkid,\r\n"
>         ... --- multiple columns can be added
>         + "         from cdrTable cdr\r\n"
>         + " left join ccmversionsumapTable cvsm ON (cdr.version = cvsm.ccmversion)"
>         + " group by TUMBLE(PROCTIME(), INTERVAL '1' MINUTE), cdr.org_id,"
>         + " cdr.cluster_name, cdr.cluster_id, cdr.ip_address, cdr.version,"
>         + " cdr.pkid, cdr.globalcallid_callid, ..."
>
> Table order20 = tableEnv.sqlQuery(sql);
> order20.executeInsert("outputCdrTable");
> ```
>
> **Scenario / use case:**
>
> We push 2.5 million JSON records into a Kafka topic and read them via the
> Kafka connector as the temporary table cdrTable, as shown in the code above.
> We also read 23 records from a static JDBC reference table via the JDBC
> connector as the temporary table ccmversionsumapTable, and do a left join
> over a 1-minute tumbling window.
>
> While processing this join we get OutOfMemoryError: Java heap space.
>
> However, in a similar use case we did a left join of two tables, cdr (2.5
> million records) and cmr (5 million records), with the same tumbling window,
> and were able to process it without any issue. Both of those tables are read
> from Kafka, as shown in the code snippet above for cdrTable.
>
> Thanks
> Ronak Beejawat
