Help in designing a Flink use case
Hi, I have a use case that involves calculating the lifetime order count of a customer in real time. To reduce the memory footprint, I plan to run a batch job on the stored data every morning (say at 5 am) to calculate the total order count up to that moment. In addition, I want to deploy a Flink job that reads from Kafka and provides the updated order count in real time after 5 am. Is it possible for this job to combine the output of the batch job with the new orders so that it always provides an accurate total count? Please let me know if you have solved this use case before, or if you have any idea on how to proceed.
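In case it is useful, here is a minimal sketch of one way this is often wired up, assuming the 5 am batch job publishes its per-customer totals somewhere Flink can read them as a second stream (for example a Kafka topic). All type and field names here (BaselineCount, Order, customerId) are illustrative, not from any existing codebase. The baseline simply overwrites the running total, so this assumes the batch result accounts for every order up to the moment it was computed:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative event types (assumptions): the batch job's output and a live order.
class BaselineCount { String customerId; long count; }
class Order { String customerId; }

public class LifetimeOrderCount
        extends KeyedCoProcessFunction<String, BaselineCount, Order, Tuple2<String, Long>> {

    // Lifetime total for the current customer key.
    private transient ValueState<Long> total;

    @Override
    public void open(Configuration parameters) {
        total = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lifetime-order-count", Long.class));
    }

    @Override
    public void processElement1(BaselineCount baseline, Context ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
        // The batch result overwrites the running total; assumes the baseline
        // already includes every order up to the moment it was computed.
        total.update(baseline.count);
        out.collect(Tuple2.of(baseline.customerId, baseline.count));
    }

    @Override
    public void processElement2(Order order, Context ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
        // Each live order increments the total and emits the new count.
        long current = total.value() == null ? 0L : total.value();
        total.update(current + 1);
        out.collect(Tuple2.of(order.customerId, current + 1));
    }
}

// Wiring sketch:
// baselineStream.keyBy(b -> b.customerId)
//     .connect(orderStream.keyBy(o -> o.customerId))
//     .process(new LifetimeOrderCount());

An alternative worth evaluating is bootstrapping the job's state directly from the batch result with the State Processor API instead of feeding it in as a second stream; which fits better depends on how often the baseline changes.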
Query around RocksDB
Hello, I am trying to debug unbounded memory consumption by a Flink process. The heap size of the process stays the same, but the RSS of the process keeps increasing. I suspect it might be RocksDB. We have the default value for state.backend.rocksdb.memory.managed, which is true. Can anyone confirm whether, with this config, RocksDB can still consume unbounded native memory? If yes, what metrics can I check to confirm the issue? Any help would be appreciated.
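One way to check whether RocksDB is the culprit is to enable its native metrics and compare block-cache, memtable, and table-reader memory against the managed memory budget. A minimal sketch follows; these options are documented in recent Flink releases, are off by default because they cost some performance, and would normally go into flink-conf.yaml rather than code:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbMetricsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // RocksDB native metrics, reported per state backend instance.
        conf.setString("state.backend.rocksdb.metrics.block-cache-usage", "true");
        conf.setString("state.backend.rocksdb.metrics.block-cache-pinned-usage", "true");
        conf.setString("state.backend.rocksdb.metrics.size-all-mem-tables", "true");
        conf.setString("state.backend.rocksdb.metrics.estimate-table-readers-mem", "true");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual ...
    }
}

If the sum of these stays flat while RSS keeps growing, the leak is more likely elsewhere in native memory (e.g. glibc arenas or direct buffers) than in RocksDB itself.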
Flink SQL erroring at runtime (Flink 1.16)
Hello, it looks like there is a bug in Flink 1.16's IF operator. If I use the UPPER or TRIM functions (there might be more such functions), I get the exception below. These functions worked fine with Flink 1.13.

select if(address_id = 'a', 'default', upper(address_id)) as address_id from feature_test

Sample input sent to my Kafka source: {"address_id":"mydata"}

It should be reproducible; please try it.

2023-05-05 23:30:24
java.io.IOException: Failed to deserialize consumer record due to
at StreamExecCalc$14237.processElement_split1961(Unknown Source)
at StreamExecCalc$14237.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 28 more
Re: Flink Job Failure for version 1.16
Hi everyone, can someone please shed some light on when the "Checkpoint Coordinator is suspending" error occurs, and what I should do to avoid it? It is impacting the production pipeline after the version upgrade. Is it related to a resource crunch in the pipeline? Thank you

On Thu, May 11, 2023 at 10:35 AM neha goyal wrote:

> I have recently migrated from 1.13.6 to 1.16.1 and can see a performance
> degradation for Flink pipelines that use Flink's managed state (ListState,
> MapState, etc.). Pipelines are frequently failing with the exception:
>
> 06:59:42.021 [Checkpoint Timer] WARN o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete checkpoint 36755 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> 07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN a.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367] has failed, address is now gated for [50] ms. Reason: [Disassociated]
> 07:18:15.257 [flink-metrics-23] WARN a.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639] has failed, address is now gated for [50] ms. Reason: [Disassociated]
> 07:18:15.331 [flink-akka.actor.default-dispatcher-31] WARN o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
> at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
> at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)
>
> Is there any issue with this Flink version or the new RocksDB version?
> What should be the action item for this exception?
> The maximum savepoint size is 80.2 GB and we periodically (every 20
> minutes) take a savepoint for the job.
> Checkpoint Type: aligned checkpoint
>
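One note on reading the trace above: the suspension is raised from CheckpointCoordinatorDeActivator.jobStatusChanges, i.e. the coordinator stops its scheduler because the job itself is leaving the RUNNING state, so the earlier "Checkpoint expired before completing" failures are the more likely root cause. Below is a minimal sketch of checkpoint settings that are commonly adjusted when checkpoints of large state time out; the intervals are placeholders, not recommendations:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(20 * 60 * 1000L); // trigger interval, placeholder value

        CheckpointConfig cc = env.getCheckpointConfig();
        // Give large snapshots more time than the default 10 minutes.
        cc.setCheckpointTimeout(30 * 60 * 1000L);
        // Let the job make progress between checkpoint attempts.
        cc.setMinPauseBetweenCheckpoints(5 * 60 * 1000L);
        // Lets barriers overtake in-flight data under backpressure
        // (applies to exactly-once checkpoints).
        cc.enableUnalignedCheckpoints();
    }
}

With RocksDB state, enabling incremental checkpoints (state.backend.incremental: true) can also shrink snapshot sizes considerably.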
Re: Flink SQL erroring at runtime
Hi Hang and community, a correction to my earlier email: the issue appears when I use the UPPER or TRIM function together with IF. It looks like there is a bug in Flink 1.16's IF operator. If I use the UPPER or TRIM functions (there might be more such functions), I get the exception below. These functions worked fine with Flink 1.13.

select if(address_id = 'a', 'default', upper(address_id)) as address_id from feature_test

Sample input sent to my Kafka source: {"address_id":"mydata"}

It should be reproducible; please try it.

2023-05-05 23:30:24
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 22 more
Caused by: java.lang.NullPointerException
at StreamExecCalc$14237.processElement_split1961(Unknown Source)
at StreamExecCalc$14237.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 28 more

On Mon, May 8, 2023 at 11:48 AM Hang Ruan wrote:

> Hi, neha,
>
> I think the error occurred because of the deserialization. Is there some
> example data and runnable SQLs to reproduce the problem?
>
> Best,
> Hang
>
> neha goyal wrote on Tue, May 2, 2023 at 16:33:
>
>> Hello,
>>
>> I am using Flink 1.16.1 and observing a different behavior from Flink
>> 1.13.6.
>>
Flink Job Failure for version 1.16
I have recently migrated from 1.13.6 to 1.16.1 and can see a performance degradation for Flink pipelines that use Flink's managed state (ListState, MapState, etc.). Pipelines are frequently failing with the exception:

06:59:42.021 [Checkpoint Timer] WARN o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete checkpoint 36755 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN a.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367] has failed, address is now gated for [50] ms. Reason: [Disassociated]
07:18:15.257 [flink-metrics-23] WARN a.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639] has failed, address is now gated for [50] ms. Reason: [Disassociated]
07:18:15.331 [flink-akka.actor.default-dispatcher-31] WARN o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)

Is there any issue with this Flink version or the new RocksDB version? What should be the action item for this exception? The maximum savepoint size is 80.2 GB and we periodically (every 20 minutes) take a savepoint for the job.
Checkpoint Type: aligned checkpoint
Question about Flink metrics
Hello, I have a question about the Prometheus metrics. I am able to fetch metrics with the following expression:

sum(flink_jobmanager_job_numRestarts{job_name="$job_name"}) by (job_name)

Now I am interested in only a few jobs and want to give them a label. How can I achieve this? How can I attach an additional label to the Flink Prometheus metrics so that I can fetch the metrics only for the jobs that have that label? I need to set this tag at the job level: a few jobs will have the tag and the others won't.
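One possibility, sketched below, relies on the reporter option metrics.reporter.&lt;name&gt;.scope.variables.additional, which recent Flink releases document for attaching extra key-value variables to everything a reporter emits; please check that your version supports it. The reporter name "prom" and the label "team:payments" are illustrative, and in practice these options live in flink-conf.yaml of the clusters you want tagged, not in job code:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TaggedMetricsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Expose metrics through the Prometheus reporter ("prom" is an arbitrary name).
        conf.setString("metrics.reporter.prom.factory.class",
                "org.apache.flink.metrics.prometheus.PrometheusReporterFactory");
        // Hypothetical extra label; check the exact map syntax in the docs
        // for your Flink version.
        conf.setString("metrics.reporter.prom.scope.variables.additional", "team:payments");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual ...
    }
}

With that in place, the query could become sum(flink_jobmanager_job_numRestarts{team="payments"}) by (job_name).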
Flink SQL erroring at runtime
Hello, I am using Flink 1.16.1 and observing a different behavior from Flink 1.13.6.

SELECT if(some_string_field is null, 'default', 'some_string_field') from my_stream

This SQL Flink job in the streaming environment errors out at runtime with the exception below. No null values are sent, and it fails for non-null values as well. It runs fine in Flink 1.13.6. Also, if I use an Integer field, it runs fine. Was there any change around this in Flink 1.14/1.15/1.16?

java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 22 more
Caused by: java.lang.NullPointerException
at StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown Source)
at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source)
at StreamExecCalc$53548.processElement_split10047(Unknown Source)
at StreamExecCalc$53548.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 28 more
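In case it helps others hitting the same NullPointerException in generated code: a possible workaround (an assumption on my part, not a confirmed fix) is to express the conditional with a standard CASE expression, which may take a different code-generation path in the planner. A minimal sketch with the table and field names from the example above; it assumes my_stream is already registered, e.g. via a CREATE TABLE over the Kafka topic:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class IfWorkaroundSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Same semantics as the failing IF(...), written as CASE WHEN.
        Table result = tEnv.sqlQuery(
                "SELECT CASE WHEN some_string_field IS NULL THEN 'default' "
              + "ELSE some_string_field END AS some_string_field "
              + "FROM my_stream");
        result.execute().print();
    }
}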
Re: Flink not releasing the reference to a deleted jar file
Hi Shammon, the Flink job doesn't exist anymore after I close the execution environment, right? Could you please try the attached code and see that I am not sharing the file with any other job? Until I close the running Java application, the file mentioned in the code still has an open reference.

On Thu, Apr 20, 2023 at 7:25 AM Shammon FY wrote:

> Hi neha
>
> Flink can delete runtime data for a job when it goes to termination. But
> for external files such as udf jar files as you mentioned, I think you need
> to manage them yourself. The files may be shared between jobs, and can not
> be deleted when one flink job exists.
>
> Best,
> Shammon FY
>
> On Wed, Apr 19, 2023 at 1:37 PM neha goyal wrote:
>
>> Adding to the above query, I have tried dropping the tables and the
>> function as well, but no luck.
>>
>> On Wed, Apr 19, 2023 at 11:01 AM neha goyal wrote:
>>
>>> Hello,
>>>
>>> I am attaching sample code and a screenshot where Flink is holding the
>>> reference to a jar file even after I close the StreamExecutionEnvironment.
>>>
>>> Due to this, the deleted file is not getting cleaned up from the disk
>>> and we are getting disk-space alerts. When we restart our application,
>>> these files get cleared from the disk.
>>> What is the way to gracefully shut down the Flink environment so that it
>>> releases all the resources' references?
>>>
>>> public class TestResourceRelease {
>>>
>>>     public void check() {
>>>         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         try {
>>>             StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
>>>             env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 'com.my.udf.v2.EpochToDate' USING JAR 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>>>             TypeInformation[] typeInformationArray = getTypeInfoArray();
>>>             String[] columnName = new String[]{"x", "y"};
>>>             KafkaSource source = KafkaSource.builder()
>>>                     .setStartingOffsets(OffsetsInitializer.latest())
>>>                     .setValueOnlyDeserializer(new JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, typeInformationArray)))
>>>                     .setProperty("bootstrap.servers", "localhost:9092")
>>>                     .setTopics("test")
>>>                     .build();
>>>
>>>             DataStream stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
>>>             env.registerDataStream("test", stream);
>>>
>>>             Table table = env.fromDataStream(stream);
>>>             env.registerTable("my_test", table);
>>>             Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
>>>             System.out.println("created the table");
>>>         } catch (Exception e) {
>>>             System.out.println(e);
>>>         } finally {
>>>             try {
>>>                 execEnv.close();
>>>             } catch (Exception e) {
>>>                 e.printStackTrace();
>>>             }
>>>             File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>>>             file.delete();
>>>         }
>>>     }
>>>
>>>     public static TypeInformation[] getTypeInfoArray() {
>>>         TypeInformation[] typeInformations = new TypeInformation[2];
>>>         typeInformations[0] = org.apache.flink.table.api.Types.LONG();
>>>         typeInformations[1] = org.apache.flink.table.api.Types.LONG();
>>>         return typeInformations;
>>>     }
>>> }
>>>
Re: Flink not releasing the reference to a deleted jar file
Adding to the above query, I have tried dropping the tables and the function as well, but no luck.

On Wed, Apr 19, 2023 at 11:01 AM neha goyal wrote:

> Hello,
>
> I am attaching sample code and a screenshot where Flink is holding the
> reference to a jar file even after I close the StreamExecutionEnvironment.
>
> Due to this, the deleted file is not getting cleaned up from the disk and
> we are getting disk-space alerts. When we restart our application, these
> files get cleared from the disk.
> What is the way to gracefully shut down the Flink environment so that it
> releases all the resources' references?
>
> public class TestResourceRelease {
>
>     public void check() {
>         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         try {
>             StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
>             env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 'com.my.udf.v2.EpochToDate' USING JAR 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>             TypeInformation[] typeInformationArray = getTypeInfoArray();
>             String[] columnName = new String[]{"x", "y"};
>             KafkaSource source = KafkaSource.builder()
>                     .setStartingOffsets(OffsetsInitializer.latest())
>                     .setValueOnlyDeserializer(new JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, typeInformationArray)))
>                     .setProperty("bootstrap.servers", "localhost:9092")
>                     .setTopics("test")
>                     .build();
>
>             DataStream stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
>             env.registerDataStream("test", stream);
>
>             Table table = env.fromDataStream(stream);
>             env.registerTable("my_test", table);
>             Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
>             System.out.println("created the table");
>         } catch (Exception e) {
>             System.out.println(e);
>         } finally {
>             try {
>                 execEnv.close();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>             File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>             file.delete();
>         }
>     }
>
>     public static TypeInformation[] getTypeInfoArray() {
>         TypeInformation[] typeInformations = new TypeInformation[2];
>         typeInformations[0] = org.apache.flink.table.api.Types.LONG();
>         typeInformations[1] = org.apache.flink.table.api.Types.LONG();
>         return typeInformations;
>     }
> }
>
Flink not releasing the reference to a deleted jar file
Hello,

I am attaching sample code and a screenshot where Flink is holding the reference to a jar file even after I close the StreamExecutionEnvironment.

Due to this, the deleted file is not getting cleaned up from the disk and we are getting disk-space alerts. When we restart our application, these files get cleared from the disk. What is the way to gracefully shut down the Flink environment so that it releases all the resources' references?

public class TestResourceRelease {

    public void check() {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        try {
            StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
            // Register a UDF whose implementation lives in a jar on the local disk.
            env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 'com.my.udf.v2.EpochToDate' USING JAR 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
            TypeInformation[] typeInformationArray = getTypeInfoArray();
            String[] columnName = new String[]{"x", "y"};
            KafkaSource source = KafkaSource.builder()
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, typeInformationArray)))
                    .setProperty("bootstrap.servers", "localhost:9092")
                    .setTopics("test")
                    .build();

            DataStream stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
            env.registerDataStream("test", stream);

            Table table = env.fromDataStream(stream);
            env.registerTable("my_test", table);
            Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
            System.out.println("created the table");
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            try {
                execEnv.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            // Deleting the jar here fails to free the disk space because the
            // environment still holds an open reference to the file.
            File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
            file.delete();
        }
    }

    public static TypeInformation[] getTypeInfoArray() {
        TypeInformation[] typeInformations = new TypeInformation[2];
        typeInformations[0] = org.apache.flink.table.api.Types.LONG();
        typeInformations[1] = org.apache.flink.table.api.Types.LONG();
        return typeInformations;
    }
}
OOM error behaviour change in newer version
Hello, I want to understand the reason behind the different behavior of two Flink-to-Delta pipelines. When I run a Kafka-in, Delta-out pipeline with fewer resources than it needs, it fails with an OOM error. On the newer version it takes around 8 minutes to hit the first exception, while on the older version it fails within 2 minutes. What new change could explain this behavior? (We have retries enabled, hence the word "first".)

Pipeline 1: Flink version 1.16.1, Delta connector version 0.6.0, time to first OOM error: 8 minutes.
Pipeline 2: Flink version 1.13.6, Delta connector version 0.5.0, time to first OOM error: 2 minutes.
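Not an explanation of the version difference, but a sketch for narrowing it down: capturing a heap dump at the moment of failure lets you compare what fills the heap in 1.13 versus 1.16. This assumes the OOM is a JVM heap error on the TaskManager; the dump path is a placeholder, and since JVM options cannot be applied to an already-running process, these settings belong in flink-conf.yaml before the cluster starts (shown via the Configuration API only for compactness):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OomDiagnosisSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Have the TaskManager JVM write a heap dump when it throws OutOfMemoryError.
        conf.setString("env.java.opts.taskmanager",
                "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/flink-oom.hprof");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual ...
    }
}

Comparing the dominator trees of the two dumps (e.g. in Eclipse MAT) should show whether the newer stack simply buffers more before exhausting the heap or leaks something new.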
Is there any API method for dynamically loading a UDF jar?
Hello, in Flink 1.16, the CREATE FUNCTION ... USING JAR functionality was introduced, where we can specify the jar resources, and the jar can be located in a remote file system such as HDFS/S3. I don't see an alternative method for the same functionality among the TableEnvironment method calls; for example, createTemporarySystemFunction doesn't take any URI. Will such methods be provided in the future? Is there any difference in performance between using TableEnvironment method calls and TableEnvironment.executeSql for the same feature? Which one is recommended? Thanks and regards
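For reference, a minimal sketch of the DDL route that exists today; the class name, jar URI, and literal value are illustrative. Until an equivalent TableEnvironment method is available, executeSql with CREATE FUNCTION ... USING JAR appears to be the way to register a function by jar URI, and since both the Table API calls and executeSql ultimately go through the same catalog/function registration, I would not expect a meaningful performance difference, though that is an assumption:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UdfJarSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Register a UDF whose implementation jar lives in a remote file system
        // (class name and jar location are illustrative).
        tEnv.executeSql(
                "CREATE FUNCTION IF NOT EXISTS EpochToDate "
              + "AS 'com.my.udf.v2.EpochToDate' "
              + "USING JAR 's3://my-bucket/udfs/UDF-1.0-SNAPSHOT.jar'");
        // Exercise the function on a literal row.
        tEnv.executeSql(
                "SELECT EpochToDate(ts) FROM (VALUES (CAST(1683500000 AS BIGINT))) AS t(ts)")
            .print();
    }
}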