Re: Flink not releasing the reference to a deleted jar file
Hi Shammon,

The Flink job no longer exists after I close the execution environment, right? Can you please try the attached code and confirm that I am not sharing the file with any other job? Until I close the running Java application, the file referenced in the code below still has an open handle.

On Thu, Apr 20, 2023 at 7:25 AM Shammon FY wrote:
> Hi neha,
>
> Flink can delete runtime data for a job when it goes to termination. But for external files such as UDF jar files, as you mentioned, I think you need to manage them yourself. The files may be shared between jobs and cannot be deleted when one Flink job exits.
>
> Best,
> Shammon FY
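A minimal way to confirm the open handle described above (a sketch that is not part of the original repro; it assumes a Linux host where /proc/self/fd can be inspected and reuses the example jar path from the thread; on other platforms "lsof -p <pid>" shows the same information):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.stream.Stream;

    public class OpenHandleCheck {

        // Lists file descriptors of this JVM that still point at the (deleted) jar.
        public static void main(String[] args) throws IOException {
            String jar = "/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar";
            try (Stream<Path> fds = Files.list(Paths.get("/proc/self/fd"))) {
                fds.forEach(fd -> {
                    try {
                        Path target = Files.readSymbolicLink(fd);
                        if (target.toString().startsWith(jar)) {
                            // A target ending in " (deleted)" means the directory entry is
                            // gone but the space is still held by the open descriptor.
                            System.out.println(fd + " -> " + target);
                        }
                    } catch (IOException ignored) {
                        // the descriptor may have been closed between list() and readlink
                    }
                });
            }
        }
    }

As long as such a descriptor exists, deleting the jar only removes the directory entry; the disk space is reclaimed once the process holding the handle (here, the JVM that loaded the UDF classloader) exits, which matches the behaviour reported in this thread.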
Re: Flink not releasing the reference to a deleted jar file
Hi neha,

Flink can delete runtime data for a job when it goes to termination. But for external files such as UDF jar files, as you mentioned, I think you need to manage them yourself. The files may be shared between jobs and cannot be deleted when one Flink job exits.

Best,
Shammon FY

On Wed, Apr 19, 2023 at 1:37 PM neha goyal wrote:
> Adding to the above query, I have tried dropping the tables and the function as well, but no luck.
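One way to "manage them yourself", as suggested above, is to give each job its own private copy of the UDF jar in a job-scoped scratch directory and register the function from that copy, so deleting or replacing the shared original never races with another job's classloader. A hypothetical sketch (JarStaging, stageForJob and the directory naming are illustrative, not a Flink API):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public final class JarStaging {

        private JarStaging() {}

        // Copies the shared jar into a fresh per-job directory and returns the copy.
        public static Path stageForJob(Path sharedJar, String jobName) throws IOException {
            Path jobDir = Files.createTempDirectory("flink-udf-" + jobName + "-");
            Path staged = jobDir.resolve(sharedJar.getFileName());
            Files.copy(sharedJar, staged, StandardCopyOption.REPLACE_EXISTING);
            // Point CREATE FUNCTION ... USING JAR at "file://" + staged, and remove
            // jobDir only after the JVM that loaded the jar has shut down.
            return staged;
        }
    }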
Re: Flink not releasing the reference to a deleted jar file
Adding to the above query, I have tried dropping the tables and the function as well, but no luck.

On Wed, Apr 19, 2023 at 11:01 AM neha goyal wrote:
> Hello,
>
> I am attaching sample code and a screenshot where Flink is holding the reference to a jar file even after I close the StreamExecutionEnvironment.
>
> Because of this, the deleted file is not cleaned up from the disk and we are getting disk space alerts. Only when we restart our application are these files cleared from the disk.
> What is the way to gracefully shut down the Flink environment so that it releases all of its references to resources?
>
> public class TestResourceRelease {
>
>     public void check() {
>         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         try {
>             StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
>             env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 'com.my.udf.v2.EpochToDate' USING JAR 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>             TypeInformation[] typeInformationArray = getTypeInfoArray();
>             String[] columnName = new String[]{"x", "y"};
>             KafkaSource<Row> source = KafkaSource.<Row>builder()
>                     .setStartingOffsets(OffsetsInitializer.latest())
>                     .setValueOnlyDeserializer(new JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, typeInformationArray)))
>                     .setProperty("bootstrap.servers", "localhost:9092")
>                     .setTopics("test")
>                     .build();
>
>             DataStream<Row> stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
>             env.registerDataStream("test", stream);
>
>             Table table = env.fromDataStream(stream);
>             env.registerTable("my_test", table);
>             Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
>             System.out.println("created the table");
>         } catch (Exception e) {
>             System.out.println(e);
>         } finally {
>             try {
>                 execEnv.close();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>             // delete the UDF jar after the environment has been closed
>             File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>             file.delete();
>         }
>     }
>
>     public static TypeInformation[] getTypeInfoArray() {
>         TypeInformation[] typeInformations = new TypeInformation[2];
>         typeInformations[0] = org.apache.flink.table.api.Types.LONG();
>         typeInformations[1] = org.apache.flink.table.api.Types.LONG();
>         return typeInformations;
>     }
> }
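For context, the "dropping the tables and the function" attempt mentioned at the top of this message might look like the following (a hypothetical sketch using the names from the repro above; per the report it did not release the jar handle):

    import org.apache.flink.table.api.TableEnvironment;

    public final class DropUdfArtifacts {

        // Drops the temporary views and the catalog function registered by the repro.
        public static void drop(TableEnvironment env) {
            env.executeSql("DROP TEMPORARY VIEW IF EXISTS my_test");
            env.executeSql("DROP TEMPORARY VIEW IF EXISTS test");
            env.executeSql("DROP FUNCTION IF EXISTS EpochToDate");
        }
    }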