Re: Flink not releasing the reference to a deleted jar file

2023-04-19 Thread neha goyal
Hi Shammon,

The Flink job no longer exists after I close the execution environment, right?
Could you please try the attached code? You can see that I am not sharing the
file with any other job. Until I close the running Java application, the code
mentioned still holds an open reference to the file.
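
For what it's worth, this looks like standard JVM jar handling rather than
anything job-related. A minimal plain-JVM sketch of the same effect (no Flink
involved; the class and jar names are reused from the repro below purely for
illustration):

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class JarHandleSketch {
    public static void main(String[] args) throws Exception {
        File jar = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
        try (URLClassLoader loader =
                new URLClassLoader(new URL[] {jar.toURI().toURL()})) {
            // Loading a class caches an open JarFile handle inside the loader.
            loader.loadClass("com.my.udf.v2.EpochToDate");
            // delete() removes the directory entry, but the disk blocks stay
            // allocated while the JarFile handle is still open.
            System.out.println("deleted: " + jar.delete());
        }
        // Once the loader is closed, the kernel can reclaim the space.
    }
}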

On Thu, Apr 20, 2023 at 7:25 AM Shammon FY  wrote:

> Hi neha
>
> Flink can delete a job's runtime data when the job terminates. But for
> external files, such as the UDF jar files you mentioned, I think you need to
> manage them yourself. The files may be shared between jobs and cannot be
> deleted when one Flink job exits.
>
> Best,
> Shammon FY
>
>
> On Wed, Apr 19, 2023 at 1:37 PM neha goyal  wrote:
>
>> Adding to the above query: I have tried dropping the tables and the
>> function as well, but no luck.
>>
>> On Wed, Apr 19, 2023 at 11:01 AM neha goyal  wrote:
>>
>>> Hello,
>>>
>>> I am attaching sample code and a screenshot showing that Flink is holding
>>> a reference to a jar file even after I close the StreamExecutionEnvironment.
>>>
>>> Because of this, the deleted file is not cleaned up from the disk and we
>>> are getting disk-space alerts. When we restart our application, these files
>>> are cleared from the disk.
>>> What is the way to gracefully shut down the Flink environment so that it
>>> releases its references to all resources?
>>>
>>> [repro code snipped; it is quoted in full in the original message at the
>>> bottom of this thread]


Re: Flink not releasing the reference to a deleted jar file

2023-04-19 Thread Shammon FY
Hi neha

Flink can delete a job's runtime data when the job terminates. But for
external files, such as the UDF jar files you mentioned, I think you need to
manage them yourself. The files may be shared between jobs and cannot be
deleted when one Flink job exits.
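
As a minimal sketch of what managing them yourself could look like, you can
give each job a private copy of the shared jar and delete only that copy once
the job has fully terminated (the paths and method names below are
illustrative, not from this thread):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class PerJobJarCopy {

    // Copy the shared UDF jar to a per-job file, so that deleting one job's
    // copy can never affect another job using the same UDF.
    public static Path prepareJarForJob(String jobName) throws Exception {
        Path shared =
                Paths.get("/opt/udfs/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
        Path perJob = Files.createTempFile("udf-" + jobName + "-", ".jar");
        Files.copy(shared, perJob, StandardCopyOption.REPLACE_EXISTING);
        // Register it via: CREATE FUNCTION ... USING JAR 'file://' + perJob
        return perJob;
    }

    // Call this only after the job has fully terminated.
    public static void cleanupJarForJob(Path perJob) throws Exception {
        Files.deleteIfExists(perJob);
    }
}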

Best,
Shammon FY


On Wed, Apr 19, 2023 at 1:37 PM neha goyal  wrote:

> Adding to the above query: I have tried dropping the tables and the
> function as well, but no luck.
>
> On Wed, Apr 19, 2023 at 11:01 AM neha goyal  wrote:
>
>> Hello,
>>
>> I am attaching sample code and a screenshot showing that Flink is holding
>> a reference to a jar file even after I close the StreamExecutionEnvironment.
>>
>> Because of this, the deleted file is not cleaned up from the disk and we
>> are getting disk-space alerts. When we restart our application, these files
>> are cleared from the disk.
>> What is the way to gracefully shut down the Flink environment so that it
>> releases its references to all resources?
>>
>> [repro code snipped; it is quoted in full in the original message at the
>> bottom of this thread]


Re: Flink not releasing the reference to a deleted jar file

2023-04-18 Thread neha goyal
Adding to the above query: I have tried dropping the tables and the
function as well, but no luck.
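
For reference, a sketch of what those drops could look like (the exact
statements were not in the thread; DROP TEMPORARY VIEW and DROP FUNCTION are
standard Flink SQL, the names match the repro below, and this assumes the
stream and table registrations end up as temporary views):

import org.apache.flink.table.api.TableEnvironment;

public class DropAttempts {

    // Drop everything the repro below registers: the two temporary views and
    // the catalog function created with USING JAR.
    public static void dropAll(TableEnvironment env) {
        env.executeSql("DROP TEMPORARY VIEW IF EXISTS test");
        env.executeSql("DROP TEMPORARY VIEW IF EXISTS my_test");
        env.executeSql("DROP FUNCTION IF EXISTS EpochToDate");
    }
}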

On Wed, Apr 19, 2023 at 11:01 AM neha goyal  wrote:

> Hello,
>
> I am attaching sample code and a screenshot showing that Flink is holding
> a reference to a jar file even after I close the StreamExecutionEnvironment.
>
> Because of this, the deleted file is not cleaned up from the disk and we
> are getting disk-space alerts. When we restart our application, these files
> are cleared from the disk.
> What is the way to gracefully shut down the Flink environment so that it
> releases its references to all resources?
>
> import java.io.File;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.formats.json.JsonRowDeserializationSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> public class TestResourceRelease {
>
>     public void check() {
>         StreamExecutionEnvironment execEnv =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         try {
>             StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
>             // Register the UDF from the jar that is deleted in the finally block.
>             env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS "
>                     + "'com.my.udf.v2.EpochToDate' USING JAR "
>                     + "'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>             TypeInformation<?>[] typeInformationArray = getTypeInfoArray();
>             String[] columnName = new String[] {"x", "y"};
>             KafkaSource<Row> source = KafkaSource.<Row>builder()
>                     .setStartingOffsets(OffsetsInitializer.latest())
>                     .setValueOnlyDeserializer(new JsonRowDeserializationSchema(
>                             Types.ROW_NAMED(columnName, typeInformationArray)))
>                     .setProperty("bootstrap.servers", "localhost:9092")
>                     .setTopics("test")
>                     .build();
>
>             DataStream<Row> stream = execEnv.fromSource(
>                     source, WatermarkStrategy.noWatermarks(), "Kafka Source");
>             env.registerDataStream("test", stream);
>
>             Table table = env.fromDataStream(stream);
>             env.registerTable("my_test", table);
>             Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS "
>                     + "`order_time`, `y` FROM `my_test`");
>             System.out.println("created the table");
>         } catch (Exception e) {
>             System.out.println(e);
>         } finally {
>             try {
>                 execEnv.close();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>             // The jar is deleted here, yet the running JVM keeps it open.
>             File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>             file.delete();
>         }
>     }
>
>     public static TypeInformation<?>[] getTypeInfoArray() {
>         TypeInformation<?>[] typeInformations = new TypeInformation<?>[2];
>         typeInformations[0] = org.apache.flink.table.api.Types.LONG();
>         typeInformations[1] = org.apache.flink.table.api.Types.LONG();
>         return typeInformations;
>     }
> }
>
>
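
A note on the symptom described above: on Linux, deleting a file that a
process still holds open only removes the directory entry; the blocks are
reclaimed when the last open descriptor goes away, which is why the space is
freed only after the application restarts. A minimal Linux-only sketch (class
name illustrative) for checking, from inside the running JVM, which descriptor
still points at the deleted jar:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class OpenFdCheck {

    public static void main(String[] args) throws IOException {
        // /proc/self/fd holds one symlink per open descriptor of this process.
        try (DirectoryStream<Path> fds =
                Files.newDirectoryStream(Paths.get("/proc/self/fd"))) {
            for (Path fd : fds) {
                try {
                    Path target = Files.readSymbolicLink(fd);
                    // A deleted-but-open file shows a "(deleted)" suffix here.
                    if (target.toString().contains("UDF-1.0-SNAPSHOT")) {
                        System.out.println(fd + " -> " + target);
                    }
                } catch (IOException ignored) {
                    // The descriptor may have been closed concurrently; skip.
                }
            }
        }
    }
}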