Hi neha

Flink can delete runtime data for a job when it goes to termination. But
for external files such as udf jar files as you mentioned, I think you need
to manage them yourself. The files may be shared between jobs, and can not
be deleted when one flink job exists.

Best,
Shammon FY


On Wed, Apr 19, 2023 at 1:37 PM neha goyal <nehagoy...@gmail.com> wrote:

> Adding to the above query, I have tried dropping the tables and the
> function as well but no luck.
>
> On Wed, Apr 19, 2023 at 11:01 AM neha goyal <nehagoy...@gmail.com> wrote:
>
>> Hello,
>>
>> I am attaching a sample code and screenshot where Flink is holding the
>> reference to a jar file even after I close the streamExecutionEnvironment.
>>
>> Due to this, the deleted file is not getting cleaned up from the disk and
>> we are getting disc space alerts. When we restart our application then
>> these files get cleared from the disk.
>> What is the way to gracefully shut down the Flink environment so that it
>> releases all the resources' references?
>>
>> public class TestResourceRelease {
>>
>>     public void check(){
>>         StreamExecutionEnvironment execEnv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>         try{
>>             StreamTableEnvironment env = 
>> StreamTableEnvironment.create(execEnv);
>>             env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 
>> 'com.my.udf.v2.EpochToDate' USING JAR 
>> 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>>             TypeInformation[] typeInformationArray = getTypeInfoArray();
>>             String[] columnName = new String[]{"x", "y"};
>>             KafkaSource<Row> source = 
>> KafkaSource.<Row>builder().setStartingOffsets(OffsetsInitializer.latest())
>>                     .setValueOnlyDeserializer(new 
>> JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, 
>> typeInformationArray)))
>>                     .setProperty("bootstrap.servers", "localhost:9092")
>>                     .setTopics("test").build();
>>
>>             DataStream<Row> stream = execEnv.fromSource(source, 
>> WatermarkStrategy.noWatermarks(), "Kafka Source");
>>             env.registerDataStream("test", stream);
>>
>>             Table table = env.fromDataStream(stream);
>>             env.registerTable("my_test", table);
>>             Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS 
>> `order_time`, `y` FROM `my_test`");
>>             System.out.println("created the table");
>>         }
>>         catch (Exception e){
>>             System.out.println(e);
>>
>>         }
>>         finally {
>>             try {
>>                 execEnv.close();
>>             } catch (Exception e) {
>>                 e.printStackTrace();
>>             }
>>             File file = new 
>> File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>>             file.delete();
>>         }
>>
>>     }
>>
>>     public static TypeInformation[] getTypeInfoArray(){
>>         TypeInformation[] typeInformations = new TypeInformation[2];
>>         typeInformations[0] = org.apache.flink.table.api.Types.LONG();
>>         typeInformations[1] = org.apache.flink.table.api.Types.LONG();
>>         return typeInformations;
>>     }
>>
>> }
>>
>>

Reply via email to