Help in designing the Flink use case

2024-02-20 Thread neha goyal
Hi,

I have a use case involving calculating the lifetime order count of a
customer in real time. To reduce the memory footprint, I plan to run a
batch job on stored data every morning (let's say at 5 am) to calculate the
total order count up to that moment. Additionally, I aim to deploy a Flink
job that provides the updated order count in real time after 5 am, reading
from Kafka. Is it possible for this job to combine the result of the batch
job with the new orders so that it provides a consistently accurate total
count?

Please let me know if you have solved this use case before, or if you have
any ideas on how to proceed.
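
A minimal sketch of one way this could work, assuming the 5 am batch result
is re-published to a Kafka topic as per-customer baseline records: connect
the baseline stream with the live order stream, key both by customer, and
keep a single counter in keyed state. BaselineEvent, OrderEvent, and their
fields are hypothetical placeholders, not an established API:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class LifetimeOrderCount
        extends KeyedCoProcessFunction<String, LifetimeOrderCount.BaselineEvent,
                LifetimeOrderCount.OrderEvent, Tuple2<String, Long>> {

    // Hypothetical input records: a 5 am batch snapshot and a live order.
    public static class BaselineEvent { public String customerId; public long totalOrders; }
    public static class OrderEvent { public String customerId; public String orderId; }

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lifetime-order-count", Long.class));
    }

    @Override
    public void processElement1(BaselineEvent baseline, Context ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
        // Batch result: overwrite the counter with the authoritative total as of 5 am.
        count.update(baseline.totalOrders);
        out.collect(Tuple2.of(ctx.getCurrentKey(), baseline.totalOrders));
    }

    @Override
    public void processElement2(OrderEvent order, Context ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
        // Live order from Kafka: increment on top of the baseline.
        long updated = (count.value() == null ? 0L : count.value()) + 1;
        count.update(updated);
        out.collect(Tuple2.of(ctx.getCurrentKey(), updated));
    }
}

The tricky part is the cutover: orders that arrive between the batch
snapshot and the baseline record landing in state would either be lost (if
the baseline overwrites them) or double-counted (if it doesn't), so the
batch job and the Kafka offsets need an agreed cut-off point.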


Query around RocksDB

2023-06-30 Thread neha goyal
Hello,

I am trying to debug unbounded memory consumption by the Flink process.
The heap size of the process remains the same, but the RSS of the process
keeps increasing. I suspect it might be because of RocksDB.

We have the default value for state.backend.rocksdb.memory.managed, which
is true. Can anyone confirm whether, with this config, RocksDB can still
consume unbounded native memory?

If yes, what metrics can I check to confirm the issue? Any help would be
appreciated.
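
In case it helps the discussion, below is a sketch of the flink-conf.yaml
switches that, as I understand it, expose RocksDB's native memory metrics so
its reported usage can be compared against the managed memory budget (the
selection is an assumption, not a complete list):

# Keep RocksDB inside Flink's managed memory budget and expose its native
# metrics (the metric switches are off by default and add some overhead).
state.backend.rocksdb.memory.managed: true           # the default mentioned above
taskmanager.memory.managed.fraction: 0.4             # size of the managed budget (default)
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
state.backend.rocksdb.metrics.total-sst-files-size: true

If the block cache, memtables, and table readers together stay within the
managed budget while RSS keeps growing, the growth is more likely coming
from memory that RocksDB does not account for.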


Flink SQL erroring at runtime (Flink 1.16)

2023-05-17 Thread neha goyal
Hello,

It looks like there is a bug in Flink 1.16's IF operator. If I use the
UPPER or TRIM functions (there might be more such functions), I get the
exception below. These functions worked fine with Flink 1.13.
select
  if(
address_id = 'a',
'default',
upper(address_id)
  ) as address_id
from
  feature_test

Sample input sent to my Kafka source: {"address_id":"mydata"}
It should be reproducible. Please try it.

2023-05-05 23:30:24
java.io.IOException: Failed to deserialize consumer record due to
at StreamExecCalc$14237.processElement_split1961(Unknown Source)
at StreamExecCalc$14237.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 28 more
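
In the meantime, a possible workaround (untested; it assumes the NPE comes
from the code generated for IF, so an equivalent CASE expression may be
compiled differently):

select
  case
    when address_id = 'a' then 'default'
    else upper(address_id)
  end as address_id
from
  feature_test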


Re: Flink Job Failure for version 1.16

2023-05-12 Thread neha goyal
Hi everyone, can someone please shed some light on when the "Checkpoint
Coordinator is suspending" error occurs and what I should do to avoid it?
It is impacting the production pipeline after the version upgrade. Is it
related to a resource crunch in the pipeline?
Thank you

On Thu, May 11, 2023 at 10:35 AM neha goyal  wrote:

> I have recently migrated from 1.13.6 to 1.16.1, and I can see a
> performance degradation for Flink pipelines that use Flink's managed
> state (ListState, MapState, etc.). Pipelines are frequently failing with
> the exception:
>
> 06:59:42.021 [Checkpoint Timer] WARN  o.a.f.r.c.CheckpointFailureManager -
> Failed to trigger or complete checkpoint 36755 for job
> d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint expired before completing.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> 07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN
>  a.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367]
> has failed, address is now gated for [50] ms. Reason: [Disassociated]
>  akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
> akka.remote.ReliableDeliverySupervisor07:18:15.257 [flink-metrics-23] WARN
>  a.remote.ReliableDeliverySupervisor - Association with remote system
> [akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639]
> has failed, address is now gated for [50] ms. Reason: [Disassociated]
>  akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
> akka.remote.ReliableDeliverySupervisor07:18:15.331
> [flink-akka.actor.default-dispatcher-31] WARN
>  o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete
> checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive
> failed attempts so far)
>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint Coordinator is suspending.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
> at
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)
>
> Is there any issue with this Flink version or the new RocksDB version?
> What should be the action item for this exception?
> The maximum savepoint size is 80.2 GB and we periodically (every 20
> minutes) take a savepoint for the job.
> Checkpoint Type: aligned checkpoint
>


Re: Flink SQL erroring at runtime

2023-05-11 Thread neha goyal
Hi Hang and community,
There is a correction to my earlier email: the issue comes when I use the
UPPER or TRIM function together with IF.
It looks like there is a bug in Flink 1.16's IF operator. If I use the
UPPER or TRIM functions (there might be more such functions), I get the
exception below. These functions worked fine with Flink 1.13.
select
  if(
address_id = 'a',
'default',
upper(address_id)
  ) as address_id
from
  feature_test

Sample input sent to my Kafka source: {"address_id":"mydata"}
It should be reproducible. Please try it.

2023-05-05 23:30:24
java.io.IOException: Failed to deserialize consumer record due to
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
at
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at
org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 22 more
Caused by: java.lang.NullPointerException
at StreamExecCalc$14237.processElement_split1961(Unknown Source)
at StreamExecCalc$14237.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 28 more


On Mon, May 8, 2023 at 11:48 AM Hang Ruan  wrote:

> Hi, neha,
>
> I think the error occurred because of deserialization. Is there some
> example data and runnable SQL to reproduce the problem?
>
> Best,
> Hang
>
> On Tue, May 2, 2023 at 16:33, neha goyal  wrote:
>
>> Hello,
>>
>> I am using Flink 1.16.1 and observing a different behavior from Flink
>> 1.13.6.
>>

Flink Job Failure for version 1.16

2023-05-10 Thread neha goyal
I have recently migrated from 1.13.6 to 1.16.1, and I can see a performance
degradation for Flink pipelines that use Flink's managed state (ListState,
MapState, etc.). Pipelines are frequently failing with the exception:

06:59:42.021 [Checkpoint Timer] WARN  o.a.f.r.c.CheckpointFailureManager -
Failed to trigger or complete checkpoint 36755 for job
d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
 org.apache.flink.runtime.checkpoint.CheckpointFailureManager
org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
Checkpoint expired before completing.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN
 a.remote.ReliableDeliverySupervisor - Association with remote system
[akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367]
has failed, address is now gated for [50] ms. Reason: [Disassociated]
 akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
akka.remote.ReliableDeliverySupervisor07:18:15.257 [flink-metrics-23] WARN
 a.remote.ReliableDeliverySupervisor - Association with remote system
[akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639]
has failed, address is now gated for [50] ms. Reason: [Disassociated]
 akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
akka.remote.ReliableDeliverySupervisor07:18:15.331
[flink-akka.actor.default-dispatcher-31] WARN
 o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete
checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive
failed attempts so far)
 org.apache.flink.runtime.checkpoint.CheckpointFailureManager
org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
Checkpoint Coordinator is suspending.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
at
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
at
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)

Is there any issue with this Flink version or the new RocksDB version? What
should be the action item for this exception?
The maximum savepoint size is 80.2 GB and we periodically (every 20 minutes)
take a savepoint for the job.
Checkpoint Type: aligned checkpoint
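
In case it helps anyone comparing notes, these are the checkpoint-related
options I understand are usually looked at when checkpoints expire. The
values below are illustrative assumptions, not recommendations, and whether
any of them applies depends on why the checkpoint timed out (backpressure,
state size, or slow checkpoint storage):

execution.checkpointing.timeout: 30 min   # default is 10 min; raise it if checkpoints legitimately need longer
execution.checkpointing.min-pause: 2 min  # guarantees breathing room between consecutive checkpoints
execution.checkpointing.unaligned: true   # lets barriers overtake in-flight data under backpressure
state.backend.incremental: true           # upload only RocksDB diffs instead of the full state each time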


Question about Flink metrics

2023-05-04 Thread neha goyal
Hello,
I have a question about Prometheus metrics. I am able to fetch metrics with
the following expression:
sum(flink_jobmanager_job_numRestarts{job_name="$job_name"}) by (job_name)
Now I am interested in only a few jobs, and I want to give them a label.
How can I achieve this? How can I attach an additional label to Flink's
Prometheus metrics so that I can fetch metrics only for the jobs carrying
that label? The tag needs to be set at the job level; a few jobs will have
it and others won't.
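
As a stopgap that needs nothing from Flink, the interesting jobs can be
selected on the Prometheus side with a regex matcher (jobA and jobB below
are placeholders for real job names):

sum(flink_jobmanager_job_numRestarts{job_name=~"jobA|jobB"}) by (job_name)

A recording rule over this expression can give the group a stable name, but
it still keys off job names rather than a label set from inside Flink.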


Flink SQL erroring at runtime

2023-05-02 Thread neha goyal
Hello,

I am using Flink 1.16.1 and observing a different behavior from Flink
1.13.6.

SELECT if(some_string_field is null, 'default', 'some_string_field') from
my_stream

This Flink SQL job in the streaming environment errors out at runtime with
the exception below. No null values are sent; it fails for non-null values
as well.

It runs fine in Flink 1.13.6. Also, if I use an Integer field, it runs fine.
Was there any change around this in Flink 1.14/1.15/1.16?

java.io.IOException: Failed to deserialize consumer record due to
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
at
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at
org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 22 more
Caused by: java.lang.NullPointerException
at
StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown
Source)
at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source)
at StreamExecCalc$53548.processElement_split10047(Unknown Source)
at StreamExecCalc$53548.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 28 more


Re: Flink not releasing the reference to a deleted jar file

2023-04-19 Thread neha goyal
Hi Shammon,

The Flink job no longer exists after I close the execution environment,
right? Can you please try the attached code and see that I am not sharing
the file with any other job? Until I close the running Java application,
the file still has an open reference from the code mentioned.

On Thu, Apr 20, 2023 at 7:25 AM Shammon FY  wrote:

> Hi neha
>
> Flink can delete runtime data for a job when it terminates. But for
> external files such as the UDF jar files you mentioned, I think you need
> to manage them yourself. The files may be shared between jobs, so they
> cannot be deleted just because one Flink job exits.
>
> Best,
> Shammon FY
>
>
> On Wed, Apr 19, 2023 at 1:37 PM neha goyal  wrote:
>
>> Adding to the above query, I have tried dropping the tables and the
>> function as well but no luck.
>>
>> On Wed, Apr 19, 2023 at 11:01 AM neha goyal  wrote:
>>
>>> Hello,
>>>
>>> I am attaching a sample code and screenshot where Flink is holding the
>>> reference to a jar file even after I close the streamExecutionEnvironment.
>>>
>>> Due to this, the deleted file is not getting cleaned up from the disk
>>> and we are getting disk space alerts. When we restart our application,
>>> these files get cleared from the disk.
>>> What is the way to gracefully shut down the Flink environment so that it
>>> releases all the resources' references?
>>>
>>> public class TestResourceRelease {
>>>
>>> public void check(){
>>> StreamExecutionEnvironment execEnv = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> try{
>>> StreamTableEnvironment env = 
>>> StreamTableEnvironment.create(execEnv);
>>> env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 
>>> 'com.my.udf.v2.EpochToDate' USING JAR 
>>> 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
>>> TypeInformation[] typeInformationArray = getTypeInfoArray();
>>> String[] columnName = new String[]{"x", "y"};
>>> KafkaSource source = 
>>> KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
>>> .setValueOnlyDeserializer(new 
>>> JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, 
>>> typeInformationArray)))
>>> .setProperty("bootstrap.servers", "localhost:9092")
>>> .setTopics("test").build();
>>>
>>> DataStream stream = execEnv.fromSource(source, 
>>> WatermarkStrategy.noWatermarks(), "Kafka Source");
>>> env.registerDataStream("test", stream);
>>>
>>> Table table = env.fromDataStream(stream);
>>> env.registerTable("my_test", table);
>>> Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS 
>>> `order_time`, `y` FROM `my_test`");
>>> System.out.println("created the table");
>>> }
>>> catch (Exception e){
>>> System.out.println(e);
>>>
>>> }
>>> finally {
>>> try {
>>> execEnv.close();
>>> } catch (Exception e) {
>>> e.printStackTrace();
>>> }
>>> File file = new 
>>> File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
>>> file.delete();
>>> }
>>>
>>> }
>>>
>>> public static TypeInformation[] getTypeInfoArray(){
>>> TypeInformation[] typeInformations = new TypeInformation[2];
>>> typeInformations[0] = org.apache.flink.table.api.Types.LONG();
>>> typeInformations[1] = org.apache.flink.table.api.Types.LONG();
>>> return typeInformations;
>>> }
>>>
>>> }
>>>
>>>


Re: Flink not releasing the reference to a deleted jar file

2023-04-18 Thread neha goyal
Adding to the above query, I have tried dropping the tables and the
function as well, but no luck.

On Wed, Apr 19, 2023 at 11:01 AM neha goyal  wrote:

> Hello,
>
> I am attaching a sample code and screenshot where Flink is holding the
> reference to a jar file even after I close the streamExecutionEnvironment.
>
> Due to this, the deleted file is not getting cleaned up from the disk and
> we are getting disk space alerts. When we restart our application, these
> files get cleared from the disk.
> What is the way to gracefully shut down the Flink environment so that it
> releases all the resources' references?
>
> public class TestResourceRelease {
>
> public void check(){
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> try{
> StreamTableEnvironment env = 
> StreamTableEnvironment.create(execEnv);
> env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 
> 'com.my.udf.v2.EpochToDate' USING JAR 
> 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
> TypeInformation[] typeInformationArray = getTypeInfoArray();
> String[] columnName = new String[]{"x", "y"};
> KafkaSource source = 
> KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
> .setValueOnlyDeserializer(new 
> JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, 
> typeInformationArray)))
> .setProperty("bootstrap.servers", "localhost:9092")
> .setTopics("test").build();
>
> DataStream stream = execEnv.fromSource(source, 
> WatermarkStrategy.noWatermarks(), "Kafka Source");
> env.registerDataStream("test", stream);
>
> Table table = env.fromDataStream(stream);
> env.registerTable("my_test", table);
> Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS 
> `order_time`, `y` FROM `my_test`");
> System.out.println("created the table");
> }
> catch (Exception e){
> System.out.println(e);
>
> }
> finally {
> try {
> execEnv.close();
> } catch (Exception e) {
> e.printStackTrace();
> }
> File file = new 
> File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
> file.delete();
> }
>
> }
>
> public static TypeInformation[] getTypeInfoArray(){
> TypeInformation[] typeInformations = new TypeInformation[2];
> typeInformations[0] = org.apache.flink.table.api.Types.LONG();
> typeInformations[1] = org.apache.flink.table.api.Types.LONG();
> return typeInformations;
> }
>
> }
>
>


Flink not releasing the reference to a deleted jar file

2023-04-18 Thread neha goyal
Hello,

I am attaching a sample code and screenshot where Flink is holding the
reference to a jar file even after I close the streamExecutionEnvironment.

Due to this, the deleted file is not getting cleaned up from the disk and
we are getting disk space alerts. When we restart our application, these
files get cleared from the disk.
What is the way to gracefully shut down the Flink environment so that it
releases all the resources' references?

import java.io.File;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TestResourceRelease {

    public void check() {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        try {
            StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
            // Register a UDF whose implementation lives in an external jar.
            env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 'com.my.udf.v2.EpochToDate' "
                    + "USING JAR 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
            TypeInformation[] typeInformationArray = getTypeInfoArray();
            String[] columnName = new String[]{"x", "y"};
            KafkaSource<Row> source = KafkaSource.<Row>builder()
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new JsonRowDeserializationSchema(
                            Types.ROW_NAMED(columnName, typeInformationArray)))
                    .setProperty("bootstrap.servers", "localhost:9092")
                    .setTopics("test")
                    .build();

            DataStream<Row> stream = execEnv.fromSource(source,
                    WatermarkStrategy.noWatermarks(), "Kafka Source");
            env.registerDataStream("test", stream);

            Table table = env.fromDataStream(stream);
            env.registerTable("my_test", table);
            Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
            System.out.println("created the table");
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            try {
                // Closing the environment should release the job's resources...
                execEnv.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            // ...but the UDF jar deleted here is still held open by the JVM,
            // so the disk space is not reclaimed until the process exits.
            File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
            file.delete();
        }
    }

    public static TypeInformation[] getTypeInfoArray() {
        TypeInformation[] typeInformations = new TypeInformation[2];
        typeInformations[0] = org.apache.flink.table.api.Types.LONG();
        typeInformations[1] = org.apache.flink.table.api.Types.LONG();
        return typeInformations;
    }
}


OOM error behaviour change in newer version

2023-04-06 Thread neha goyal
Hello,
I want to understand the reason behind the different behavior of
Flink-Delta pipelines. When I run a Kafka-in, Delta-out pipeline with fewer
resources than the pipeline needs, it fails with an OOM error. In the newer
version, it takes around 8 minutes to throw the first exception, while in
the older version it fails in 2 minutes. What new change could explain this
behavior? We have retries enabled, hence the term 'first'.

Pipeline1
Flink version: 1.16.1
Delta connector version: 0.6.0
Time taken for the first OOM error: 8 mins


Pipeline2
Flink version: 1.13.6
Delta connector version: 0.5.0
Time taken for the first OOM error: 2 mins


Is there any API method for dynamic loading of the UDF jar

2023-02-26 Thread neha goyal
Hello,

In Flink 1.16, CREATE FUNCTION ... USING JAR functionality was introduced,
where we can specify jar resources, and the jar can be located on a remote
file system such as HDFS/S3. I don't see an alternative for the same
functionality among the TableEnvironment method calls; for example,
createTemporarySystemFunction doesn't take any jar URI.

Will such methods be provided in the future?
Is there any difference in performance between TableEnvironment method
calls and TableEnvironment.executeSql for the same feature? Which one is
recommended?
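
For reference, a minimal sketch of the only route I currently see, going
through executeSql; the function class and jar path below are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterRemoteUdf {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // SQL path: the jar URI may point to a remote filesystem such as s3:// or hdfs://.
        tEnv.executeSql(
                "CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyUdf' "
                        + "USING JAR 's3://my-bucket/udfs/my-udf.jar'");
        // Table API path: createTemporarySystemFunction(name, class) needs the
        // class already on the classpath, so it cannot pull in a remote jar by URI.
        // tEnv.createTemporarySystemFunction("my_udf", com.example.MyUdf.class);
    }
}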

Thanks and regards