Hi Ronak,

As mentioned in the Flink Community & Project information [1] the primary
place for support are the mailing lists and user support should go to the
User mailing list. Keep in mind that this is still done by the community
and set up for asynchronous handling. If you want to have quick
acknowledgment or SLAs, there are vendors that can offer commercial support
on Flink.

You can't compare the two statements, because in your first join you're
also applying a TUMBLE. That means that you're not only maintaining state
for your join, but also for your window. You're also using the old Group
Window Aggregation function and it's recommended to use Window TVFs due to
better performance optimizations [2]

Best regards,

Martijn

[1]
https://flink.apache.org/community.html#how-do-i-get-help-from-apache-flink
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation


On Thu, 13 Jan 2022 at 06:33, Ronak Beejawat (rbeejawa) <rbeej...@cisco.com>
wrote:

> HI Martijn,
>
>
>
> I posted the below query both the places(flink mailing list and stack
> overflow) to get a quick response on it.
>
> Please let me know the exact poc / mailing list to post my quries if it is
> causing trouble, so at least we can get quick acknowledgement on the issues
> reported.
>
>
>
> Ok let me ask the below question in a simpler way
>
>
>
> *Join 1 *
>
>
>
> select * from cdrTable left join  ccmversionsumapTable cvsm ON
> (cdrTable.version = ccmversionsumapTable.ccmversion) group by
> TUMBLE(PROCTIME(), INTERVAL '1' MINUTE), …
>
> (2.5 million left join with 23 records it is failing to compute and
> throwing heap error)
>
> Note: This is small join example as compared to Join2 condition as shown
> below. here we are using different connector for reading cdrTable -> kafka
> connector and ccmversionsumapTable -> jdbc connector
>
>
>
> *Join 2*
>
>
>
> select * from cdrTable left join  left join cmrTable cmr on (cdr.org_id =
> cmr.org_id AND cdr.cluster_id = cmr.cluster_id AND
> cdr.globalcallid_callmanagerid = cmr.globalcallid_callmanagerid AND
> cdr.globalcallid_callid = cmr.globalcallid_callid AND
> (cdr.origlegcallidentifier = cmr.callidentifier OR
> cdr.destlegcallidentifier = cmr.callidentifier)), … (2.5 million left join
> with 5 million it is computing properly without any heap error )
>
> Note: This is bigger join example as compared to Join1 condition as shown
> above. here we are using same connector for reading cdrTable , cmrTable ->
> kafka connector
>
>
>
> So the question is with small join condition it is throwing heap error and
> with bigger set of join it is working properly . Please help us on this
>
>
>
> Thanks
>
> Ronak Beejawat
>
>
>
> *From: *Martijn Visser <mart...@ververica.com>
> *Date: *Wednesday, 12 January 2022 at 7:43 PM
> *To: *dev <d...@flink.apache.org>
> *Cc: *commun...@flink.apache.org <commun...@flink.apache.org>,
> user@flink.apache.org <user@flink.apache.org>, Hang Ruan <
> ruanhang1...@gmail.com>, Shrinath Shenoy K (sshenoyk) <sshen...@cisco.com>,
> Jayaprakash Kuravatti (jkuravat) <jkura...@cisco.com>, Krishna Singitam
> (ksingita) <ksing...@cisco.com>, Nabhonil Sinha (nasinha) <
> nasi...@cisco.com>, Vibhor Jain (vibhjain) <vibhj...@cisco.com>,
> Raghavendra Jsv (rjsv) <r...@cisco.com>, Arun Yadav (aruny) <
> ar...@cisco.com>, Avi Sanwal (asanwal) <asan...@cisco.com>
> *Subject: *Re: OutOfMemoryError: Java heap space while implmentating
> flink sql api
>
> Hi Ronak,
>
>
>
> I would like to ask you to stop cross-posting to all the Flink mailing
> lists and then also post the same question to Stackoverflow. Both the
> mailing lists and Stackoverflow are designed for asynchronous communication
> and you should allow the community some days to address your question.
>
>
>
> Joins are state heavy. As mentioned in the documentation [1] "Thus, the
> required state for computing the query result might grow infinitely
> depending on the number of distinct input rows of all input tables and
> intermediate join results."
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/
>
>
>
>
>
> On Wed, 12 Jan 2022 at 11:06, Ronak Beejawat (rbeejawa) <
> rbeej...@cisco.com.invalid> wrote:
>
> Hi Team,
>
> I was trying to implement flink sql api join with 2 tables it is throwing
> error OutOfMemoryError: Java heap space . PFB screenshot for flink cluster
> memory details.
> [Flink Memory Model][1]
>
>
>   [1]: https://i.stack.imgur.com/AOnQI.png
>
> **PFB below code snippet which I was trying:**
> ```
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
>
> tableEnv.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy",
> "TWO_PHASE");
> tableEnv.getConfig().getConfiguration().setString("table.optimizer.join-reorder-enabled",
> "true");
> tableEnv.getConfig().getConfiguration().setString("table.exec.resource.default-parallelism",
> "16");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE ccmversionsumapTable (\r\n"
>                                              + "  suname STRING\r\n"
>                                              + "  ,ccmversion STRING\r\n"
>                                              + "       )\r\n"
>                                              + "       WITH (\r\n"
>                                              + "       'connector' =
> 'jdbc'\r\n"
>                                              + "       ,'url' =
> 'jdbc:mysql://****:3306/ccucdb'\r\n"
>                                              + "       ,'table-name' =
> 'ccmversionsumap'\r\n"
>                                              + "       ,'username' =
> '*****'\r\n"
>                                              + "       ,'password' =
> '****'\r\n"
>                                              + "       )");
>
> tableEnv.executeSql("CREATE TEMPORARY TABLE cdrTable (\r\n"
>                                + "       org_id STRING\r\n"
>                                + "       ,cluster_id STRING\r\n"
>                                + "       ,cluster_name STRING\r\n"
>                                + "       ,version STRING\r\n"
>                                + "       ,ip_address STRING\r\n"
>                                + "       ,pkid STRING\r\n"
>                                + "       ,globalcallid_callid INT\r\n"
>                                       ... --- multiple columns can be added
>                                + "       )\r\n"
>                                + "       WITH (\r\n"
>                                + "       'connector' = 'kafka'\r\n"
>                                + "       ,'topic' = 'cdr'\r\n"
>                                + "       ,'properties.bootstrap.servers' =
> '****:9092'\r\n"
>                                + "       ,'scan.startup.mode' =
> 'earliest-offset'\r\n"
>                                //+ "    ,'value.fields-include' =
> 'EXCEPT_KEY'\r\n"
>                                + "       ,'format' = 'json'\r\n"
>                                + "       )");
>
>
> String sql = "SELECT cdr.org_id orgid,\r\n"
>                                                           + "
>  cdr.cluster_name clustername,\r\n"
>                                                           + "
>  cdr.cluster_id clusterid,\r\n"
>                                                           + "
>  cdr.ip_address clusteripaddr,\r\n"
>                                                           + "
>  cdr.version clusterversion,\r\n"
>                                                           + "
>  cvsm.suname clustersuname,\r\n"
>                                                           + "
>  cdr.pkid cdrpkid,\r\n"
>                                                               ... ---
> multiple columns can be added
>                                                           + "         from
> cdrTable cdr\r\n"
>                                                           + " left join
> ccmversionsumapTable cvsm ON (cdr.version = cvsm.ccmversion) group by
> TUMBLE(PROCTIME(), INTERVAL '1' MINUTE), cdr.org_id, cdr.cluster_name,
> cdr.cluster_id, cdr.ip_address, cdr.version, cdr.pkid,
> cdr.globalcallid_callid, ..."
>
> Table order20 = tableEnv.sqlQuery(sql);
> order20.executeInsert("outputCdrTable");
> ```
>
> **scenario / use case :**
>
> we are pushing 2.5 million json record in kafka topic and reading it via
> kafka connector as temporary cdrTable as shown in above code and we reading
> 23 records from jdbc static/reference table via jdbc connector as temporary
> ccmversionsumapTable as shown in above code and doing a left join for 1 min
> tumble window .
>
> So while doing a join we are getting OutOfMemoryError: jvm heap space
> error while processing it.
>
> but the similar use case we tried to do left join with two tables cdr
> (2.5m records) and cmr (5m records) with same tumbling window we are able
> to process that without any issue and both are reading from kafka as shown
> in above code snnipet for cdrTable
>
> Thanks
> Ronak Beejawat
>
>

Reply via email to