Following up on my query above: I have also tried dropping the tables and
the function (sketched below), but no luck.
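
The drop statements I tried look roughly like this (a sketch from memory;
the view and function names match the sample code in my earlier mail, and
both run before execEnv.close()):

    env.executeSql("DROP TEMPORARY VIEW IF EXISTS my_test");
    env.executeSql("DROP FUNCTION IF EXISTS EpochToDate");

Both statements succeed, yet the jar file stays held open.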

On Wed, Apr 19, 2023 at 11:01 AM neha goyal <nehagoy...@gmail.com> wrote:

> Hello,
>
> I am attaching sample code and a screenshot showing that Flink is holding
> a reference to the jar file even after I close the StreamExecutionEnvironment.
>
> Because of this, the deleted file is never cleaned up from the disk and we
> are getting disk space alerts. Only when we restart our application are
> these files actually cleared from the disk.
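>
> For context, this is roughly how we confirm the leak (a sketch with a
> hypothetical helper, assuming a Linux host: /proc/self/fd lists the JVM's
> open file descriptors, and targets that were unlinked are suffixed with
> "(deleted)"):
>
>     import java.io.File;
>     import java.io.IOException;
>     import java.nio.file.Files;
>
>     // Hypothetical helper: print every file this JVM still holds
>     // open even though it has been deleted from disk.
>     public static void printLeakedHandles() {
>         File[] fds = new File("/proc/self/fd").listFiles();
>         if (fds == null) {
>             return; // not Linux, or /proc is unavailable
>         }
>         for (File fd : fds) {
>             try {
>                 String target = Files.readSymbolicLink(fd.toPath()).toString();
>                 if (target.endsWith("(deleted)")) {
>                     System.out.println("still held open: " + target);
>                 }
>             } catch (IOException ignored) {
>                 // descriptor was closed while we were iterating; skip it
>             }
>         }
>     }
>
> Called after file.delete(), it still lists the UDF jar.
>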
> What is the correct way to gracefully shut down the Flink environment so
> that it releases all of its resource references?
>
> import java.io.File;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.formats.json.JsonRowDeserializationSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> public class TestResourceRelease {
>
>     public void check() {
>         StreamExecutionEnvironment execEnv =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         try {
>             StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
>
>             // Register the UDF from a jar on disk; Flink loads the jar
>             // through its user-code classloader.
>             env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate "
>                     + "AS 'com.my.udf.v2.EpochToDate' USING JAR "
>                     + "'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>
>             TypeInformation[] typeInformationArray = getTypeInfoArray();
>             String[] columnName = new String[]{"x", "y"};
>             KafkaSource<Row> source = KafkaSource.<Row>builder()
>                     .setStartingOffsets(OffsetsInitializer.latest())
>                     .setValueOnlyDeserializer(new JsonRowDeserializationSchema(
>                             Types.ROW_NAMED(columnName, typeInformationArray)))
>                     .setProperty("bootstrap.servers", "localhost:9092")
>                     .setTopics("test")
>                     .build();
>
>             DataStream<Row> stream = execEnv.fromSource(
>                     source, WatermarkStrategy.noWatermarks(), "Kafka Source");
>
>             // Expose the stream to SQL and apply the jar-backed UDF.
>             Table table = env.fromDataStream(stream);
>             env.createTemporaryView("my_test", table);
>             Table table1 = env.sqlQuery(
>                     "SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
>             System.out.println("created the table");
>         } catch (Exception e) {
>             System.out.println(e);
>         } finally {
>             try {
>                 execEnv.close();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>             // delete() only unlinks the file; on Linux its disk space is
>             // reclaimed once the last open handle to it is released.
>             File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>             file.delete();
>         }
>     }
>
>     public static TypeInformation[] getTypeInfoArray() {
>         TypeInformation[] typeInformations = new TypeInformation[2];
>         typeInformations[0] = Types.LONG;
>         typeInformations[1] = Types.LONG;
>         return typeInformations;
>     }
> }
>
>
