Re: Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-16 Thread Arujit Pradhan
Hey Martijn,

Thanks a lot for getting back to us. To give you a little bit more context,
we maintain an open-source project around Flink, dagger
<http://github.com/odpf/dagger>, which is a wrapper for proto processing. As
part of the upgrade to the latest version, we did some refactoring and
moved to KafkaSource, since the older FlinkKafkaConsumer was getting
deprecated.

So we currently do not have a setup to test that hypothesis. Also, just
increasing the resources by a bit fixes the issue, and it only happens with a
small set of jobs during high traffic.

We would love to get some input from the community, as this might cause errors
in some of our production jobs.

Thanks and regards,
//arujit

On Tue, Feb 15, 2022 at 8:48 PM Martijn Visser 
wrote:

> Hi Arujit,
>
> I'm also looping in some contributors from the connector and runtime
> perspective in this thread. Did you also test the upgrade first by only
> upgrading to Flink 1.14 and keeping the FlinkKafkaConsumer? That would
> offer a better way to determine whether a regression is caused by the
> Flink upgrade or by the change of connector.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
>
> On Tue, 15 Feb 2022 at 13:07, Arujit Pradhan 
> wrote:
>
>> Hey team,
>>
>> We are migrating our Flink codebase and a bunch of jobs from Flink-1.9 to
>> Flink-1.14. To compare performance between the two versions, we ran a set of
>> jobs for a week on both 1.9 and 1.14 simultaneously, with the same resources
>> and configurations, and monitored them.
>>
>> Though most of the jobs are running fine, we see significant performance
>> degradation in some of the high-throughput jobs during peak hours. As a
>> result, we can see high lag and data drops while processing messages from
>> Kafka in some of the 1.14 jobs, while in 1.9 the same jobs work just fine.
>> Now we are debugging and trying to understand the potential reason for it.
>>
>> One of the hypotheses we can think of is the change in the sequence of
>> processing in the source operator. To explain this, we are adding screenshots
>> of the problematic tasks below.
>> The first one is for 1.14 and the second is for 1.9. Upon inspection, it can
>> be seen that the sequence of processing in 1.14 is:
>>
>> data_streams_0 -> Timestamps/Watermarks -> Filter -> Select.
>>
>> While in 1.9 it was,
>>
>> data_streams_0 -> Filter -> Timestamps/Watermarks -> Select.
>>
>> In 1.14 we are using the KafkaSource API, while in the older version it was
>> the FlinkKafkaConsumer API. We wanted to understand whether this can cause a
>> performance decline, as all other configurations/resources for both jobs are
>> identical, and if so, how to avoid it. Also, we cannot see any unusual
>> CPU/memory behaviour while monitoring the affected jobs.
>>
>> Source Operator in 1.14 :
>> [image: image.png]
>> Source Operator in 1.9 :
>> [image: image.png]
>> Thanks in advance,
>> //arujit


Performance Issues in Source Operator while migrating to Flink-1.14 from 1.9

2022-02-15 Thread Arujit Pradhan
Hey team,

We are migrating our Flink codebase and a bunch of jobs from Flink-1.9 to
Flink-1.14. To compare performance between the two versions, we ran a set of
jobs for a week on both 1.9 and 1.14 simultaneously, with the same resources
and configurations, and monitored them.

Though most of the jobs are running fine, we see significant performance
degradation in some of the high-throughput jobs during peak hours. As a
result, we can see high lag and data drops while processing messages from
Kafka in some of the 1.14 jobs, while in 1.9 the same jobs work just fine.
Now we are debugging and trying to understand the potential reason for it.

One of the hypotheses we can think of is the change in the sequence of
processing in the source operator. To explain this, we are adding screenshots
of the problematic tasks below.
The first one is for 1.14 and the second is for 1.9. Upon inspection, it can
be seen that the sequence of processing in 1.14 is:

data_streams_0 -> Timestamps/Watermarks -> Filter -> Select.

While in 1.9 it was,

data_streams_0 -> Filter -> Timestamps/Watermarks -> Select.

In 1.14 we are using the KafkaSource API, while in the older version it was
the FlinkKafkaConsumer API. We wanted to understand whether this can cause a
performance decline, as all other configurations/resources for both jobs are
identical, and if so, how to avoid it. Also, we cannot see any unusual
CPU/memory behaviour while monitoring the affected jobs.
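
For context, here is a minimal, simplified sketch of the kind of 1.14 source
wiring we migrated to. It is not our actual dagger code: the brokers, group id,
deserializer, filter, and the 10-second out-of-orderness delay below are
illustrative placeholders only.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // KafkaSource replaces the deprecated FlinkKafkaConsumer in our 1.14 jobs.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")              // placeholder brokers
                .setTopics("data_streams_0")
                .setGroupId("dagger-sketch")                        // placeholder group id
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema()) // stand-in for our proto deserializer
                .build();

        // In this sketch the watermark strategy is handed to fromSource(), so watermark
        // generation sits right next to the source, ahead of any downstream Filter.
        DataStream<String> stream = env.fromSource(
                source,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)),
                "data_streams_0");

        stream.filter(value -> !value.isEmpty()) // stand-in for our real Filter
              .print();

        env.execute("kafka-source-sketch");
    }
}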

Source Operator in 1.14 :
[image: image.png]
Source Operator in 1.9 :
[image: image.png]
Thanks in advance,
//arujit


Provide DataTypeHint for ScalarUDF where the return type is Object[]

2022-01-03 Thread Arujit Pradhan
Hey team,

We are migrating our Flink code from Flink-1.9 to Flink-1.14, and as a part
of this, we are updating a bunch of UDFs. We wanted to understand how to
provide *data type hints for UDFs which return Object[]*.

For example, if the return type is simply Object, something like this works:

*@DataTypeHint(inputGroup = InputGroup.ANY)*

But we could not find any examples of how to add type hints in the case of an
eval function returning Object[]. Explicitly returning something like
List instead of providing type hints works, but this might cause
issues downstream, since lots of running jobs currently apply built-in
functions like `*Cardinality*` to the result of these UDFs, and that fails
if the return type is List.
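
To make the question concrete, here is a rough sketch of the shape of UDF we
mean. The class, method names, and the ARRAY<STRING> variant are illustrative
assumptions on our side, not production code; the second eval() is exactly the
signature we have not found a suitable hint for.

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF, for illustration only.
public class ArrayUdfSketch extends ScalarFunction {

    // This pattern works when the element type is known up front:
    // the ARRAY<STRING> hint maps to a String[] return type.
    public @DataTypeHint("ARRAY<STRING>") String[] eval(
            @DataTypeHint(inputGroup = InputGroup.ANY) Object input) {
        return new String[] {String.valueOf(input)};
    }

    // The case we are asking about: an eval() that returns a plain Object[],
    // for which we have not found a data type hint that Flink accepts.
    public Object[] eval(String key, String value) {
        return new Object[] {key, value};
    }
}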

Thanks in advance.
//arujit


Re: Alternate to PreserveWatermark() in recent Flink versions

2021-10-25 Thread Arujit Pradhan
Hey JING,

Thanks a lot for replying to the thread!

Yeah, we are looking at `PreserveWatermarks`. But the issue is that
datastream.assignTimestampsAndWatermarks() takes a `WatermarkStrategy` (from
org.apache.flink.api.common.eventtime), and there is no default method there to
define a PreserveWatermarks strategy, though there are methods for defining
other strategies, like `forBoundedOutOfOrderness`.

I want to know if the preservation of watermarks happens there by default,
since in the newer APIs source-level watermark definitions are deprecated.
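
To make that concrete, here is a small sketch of what we can express with the
new eventtime API versus what we are missing. The event type, timestamp field,
and ten-second delay are illustrative only.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkStrategySketch {

    // Illustrative event type, not part of our codebase.
    public static class MyEvent {
        public long eventTimeMillis;
    }

    public static void main(String[] args) {
        // Available in org.apache.flink.api.common.eventtime: bounded out-of-orderness,
        // monotonous timestamps, or no watermarks at all.
        WatermarkStrategy<MyEvent> bounded =
                WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, recordTimestamp) -> event.eventTimeMillis);

        WatermarkStrategy<MyEvent> none = WatermarkStrategy.noWatermarks();

        // What we have not found: a counterpart to the old
        // org.apache.flink.table.sources.wmstrategies.PreserveWatermarks.
    }
}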



On Mon, Oct 25, 2021 at 2:17 PM JING ZHANG  wrote:

> Hi,
> I'm not sure I understand your requirement.
> However, are you looking for `PreserveWatermarks` in package
> `org.apache.flink.table.sources.wmstrategies`?
>
> Best,
> JING ZHANG
>
>
> Arujit Pradhan  于2021年10月25日周一 下午4:02写道:
>
>> Hi all,
>>
>>
>> We maintain an open-source project for protobuf data processing using
>> Flink, dagger <http://github.com/odpf/dagger>. But we are currently on
>> Flink-1.9 and want to migrate to the latest stable 1.14.
>>
>>
>> In the older version, we use the `*StreamTableSource*` and
>> `*DefinedRowtimeAttributes*` APIs for table-source definition, similar to
>> this
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/streaming/time_attributes.html#using-a-tablesource-1>.
>> But since these APIs are deprecated, we are now defining it via
>> *APIExpressions*.
>>
>>
>> The issue for us is in defining the WatermarkStrategy, more specifically
>> the `*PreserveWatermarks*` strategy. We cannot find an alternative to it,
>> though equivalents of other watermark strategies like
>> `*BoundedOutOfOrderTimestamps*` can be found in the newer
>> `*org.apache.flink.api.common.eventtime.WatermarkStrategy*` API.
>>
>>
>> Currently, we have logic something like this in *DefinedRowtimeAttributes*:
>>
>>
>>
>> @Override
>> public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
>>     WatermarkStrategy ws = enablePerPartitionWatermark
>>             ? new PreserveWatermarks()
>>             : new BoundedOutOfOrderTimestamps(watermarkDelay);
>>     return Collections.singletonList(
>>             new RowtimeAttributeDescriptor(rowTimeAttributeName,
>>                     new ExistingField(rowTimeAttributeName), ws));
>> }
>>
>>
>>
>> We want to use *PreserveWatermarks* in places because, while backfilling
>> historical data using Flink, we want to use the underlying watermark defined
>> at the Kafka consumer level instead of at the source, as that prevents data
>> drops for us. Is there any alternative in the new APIs that we can use? Or
>> else, what can we use to get the desired behaviour?
>>
>>
>>
>> Thanks a lot in advance!
>>
>


Alternate to PreserveWatermark() in recent Flink versions

2021-10-25 Thread Arujit Pradhan
Hi all,


We maintain an open-source project for protobuf data processing using
Flink, dagger <http://github.com/odpf/dagger>. But we are currently on
Flink-1.9 and want to migrate to the latest stable 1.14.


In the older version, we use the `*StreamTableSource*` and
`*DefinedRowtimeAttributes*` APIs for table-source definition, similar to
this
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/streaming/time_attributes.html#using-a-tablesource-1>.
But since these APIs are deprecated, we are now defining it via
*APIExpressions*.


The issue for us is in defining the WatermarkStrategy, more specifically
the `*PreserveWatermarks*` strategy. We cannot find an alternative to it,
though equivalents of other watermark strategies like
`*BoundedOutOfOrderTimestamps*` can be found in the newer
`*org.apache.flink.api.common.eventtime.WatermarkStrategy*` API.


Currently, we have logic something like this in *DefinedRowtimeAttributes*:



@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
    WatermarkStrategy ws = enablePerPartitionWatermark
            ? new PreserveWatermarks()
            : new BoundedOutOfOrderTimestamps(watermarkDelay);
    return Collections.singletonList(
            new RowtimeAttributeDescriptor(rowTimeAttributeName,
                    new ExistingField(rowTimeAttributeName), ws));
}
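
For comparison, this is a rough sketch of the newer Table API Schema style we
have been looking at. It is only an assumption on our side: the column name is
illustrative, and whether SOURCE_WATERMARK() actually gives us the old
PreserveWatermarks behaviour is exactly what we are unsure about.

import org.apache.flink.table.api.Schema;

public class SchemaSketch {

    public static Schema build(boolean enablePerPartitionWatermark) {
        // Bounded-out-of-orderness watermark declared directly on the rowtime column.
        Schema bounded = Schema.newBuilder()
                .column("event_timestamp", "TIMESTAMP(3)")
                .watermark("event_timestamp", "event_timestamp - INTERVAL '10' SECOND")
                .build();

        // Possible counterpart to PreserveWatermarks: defer to the watermarks emitted
        // by the underlying source (unverified assumption on our side).
        Schema preserved = Schema.newBuilder()
                .column("event_timestamp", "TIMESTAMP(3)")
                .watermark("event_timestamp", "SOURCE_WATERMARK()")
                .build();

        return enablePerPartitionWatermark ? preserved : bounded;
    }
}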



We want to use *PreserveWatermarks* in places because, while backfilling
historical data using Flink, we want to use the underlying watermark defined
at the Kafka consumer level instead of at the source, as that prevents data
drops for us. Is there any alternative in the new APIs that we can use? Or
else, what can we use to get the desired behaviour?



Thanks a lot in advance!


Tests in FileUtilsTest while building Flink in local

2020-02-19 Thread Arujit Pradhan
Hi all,

I was trying to build Flink on my local machine and these two unit tests
are failing:



[ERROR] Errors:
[ERROR]   FileUtilsTest.testCompressionOnRelativePath:261->verifyDirectoryCompression:440 » NoSuchFile
[ERROR]   FileUtilsTest.testDeleteDirectoryConcurrently » FileSystem /var/folders/x9/tr2...

I am building with these versions:
Java 1.8.0_221
Maven 3.6.3
and the OS is macOS Catalina (10.15).

Did anyone face this issue? Am I missing something?

*The stack trace is:*
java.nio.file.NoSuchFileException:
../../../../../../../../var/folders/x9/tr2xclq51sx891lbntv7bwy4gn/T/junit3367096668518353289/compressDir/rootDir

at
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at
java.base/sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:389)
at java.base/java.nio.file.Files.createDirectory(Files.java:689)
at
org.apache.flink.util.FileUtilsTest.verifyDirectoryCompression(FileUtilsTest.java:440)
at
org.apache.flink.util.FileUtilsTest.testCompressionOnRelativePath(FileUtilsTest.java:261)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

Thanks in advance.


Open Method is not being called in case of AggregateFunction UDFs

2019-12-11 Thread Arujit Pradhan
Hi all,

So we are creating some user-defined functions of type AggregateFunction,
and we want to emit some static metrics from the *open()* method of the UDFs,
as we can get the *MetricGroup* from the *FunctionContext*, which is only
exposed in the open() method. Our code looks something like this (an
implementation of count distinct in SQL):

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;

public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {

    @Override
    public DistinctCountAccumulator createAccumulator() {
        return new DistinctCountAccumulator();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        MetricGroup metricGroup = context.getMetricGroup();
        // add some metric to the group here
        System.out.println("in the open of UDF");
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
        System.out.println("in the udf");
        return distinctCountAccumulator.count();
    }

    public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
        if (item == null) {
            return;
        }
        distinctCountAccumulator.add(item);
    }
}


But when we use this UDF in Flink SQL, it seems like the open() method is not
being called at all.
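
For context, this is roughly how we register and use the function from SQL
(simplified; the table and column names are illustrative, and it assumes the
DistinctCount class shown above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class UdfRegistrationSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the aggregate UDF under a SQL-callable name.
        tableEnv.registerFunction("DistinctCount", new DistinctCount());

        // Use it from SQL; "data_stream" stands in for a table we have already registered.
        Table result = tableEnv.sqlQuery(
                "SELECT DistinctCount(user_id) AS distinct_users FROM data_stream");
    }
}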

From the Flink UDF documentation we find:

*The open() method is called once before the evaluation method. The close()
method after the last call to the evaluation method.*

*The open() method provides a FunctionContext that contains information
about the context in which user-defined functions are executed, such as the
metric group, the distributed cache files, or the global job parameters.*
Is there any reason, then, that open() is not working for AggregateFunctions?
By the way, it works fine in the case of ScalarFunctions. Is there any
alternative scope where we can register some static metrics in a UDF?


Thanks and regards,
*Arujit*


Re: Compound Time interval in SQL queries

2019-11-21 Thread Arujit Pradhan
Hi, godfrey,

Thanks for your reply. But now I am getting this error:

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.table.api.TableException: Only constant window descriptors are supported.
    at com.gojek.daggers.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:30)
Caused by: org.apache.flink.table.api.TableException: Only constant window descriptors are supported.
    at org.apache.flink.table.api.TableException$.apply(exceptions.scala:57)
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsLong$1(DataStreamLogicalWindowAggregateRule.scala:72)
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:88)
    at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)

Any idea why this may be happening?

Thanks and regards,
arujit

On Thu, Nov 21, 2019 at 2:37 PM 贺小令(晓令) 
wrote:

> please try  this approach: interval + interval
>
> like this:
> SELECT count(1) AS event_count ,
> TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS
> window_timestamp
> FROM `data_stream`
> GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)
>
> thanks,
> godfrey
>
> --
> From: Arujit Pradhan
> Sent: Thursday, November 21, 2019, 16:23
> To: user
> Subject: Compound Time interval in SQL queries
>
> Hi all,
>
> Is there a way to define a compound time interval (one that can consist of
> both HOUR and MINUTE) in windows in a Flink SQL query?
>
> For example, we want to do something like this:
> SELECT count(1) AS event_count ,
> TUMBLE_END(rowtime,
> INTERVAL '7' HOUR
> AND '30' MINUTE) AS window_timestamp
> FROM `data_stream`
> GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR '30' MINUTE )
>
> We cannot even convert this to minutes (7 hours 30 minutes = 450 minutes,
> which needs three digits), as we are getting this error:
>  *Interval field value  exceeds precision of MINUTE(2) field*
>
> We went through the Calcite documentation and could not find any
> workaround for this.
>
> Thanks and regards,
> arujit
>
>


Compound Time interval in SQL queries

2019-11-21 Thread Arujit Pradhan
Hi all,

Is there a way to define a compound time interval (one that can consist of
both HOUR and MINUTE) in windows in a Flink SQL query?

For example, we want to do something like this:
SELECT count(1) AS event_count ,
TUMBLE_END(rowtime,
INTERVAL '7' HOUR
AND '30' MINUTE) AS window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR '30' MINUTE )

We cannot even convert this to minutes (7 hours 30 minutes = 450 minutes,
which needs three digits), as we are getting this error:
 *Interval field value  exceeds precision of MINUTE(2) field*

We went through the Calcite documentation and could not find any
workaround for this.

Thanks and regards,
arujit