How to implement a WindowableTask (similar to Samza) in Apache Flink?

2020-12-22 Thread Debraj Manna
I am new to flink and this is my first post in the community.


Samza has a concept of windowing, where a stream processing job needs to do
something at regular intervals, regardless of how many incoming messages the
job is processing.

For example, a simple per-minute event counter in Samza looks like below:


import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class EventCounterTask implements StreamTask, WindowableTask {

  public static final SystemStream OUTPUT_STREAM =
      new SystemStream("kafka", "events-per-minute");

  private int eventsSeen = 0;

  // called once per incoming message
  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    eventsSeen++;
  }

  // called by Samza at the configured window interval, regardless of input
  @Override
  public void window(MessageCollector collector,
                     TaskCoordinator coordinator) {
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, eventsSeen));
    eventsSeen = 0;
  }
}

Can someone let me know how to implement an equivalent thing in Apache
Flink (Samza is single-threaded, so window() and process() will not happen
concurrently), or point me to the relevant documentation?
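For reference, a minimal Flink sketch of the same per-minute counter, assuming a
keyed stream of String events and processing-time timers in a KeyedProcessFunction
(class, state and interval names are illustrative, not taken from any existing
code); a TumblingProcessingTimeWindows aggregation would be another way to express
the same idea:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts events per key and emits the count every 60 seconds of processing time,
// regardless of how many events arrive -- roughly what Samza's window() callback does.
public class EventCounterFunction extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> eventsSeen;

    @Override
    public void open(Configuration parameters) {
        eventsSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("eventsSeen", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = eventsSeen.value();
        if (current == null) {
            current = 0L;
            // register the first timer for this key; later timers are set in onTimer()
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000);
        }
        eventsSeen.update(current + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        Long current = eventsSeen.value();
        out.collect(current == null ? 0L : current);
        eventsSeen.update(0L);
        // re-register so the count keeps firing every minute, like Samza's window()
        ctx.timerService().registerProcessingTimeTimer(timestamp + 60_000);
    }
}

Because timers and processElement() are invoked from the same task thread, the two
callbacks never run concurrently, which matches the single-threaded Samza behaviour
described above.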


How to debug flink serialization error?

2021-02-11 Thread Debraj Manna
Hi

I have a ProcessFunction like below which throws the error below whenever I
try to use it in an operator. My understanding is that when Flink initializes
the operator DAG, it serializes things and sends them over to the
taskmanagers. So I have marked the operator state transient, since the
operator state will be populated within the open() call that gets invoked in
each taskmanager. But I am still getting the serialization exception below.
Can someone suggest some ways to debug this type of serialization error in
Flink 1.12?

org.apache.flink.api.common.InvalidProgramException: public
com.vnera.programs.metrics.MetricStoreProgramHelper
com.vnera.analytics.engine.MetricStoreMapper.getMetricStoreProgramHelper(com.vnera.resourcemanager.ResourceManager,com.vnera.storage.metrics.TsdbMetricStore$Writer,java.lang.String)
is not serializable. The object probably contains or references non
serializable fields.

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
...
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
at
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:681)
at
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:661)
at
com.vnera.analytics.engine.MetricStoreOperator.linkFrom(MetricStoreOperator.java:27)
at
com.vnera.analytics.engine.AnaPipelineStage.link(AnaPipelineStage.java:12)
at
com.vnera.analytics.engine.AnalyticsEngine.createPipeline(AnalyticsEngine.java:106)
at
com.vnera.analytics.engine.source.DerivedMetricCreatorTest.testPipeline(DerivedMetricCreatorTest.java:98)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
...
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
...
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
...
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
...
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: java.io.NotSerializableException: java.lang.reflect.Method
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
... 45 more

My ProcessFunction looks like below

public class MetricStoreMapper
    extends ProcessFunction<SelfDescribingMessageDO, SelfDescribingMessageDO> {

  private static final String MSG_STALENESS_METRIC_NAME =
      VneraMetrics.createMetricName(MetricStoreMapper.class, "metric_sdm_staleness");
  private transient Histogram stalenessHisto =
      VneraMetrics.histogram(MSG_STALENESS_METRIC_NAME, VneraMetrics.DD_REPORTER);
  private transient MetricStoreProgramHelper metricStoreHelper;

  @Override
  public void open(Configuration parameters) throws Exception {
    ExecutionConfig.GlobalJobParameters jobParams =
        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    Configuration conf =
        ParameterTool.fromMap(jobParams.toMap()).getConfiguration();
    String taskInstanceId = getRuntimeContext().getTaskNameWithSubtasks();
    MetricStoreFactory.StoreType metStoreType =
        conf.getEnum(MetricStoreFactory.StoreType.class, StoreOptions.METRIC_STORE_TYPE);
    TaskManagerState state =
        TaskManagerState.getTaskManagerState(ConfigStoreFactory.StoreType.MEMORY,
            MetricStoreFactory.StoreType.MEMORY);
    ResourceManager rm = state.getResourceManager();
    metricStoreHelper = getMetricStoreProgramHelper(rm,
        state.getTsdbMetricStore().writer(taskInstanceId), taskInstanceId);
  }

  @Override
  public void processElement(SelfDescribingMessageDO value, Context ctx,
      Collector<SelfDescribingMessageDO> out) throws Exception {
    MetricStoreProgramHelper.MetricStoreOutput output = metricStoreHelper.execute(value);
    for (SelfDescribingMessageDO sd

Re: How to debug flink serialization error?

2021-02-13 Thread Debraj Manna
Thanks Robert for the pointers.

It is an issue with Mockito, which I was using to mock the
getMetricStoreProgramHelper method in my unit test. For now I have modified
my unit test to not use Mockito.

I will try to provide a reproducible example.
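In case it helps anyone hitting the same exception, a small sketch of a unit test
that performs the same Java serialization Flink does when building the job graph,
assuming MetricStoreMapper has a no-argument constructor (test and method names are
illustrative):

import static org.junit.Assert.assertNotNull;

import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;

public class MetricStoreMapperSerializationTest {

    // Java-serializes the function the same way Flink's ClosureCleaner/job graph path
    // does, so a NotSerializableException (e.g. caused by a captured Mockito mock)
    // shows up directly in the test instead of at DataStream.process() time.
    @Test
    public void mapperIsJavaSerializable() throws Exception {
        MetricStoreMapper mapper = new MetricStoreMapper();
        byte[] bytes = InstantiationUtil.serializeObject(mapper);
        Object copy = InstantiationUtil.deserializeObject(
                bytes, Thread.currentThread().getContextClassLoader());
        assertNotNull(copy);
    }
}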

On Fri, Feb 12, 2021 at 8:56 PM Robert Metzger  wrote:

> Thanks for reaching out to the Flink ML.
>
> It reports getMetricStoreProgramHelper as a non-serializable field, even
> though it looks a lot like a method. The only recommendation I have for you
> is carefully reading the full error message + stack trace.
>
> Your approach of tagging fields as "transient" is absolutely correct.
> There's also this message: NotSerializableException:
> java.lang.reflect.Method, but I cannot find a field of type Method.
>
> Can you provide a minimal reproducible example of this issue?
>
> On Fri, Feb 12, 2021 at 7:06 AM Debraj Manna 
> wrote:
>
>> HI
>>
>> I am having a ProcessFunction like below which is throwing an error like
>> below whenever I am trying to use it in a opeator . My understanding when
>> flink initializes the operator dag, it serializes things and sends over to
>> the taskmanagers.
>> So I have marked the  operator state transient, since the operator state
>> will be populated within the open() call that gets invoked in each
>> taskmanager. But I am still getting the serialization exception like below.
>> Can suggest some ways where I can debug this type of serialization error in
>> Flink 1.12?
>>
>> org.apache.flink.api.common.InvalidProgramException: public
>> com.vnera.programs.metrics.MetricStoreProgramHelper
>> com.vnera.analytics.engine.MetricStoreMapper.getMetricStoreProgramHelper(com.vnera.resourcemanager.ResourceManager,com.vnera.storage.metrics.TsdbMetricStore$Writer,java.lang.String)
>> is not serializable. The object probably contains or references non
>> serializable fields.
>>
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> ...
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:681)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:661)
>> at
>> com.vnera.analytics.engine.MetricStoreOperator.linkFrom(MetricStoreOperator.java:27)
>> at
>> com.vnera.analytics.engine.AnaPipelineStage.link(AnaPipelineStage.java:12)
>> at
>> com.vnera.analytics.engine.AnalyticsEngine.createPipeline(AnalyticsEngine.java:106)
>> at
>> com.vnera.analytics.engine.source.DerivedMetricCreatorTest.testPipeline(DerivedMetricCreatorTest.java:98)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> ...
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> ...
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> ...
>> at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at
>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> at
>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>> ...
>> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
>> Caused by: java.io.NotSerializableException: java.lang.reflect.Method
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> at
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>> at org.apache.flink.api.ja

Flink 1.12.1 example applications failing on a single node yarn cluster

2021-02-24 Thread Debraj Manna
I am trying out the flink example as explained in the flink docs
<https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>
in a single node yarn cluster
<https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation>.

On executing

ubuntu@vrni-platform:~/build-target/flink$ ./bin/flink run-application -t
yarn-application ./examples/streaming/TopSpeedWindowing.jar

It is failing with the below errors

org.apache.flink.client.deployment.ClusterDeploymentException:
Couldn't deploy Yarn Application Cluster
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
at 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
at 
org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment.
Diagnostics from YARN: Application application_1614159836384_0045
failed 1 times (global limit =2; local limit is =1) due to AM
Container for appattempt_1614159836384_0045_01 exited with
exitCode: -1000
Failing this attempt.Diagnostics: [2021-02-24 16:19:39.409]File
file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
does not exist
java.io.FileNotFoundException: File
file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:867)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:242)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:235)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:223)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I have made the log level DEBUG and I do see that
flink-dist_2.12-1.12.1.jar is getting copied to
/home/ubuntu/.flink/application_1614159836384_0045.

2021-02-24 16:19:37,768 DEBUG
org.apache.flink.yarn.YarnApplicationFileUploader[] - Got
modification time 1614183577000 from remote path
file:/home/ubuntu/.flink/application_1614159836384_0045/TopSpeedWindowing.jar
2021-02-24 16:19:37,769 DEBUG
org.apache.flink.yarn.YarnApplicationFileUploader[] -
Copying from file:/home/ubuntu/build-target/flink/lib/flink-dist_2.12-1.12.1.jar
to 
file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
with replication factor 1

The entire DEBUG logs are placed here
<https://gist.github.com/debraj-manna/a38addc37a322cb242fc66fab1f9fee7>.
Nodemanager logs are placed here
<https://gist.github.com/debraj-manna/3732616fe78db439c0d6453454e8e02b>.

Can someone let me know what is going wrong? Does flink

Re: Flink 1.12.1 example applications failing on a single node yarn cluster

2021-02-24 Thread Debraj Manna
The same has been asked in StackOverflow
<https://stackoverflow.com/questions/66355206/flink-1-12-1-example-application-failing-on-a-single-node-yarn-cluster>
also. Any suggestions here?

On Wed, Feb 24, 2021 at 10:25 PM Debraj Manna 
wrote:

> I am trying out flink example as explained in flink docs
> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>
>  in
> a single node yarn cluster
> <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation>
> .
>
> On executing
>
> ubuntu@vrni-platform:~/build-target/flink$ ./bin/flink run-application -t
> yarn-application ./examples/streaming/TopSpeedWindowing.jar
>
> It is failing with the below errors
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn Application Cluster
> at 
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
> at 
> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
> at 
> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
> Caused by: 
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN 
> application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1614159836384_0045 failed 1 
> times (global limit =2; local limit is =1) due to AM Container for 
> appattempt_1614159836384_0045_01 exited with  exitCode: -1000
> Failing this attempt.Diagnostics: [2021-02-24 16:19:39.409]File 
> file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
>  does not exist
> java.io.FileNotFoundException: File 
> file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
>  does not exist
> at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
> at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:867)
> at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
> at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
> at 
> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:242)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:235)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:223)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> I have made the log level DEBUG and I do see that flink-dist_2.12-1.12.1.jar 
> is getting copied to /home/ubuntu/.flink/application_1614159836384_0045.
>
> 2021-02-24 16:19:37,768 DEBUG 
> org.apache.flink.yarn.YarnApplicationFileUploader[] - Got 
> modification time 1614183577

Re: Flink 1.12.1 example applications failing on a single node yarn cluster

2021-02-26 Thread Debraj Manna
In my setup hadoop-yarn-nodemanager is running as the yarn user.

ubuntu@vrni-platform:/tmp/flink$ ps -ef | grep nodemanager
yarn  4953 1  2 05:53 ?00:11:26
/usr/lib/jvm/java-8-openjdk/bin/java -Dproc_nodemanager
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/lib/heap-dumps/yarn
-XX:+ExitOnOutOfMemoryError -Dyarn.log.dir=/var/log/hadoop-yarn
-Dyarn.log.file=hadoop-yarn-nodemanager-vrni-platform.log
-Dyarn.home.dir=/usr/lib/hadoop-yarn -Dyarn.root.logger=INFO,console
-Djava.library.path=/usr/lib/hadoop/lib/native -Xmx512m
-Dhadoop.log.dir=/var/log/hadoop-yarn
-Dhadoop.log.file=hadoop-yarn-nodemanager-vrni-platform.log
-Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str=yarn
-Dhadoop.root.logger=INFO,RFA -Dhadoop.policy.file=hadoop-policy.xml
-Dhadoop.security.logger=INFO,NullAppender
org.apache.hadoop.yarn.server.nodemanager.NodeManager

I was executing the ./bin/flink command as the ubuntu user, and the yarn user
does not have permission to write to ubuntu's home folder in my setup.

ubuntu@vrni-platform:/tmp/flink$ echo ~ubuntu
/home/ubuntu
ubuntu@vrni-platform:/tmp/flink$ echo ~yarn
/var/lib/hadoop-yarn


It appears to me that flink needs permission to write to the user's home
directory to create a .flink folder even when the job is submitted to YARN.
It is working fine for me if I run flink as the yarn user in my setup.
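A quick way to confirm that kind of diagnosis on the submitting host (commands are
illustrative; paths taken from the logs above):

# can the yarn user traverse the submitting user's home and write into the staging dir?
sudo -u yarn ls -ld /home/ubuntu /home/ubuntu/.flink
sudo -u yarn touch /home/ubuntu/.flink/perm-check \
  && echo "yarn user can write" || echo "yarn user cannot write"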

Just for my knowledge, is there any config in flink to specify the location
of the .flink folder?

On Thu, Feb 25, 2021 at 10:48 AM Debraj Manna 
wrote:

> The same has been asked in StackOverflow
> <https://stackoverflow.com/questions/66355206/flink-1-12-1-example-application-failing-on-a-single-node-yarn-cluster>
> also. Any suggestions here?
>
> On Wed, Feb 24, 2021 at 10:25 PM Debraj Manna 
> wrote:
>
>> I am trying out flink example as explained in flink docs
>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>
>>  in
>> a single node yarn cluster
>> <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation>
>> .
>>
>> On executing
>>
>> ubuntu@vrni-platform:~/build-target/flink$ ./bin/flink run-application
>> -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
>>
>> It is failing with the below errors
>>
>> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
>> deploy Yarn Application Cluster
>> at 
>> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
>> at 
>> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
>> at 
>> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213)
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1136)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> at 
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136)
>> Caused by: 
>> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The 
>> YARN application unexpectedly switched to state FAILED during deployment.
>> Diagnostics from YARN: Application application_1614159836384_0045 failed 1 
>> times (global limit =2; local limit is =1) due to AM Container for 
>> appattempt_1614159836384_0045_01 exited with  exitCode: -1000
>> Failing this attempt.Diagnostics: [2021-02-24 16:19:39.409]File 
>> file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
>>  does not exist
>> java.io.FileNotFoundException: File 
>> file:/home/ubuntu/.flink/application_1614159836384_0045/flink-dist_2.12-1.12.1.jar
>>  does not exist
>> at 
>> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
>> at 
>> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:867)
>> at 
>> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
>> at 
>> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
>> at 
>> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
>> at org.apache.hadoop.yarn.util.

Re: Flink 1.12.1 example applications failing on a single node yarn cluster

2021-02-26 Thread Debraj Manna
Thanks Matthias for replying.

Yes there was some yarn configuration issue on my side which I mentioned in
my last email.

I am just starting with flink. So, just for my understanding: in a few links
(posted below) it is reported that flink needs to create a .flink directory in
the user's home folder. Even though I am not using HDFS with yarn (single-node
deployment), I am observing the same. Is there a way I can configure the
location where flink stores the jar and configuration file, as mentioned in
the below link?

https://wints.github.io/flink-web//faq.html#the-yarn-session-crashes-with-a-hdfs-permission-exception-during-startup

From the above link

*"Flink creates a .flink/ directory in the users home directory where it
stores the Flink jar and configuration file."*

The same is mentioned here
<https://docs.cloudera.com/csa/1.2.0/installation/topics/csa-hdfs-home-install.html>.

On Fri, Feb 26, 2021 at 9:45 PM Matthias Pohl 
wrote:

> Hi Debraj,
> thanks for reaching out to the Flink community. Without knowing the
> details on how you've set up the Single-Node YARN cluster, I would still
> guess that it is a configuration issue on the YARN side. Flink does not
> know about a .flink folder. Hence, there is no configuration to set this
> folder.
>
> Best,
> Matthias
>
> On Fri, Feb 26, 2021 at 2:40 PM Debraj Manna 
> wrote:
>
>> In my setup hadoop-yarn-nodemenager is running with yarn user.
>>
>> ubuntu@vrni-platform:/tmp/flink$ ps -ef | grep nodemanager
>> yarn  4953 1  2 05:53 ?00:11:26
>> /usr/lib/jvm/java-8-openjdk/bin/java -Dproc_nodemanager
>> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/lib/heap-dumps/yarn
>> -XX:+ExitOnOutOfMemoryError -Dyarn.log.dir=/var/log/hadoop-yarn
>> -Dyarn.log.file=hadoop-yarn-nodemanager-vrni-platform.log
>> -Dyarn.home.dir=/usr/lib/hadoop-yarn -Dyarn.root.logger=INFO,console
>> -Djava.library.path=/usr/lib/hadoop/lib/native -Xmx512m
>> -Dhadoop.log.dir=/var/log/hadoop-yarn
>> -Dhadoop.log.file=hadoop-yarn-nodemanager-vrni-platform.log
>> -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str=yarn
>> -Dhadoop.root.logger=INFO,RFA -Dhadoop.policy.file=hadoop-policy.xml
>> -Dhadoop.security.logger=INFO,NullAppender
>> org.apache.hadoop.yarn.server.nodemanager.NodeManager
>>
>> I was executing the ./bin/flink command as ubuntu user and yarn user
>> does not have permission to write to ubuntu's home folder in my setup.
>>
>> ubuntu@vrni-platform:/tmp/flink$ echo ~ubuntu
>> /home/ubuntu
>> ubuntu@vrni-platform:/tmp/flink$ echo ~yarn
>> /var/lib/hadoop-yarn
>>
>>
>> It appears to me flink needs permission to write to user's home directory
>> to create a .flink folder even when the job is submitted in yarn. It is
>> working fine for me if I run the flink with yarn user. in my setup.
>>
>> Just for my knowledge is there any config in flink to specify the
>> location of .flink folder?
>>
>> On Thu, Feb 25, 2021 at 10:48 AM Debraj Manna 
>> wrote:
>>
>>> The same has been asked in StackOverflow
>>> <https://stackoverflow.com/questions/66355206/flink-1-12-1-example-application-failing-on-a-single-node-yarn-cluster>
>>> also. Any suggestions here?
>>>
>>> On Wed, Feb 24, 2021 at 10:25 PM Debraj Manna 
>>> wrote:
>>>
>>>> I am trying out flink example as explained in flink docs
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>
>>>>  in
>>>> a single node yarn cluster
>>>> <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation>
>>>> .
>>>>
>>>> On executing
>>>>
>>>> ubuntu@vrni-platform:~/build-target/flink$ ./bin/flink run-application
>>>> -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
>>>>
>>>> It is failing with the below errors
>>>>
>>>> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
>>>> deploy Yarn Application Cluster
>>>> at 
>>>> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465)
>>>> at 
>>>> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
>>>> at 
>>>> org.apache.flink.client.cli.CliFrontend.runApplication(C

Flink 1.12 Compatibility with hbase-shaded-client 2.1 in application jar

2021-03-02 Thread Debraj Manna
Hi

I am trying to deploy an application on flink 1.12 having hbase-shaded-client
2.1.0 as a dependency, in application mode
<https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>.
On deploying the application I am seeing the below ClassCastException:

*org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
cannot be cast to
org.apache.hadoop.hbase.shaded.com.google.protobuf.Message*

I have done export HADOOP_CLASSPATH=`hadoop classpath` as mentioned in
the hadoop documentation. I did not add any hadoop / hbase jars to the
flink/lib folder.

ubuntu@vrni-platform://tmp/debraj-flink/flink/lib$ ls
flink-csv-1.12.1.jarflink-json-1.12.1.jar
 flink-table_2.12-1.12.1.jarlog4j-1.2.17.jar
slf4j-log4j12-1.7.25.jar
flink-dist_2.12-1.12.1.jar  flink-shaded-zookeeper-3.4.14.jar
 flink-table-blink_2.12-1.12.1.jar  log4j-to-slf4j-2.11.1.jar
 vrni-flink-datadog-0.001-SNAPSHOT.jar

Can anyone suggest what could be going wrong here?

The full exception trace is like below

2021-03-02 18:10:45,819 ERROR
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager
- Fatal error occurred in ResourceManager.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Could not start the ResourceManager
akka.tcp://flink@localhost:41477/user/rpc/resourcemanager_0
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:233)
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:607)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:181)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
at akka.actor.ActorCell.invoke(ActorCell.scala:583)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Cannot initialize resource provider.
at 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:124)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:245)
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:229)
... 22 more
Caused by: 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
Could not start resource manager client.
at 
org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:181)
at 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:81)
at 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:122)
... 24 more
Caused by: java.lang.ClassCastException:
org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
cannot be cast to
org.apache.hadoop.hbase.shaded.com.google.protobuf.Message
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:225)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy16.registerApplicationMaster(Unknown Source)
at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:107)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

Re: Flink 1.12 Compatibility with hbase-shaded-client 2.1 in application jar

2021-03-03 Thread Debraj Manna
The issue is resolved. The org.apache.hbase exclusion was missing in my
application pom while creating the uber jar.

diff --git a/map/engine/pom.xml b/map/engine/pom.xml
index 8337be031d1..8eceb721fa7 100644
--- a/map/engine/pom.xml
+++ b/map/engine/pom.xml
@@ -203,6 +203,7 @@
 org.slf4j:*
 log4j:*
 org.apache.hadoop:*
+org.apache.hbase:*
 
 
 
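For context, a sketch of where that exclusion sits in a maven-shade-plugin
configuration for the uber jar (the surrounding plugin setup and the other
entries are illustrative, reconstructed around the diff above):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <artifactSet>
          <excludes>
            <!-- excluded from the uber jar, matching the diff above -->
            <exclude>org.slf4j:*</exclude>
            <exclude>log4j:*</exclude>
            <exclude>org.apache.hadoop:*</exclude>
            <exclude>org.apache.hbase:*</exclude>
          </excludes>
        </artifactSet>
      </configuration>
    </execution>
  </executions>
</plugin>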

On Wed, Mar 3, 2021 at 10:12 AM Debraj Manna 
wrote:

> Hi
>
> I am trying to deploy an application in flink 1.12 having
> hbase-shaded-client 2.1.0 as dependency  in application mode
> <https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#application-mode>.
> On deploying the application I am seeing the below ClassCastException:
>
> *org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
> cannot be cast to
> org.apache.hadoop.hbase.shaded.com.google.protobuf.Message*
>
> *I have done *export HADOOP_CLASSPATH=`hadoop classpath` as mentioned in
> the hadoop documentation. I did not add any hadoop / hbase jars in the
> flink/lib folder .
>
> ubuntu@vrni-platform://tmp/debraj-flink/flink/lib$ ls
> flink-csv-1.12.1.jarflink-json-1.12.1.jar
>  flink-table_2.12-1.12.1.jarlog4j-1.2.17.jar
> slf4j-log4j12-1.7.25.jar
> flink-dist_2.12-1.12.1.jar  flink-shaded-zookeeper-3.4.14.jar
>  flink-table-blink_2.12-1.12.1.jar  log4j-to-slf4j-2.11.1.jar
>  vrni-flink-datadog-0.001-SNAPSHOT.jar
>
> Can anyone suggest what could be going wrong here?
>
> The full exception trace is like below
>
> 2021-03-02 18:10:45,819 ERROR 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager  - 
> Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Could not start the ResourceManager 
> akka.tcp://flink@localhost:41477/user/rpc/resourcemanager_0
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:233)
> at 
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:607)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:181)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at akka.actor.Actor.aroundReceive(Actor.scala:539)
> at akka.actor.Actor.aroundReceive$(Actor.scala:537)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
> at akka.actor.ActorCell.invoke(ActorCell.scala:583)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
> at akka.dispatch.Mailbox.run(Mailbox.scala:229)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Cannot initialize resource provider.
> at 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:124)
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:245)
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:229)
> ... 22 more
> Caused by: 
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Could not start resource manager client.
> at 
> org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:181)
> at 
> org.apache.flink.runtime.resourc

Hadoop Integration Link broken in downloads page

2021-03-08 Thread Debraj Manna
Hi

It appears the Hadoop Integration link is broken on the downloads page.

Apache Flink® 1.12.2 is our latest stable release.
> If you plan to use Apache Flink together with Apache Hadoop (run Flink on
> YARN, connect to HDFS, connect to HBase, or use some Hadoop-based file
> system connector), please check out the Hadoop Integration documentation.


It is throwing a 404 error.

Thanks


flink all events getting dropped as late

2021-05-18 Thread Debraj Manna
Crossposting from stackoverflow


My flink pipeline looks like below

WatermarkStrategy watermarkStrategy = WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(900))
        .withTimestampAssigner((metric, timestamp) -> {
            logger.info("ETS: mts: {}, ts: {}",
                metric.metricPoint.timeInstant, timestamp);
            return metric.metricPoint.timeInstant;
        });

metricStream = kafkasource
        .process(<>)
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .transform("debugFilter",
            TypeInformation.of(<>), new StreamWatermarkDebugFilter<>("Op"))
        .filter(<>)
        .map(<>)
        .flatMap(<>)
        .keyBy(<>)
        .window(TumblingEventTimeWindows.of(Time.seconds(300)))
        .allowedLateness(Time.seconds(900))
        .sideOutputLateData(lateOutputTag)
        .aggregate(AggregateFunction, ProcessWindowFunction)
        .addSink()

I am running with parallelism 1 and the default setAutoWatermarkInterval of
200 ms. I did not set setStreamTimeCharacteristic since, from flink 1.12,
event time is the default.

I am seeing that the watermark is progressing from the output of
StreamWatermarkDebugFilter, but all the events are getting marked as late and
are being gathered at lateOutputTag.

2021-05-18 17:14:19,745 INFO  - ETS: mts:
162131010, ts: 1621310582271
2021-05-18 17:14:19,745 INFO  - ETS: mts:
162131010, ts: 1621310582271
2021-05-18 17:14:19,842 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162130949
2021-05-18 17:14:19,944 INFO  - ETS: mts:
162130980, ts: 1621310582275
2021-05-18 17:14:19,944 INFO  - ETS: mts:
162130980, ts: 1621310582275
...
2021-05-18 17:14:20,107 INFO  - ETS: mts:
162131038, ts: 1621310582278
2021-05-18 17:14:20,107 INFO  - ETS: mts:
162131038, ts: 1621310582278
2021-05-18 17:14:20,137 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162130977
2021-05-18 17:14:20,203 INFO  - ETS: mts:
162130980, ts: 1621310582279
...
2021-05-18 17:17:47,839 INFO  - ETS: mts:
162131010, ts: 1621310681159
2021-05-18 17:17:47,848 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162131009
2021-05-18 17:17:47,958 INFO  - ETS: mts:
162130980, ts: 1621310681237
2021-05-18 17:17:47,958 INFO  - ETS: mts:
162130980, ts: 1621310681237
...
2021-05-18 17:22:24,207 INFO  - ETS: mts:
162131010, ts: 1621310703622
2021-05-18 17:22:24,229 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162131039
2021-05-18 17:22:24,315 INFO  - ETS: mts:
162130980, ts: 1621310705177
2021-05-18 17:22:24,315 INFO  - ETS: mts:
162130980, ts: 1621310705177

I have seen this discussion and it is not an idleness problem.

It looks like it is related to this discussion.
Can someone suggest how I can debug this problem further?
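Below is a rough sketch (names illustrative, not from the original job) of a
pass-through operator that logs each element's assigned timestamp against the
current watermark; placed just before the keyBy()/window() step of a pipeline like
the one above, it usually makes the cause of "everything is late" visible, e.g.
timestamps in an unexpected unit or a watermark that has advanced further than
expected:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Pass-through operator that logs how far each element's event timestamp lies
// behind (or ahead of) the current watermark. A 300s tumbling window with 900s
// allowed lateness drops an element once watermark >= windowEnd + 900s, so large
// negative "delta" values here explain the late output.
public class LatenessLogger<T> extends ProcessFunction<T, T> {

    private static final Logger LOG = LoggerFactory.getLogger(LatenessLogger.class);

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long eventTs = ctx.timestamp();                      // assigned by the WatermarkStrategy
        long watermark = ctx.timerService().currentWatermark();
        if (eventTs != null) {
            LOG.info("eventTs={}, watermark={}, delta={}", eventTs, watermark, eventTs - watermark);
        } else {
            LOG.warn("element has no event timestamp; watermark={}", watermark);
        }
        out.collect(value);
    }
}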


Flink Protobuf serialization messages does not contain a setter for field bitField0_

2021-06-21 Thread Debraj Manna
Hi

As mentioned in the documentation, I have registered the Protobuf serializer
like below:

env.getConfig().registerTypeWithKryoSerializer(SelfDescribingMessageDO.class,
    ProtobufSerializer.class);

It is working but I am seeing INFO logs like below

2021-06-16 06:17:22,235 INFO
org.apache.flink.api.java.typeutils.TypeExtractor - class
com.vnera.model.protobufs.SelfDescribingMessageDO does not contain a
getter for field bitField0_

2021-06-16 06:17:22,235 INFO
org.apache.flink.api.java.typeutils.TypeExtractor - class
com.vnera.model.protobufs.SelfDescribingMessageDO does not contain a
setter for field bitField0_


Can someone let me know why I am seeing this and how I can get around it?

Flink Version 1.13.0
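For what it's worth: these INFO lines come from Flink's TypeExtractor analysing the
generated protobuf class. Since the class is not a POJO, Flink falls back to a
generic (Kryo) type, which is exactly where the registered ProtobufSerializer kicks
in, so the messages should be harmless. If they are too noisy, one option is to
raise the log level for that class; a sketch for the log4j2 properties format that
Flink ships by default:

logger.typeextractor.name = org.apache.flink.api.java.typeutils.TypeExtractor
logger.typeextractor.level = WARN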


"Legacy Source Thread" line in logs

2021-06-23 Thread Debraj Manna
Hi

I am seeing the below logs in flink 1.13.0 running in YARN

2021-06-23T13:41:45.761Z WARN grid.task.MetricSdmStalenessUtils Legacy
Source Thread - Source: MetricSource -> Filter -> MetricStoreMapper ->
(Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink:
FlinkKafkaProducer11, Sink: TSDBSink14) (1/1)#0
updateMetricStalenessInHisto:32 Received a non-positive staleness = -194239
at 1624455705761

Can someone let me know what the "Legacy Source Thread" denotes?

I saw the same question asked in the deprecated mailing list with no answer,
so I am starting a new email thread here.


Re: "Legacy Source Thread" line in logs

2021-06-24 Thread Debraj Manna
Thanks Fabian for replying. But I am using KafkaSource only.

The code is something like below.

class MetricSource {
  final Set metricSdms = new HashSet();
  ...
  env.addSource(MetricKafkaSourceFactory.createConsumer(jobParams))
      .name(MetricSource.class.getSimpleName())
      .uid(MetricSource.class.getSimpleName())
      .filter(sdm -> metricSdms.contains(sdm.getType()));
}

class MetricKafkaSourceFactory {
  public static FlinkKafkaConsumer createConsumer(final Configuration jobParams) {
    ...
    return new FlinkKafkaConsumer<>(topic, new DeserializationSchema(), props);
  }
}


On Wed, Jun 23, 2021 at 7:31 PM Fabian Paul 
wrote:

> Hi Debraj,
>
> By Source Legacy Thread we refer to all sources which do not implement the
> new interface yet [1]. Currently only the Hive, Kafka and FileSource
> are already migrated. In general, there is no severe downside of using the
> older source but in the future we plan only to implement ones based on
> the new operator model.
>
> Best,
> Fabian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>


Re: "Legacy Source Thread" line in logs

2021-06-24 Thread Debraj Manna
Thanks Fabian again for the clarification.

On Thu, Jun 24, 2021 at 8:16 PM Fabian Paul 
wrote:

> Hi Debraj,
>
> Sorry for the confusion the FlinkKafkaConsumer is the old source and the
> overhauled one you can find here [1].
> You would need to replace the FlinkKafkaConsumer with the KafkaSource to
> not see the message anymore.
>
> Best
> Fabian
>
>
> [1]
> https://github.com/apache/flink/blob/2bd8fab01d2aba99a5f690d051e817d10d2c6f24/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75
>
>
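For completeness, a rough sketch of what the factory above might look like when
moved from FlinkKafkaConsumer to the new KafkaSource. "SelfDescribingMessageDO",
the broker/topic strings and MyDeserializationSchema are placeholders for the
originals, and "env" is the StreamExecutionEnvironment:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical rewrite of MetricKafkaSourceFactory.createConsumer() on the new source API.
KafkaSource<SelfDescribingMessageDO> source = KafkaSource.<SelfDescribingMessageDO>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("metric-topic")
        .setGroupId("metric-source")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new MyDeserializationSchema())
        .build();

// fromSource() runs the operator on the FLIP-27 source interface, so the
// "Legacy Source Thread" prefix no longer appears in the logs.
DataStream<SelfDescribingMessageDO> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MetricSource");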


owasp-dependency-check is flagging flink 1.13 for scala 2.12.7

2021-07-02 Thread Debraj Manna
Hi,

I was running owasp-dependency-check
<https://owasp.org/www-project-dependency-check/> in a java application
based on flink-1.13.0 (scala 2.12). scala 2.12.7 was getting flagged for this
<https://ossindex.sonatype.org/vulnerability/bd61dd10-4348-45cd-a09e-094e9d588715?component-type=maven&component-name=org.scala-lang.scala-library&utm_source=dependency-check&utm_medium=integration&utm_content=6.1.6>.

Relevant Dependency for this -

[INFO] +- org.apache.flink:flink-streaming-java_2.12:jar:1.13.0:provided
[INFO] |  +- org.apache.flink:flink-file-sink-common:jar:1.13.0:provided
[INFO] |  +- org.apache.flink:flink-runtime_2.12:jar:1.13.0:compile
[INFO] |  |  +-
org.apache.flink:flink-queryable-state-client-java:jar:1.13.0:compile
[INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.13.0:compile
[INFO] |  |  +- commons-io:commons-io:jar:2.7:compile
[INFO] |  |  +-
org.apache.flink:flink-shaded-netty:jar:4.1.49.Final-13.0:compile
[INFO] |  |  +-
org.apache.flink:flink-shaded-jackson:jar:2.12.1-13.0:compile
[INFO] |  |  +-
org.apache.flink:flink-shaded-zookeeper-3:jar:3.4.14-13.0:compile
[INFO] |  |  +- org.javassist:javassist:jar:3.24.0-GA:compile
[INFO] |  |  +- org.scala-lang:scala-library:jar:2.12.7:compile

Can anyone suggest if the flink app is vulnerable to this or if it can safely
be ignored?

Thanks


Re: owasp-dependency-check is flagging flink 1.13 for scala 2.12.7

2021-07-03 Thread Debraj Manna
Thanks for replying.

But I am also observing the following being flagged

*flink-hadoop-fs-1.13.1*

   - *CVE-2016-5001
   <http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2016-5001>*
   - *CVE-2017-3161
   <http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2017-3161>*
   - *CVE-2017-3162
   <http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2017-3162>*

*flink-connector-kafka_2.12-1.13.1*

   - *CVE-2018-17196
   <http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2018-17196>*




On Fri, Jul 2, 2021 at 7:19 PM Chesnay Schepler  wrote:

> Its unlikely to be relevant for you since the vulnerability only affects
> the scaladocs, i.e., documentation.
>
> On 7/2/2021 2:10 PM, Debraj Manna wrote:
>
> Hi,
>
> I was running owasp-dependency-check
> <https://owasp.org/www-project-dependency-check/> in a java application
> based on flink-1.13.0 (scala 2.12). scala 2.12.7 was getting flagged for
> this
> <https://ossindex.sonatype.org/vulnerability/bd61dd10-4348-45cd-a09e-094e9d588715?component-type=maven&component-name=org.scala-lang.scala-library&utm_source=dependency-check&utm_medium=integration&utm_content=6.1.6>.
>
>
> Relevant Dependency for this -
>
> FO] +- org.apache.flink:flink-streaming-java_2.12:jar:1.13.0:provided
> [INFO] |  +- org.apache.flink:flink-file-sink-common:jar:1.13.0:provided
> [INFO] |  +- org.apache.flink:flink-runtime_2.12:jar:1.13.0:compile
> [INFO] |  |  +-
> org.apache.flink:flink-queryable-state-client-java:jar:1.13.0:compile
> [INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.13.0:compile
> [INFO] |  |  +- commons-io:commons-io:jar:2.7:compile
> [INFO] |  |  +-
> org.apache.flink:flink-shaded-netty:jar:4.1.49.Final-13.0:compile
> [INFO] |  |  +-
> org.apache.flink:flink-shaded-jackson:jar:2.12.1-13.0:compile
> [INFO] |  |  +-
> org.apache.flink:flink-shaded-zookeeper-3:jar:3.4.14-13.0:compile
> [INFO] |  |  +- org.javassist:javassist:jar:3.24.0-GA:compile
> [INFO] |  |  +- org.scala-lang:scala-library:jar:2.12.7:compile
>
> Can anyone suggest if flink app is vulnerable to this or can safely be
> ignored?
>
> Thanks
>
>
>


Re: owasp-dependency-check is flagging flink 1.13 for scala 2.12.7

2021-07-03 Thread Debraj Manna
Thanks again for replying.

Can you please provide a bit more explanation about the flink-hadoop-fs? It
is coming from flink-streaming. The relevant dependency tree looks like
below. How can I use a different version of hadoop in this case?

+- org.apache.flink:flink-streaming-java_2.12:jar:1.13.1:provided
[INFO] |  +- org.apache.flink:flink-file-sink-common:jar:1.13.1:provided
[INFO] |  +- org.apache.flink:flink-runtime_2.12:jar:1.13.1:compile
[INFO] |  |  +-
org.apache.flink:flink-queryable-state-client-java:jar:1.13.1:compile
[INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.13.1:compile
[INFO] |  |  +- commons-io:commons-io:jar:2.7:compile
[INFO]
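If these turn out to be false positives for the deployment (Hadoop being provided
at runtime via HADOOP_CLASSPATH rather than bundled), owasp-dependency-check can be
told to ignore them with a suppression file; a sketch, with the CVE list, notes and
schema version to be checked against the dependency-check version in use:

<?xml version="1.0" encoding="UTF-8"?>
<suppressions xmlns="https://jeremylong.github.io/DependencyCheck/dependency-suppression.1.3.xsd">
    <suppress>
        <notes>flink-hadoop-fs does not bundle Hadoop; Hadoop is provided on the cluster classpath</notes>
        <gav regex="true">org\.apache\.flink:flink-hadoop-fs:.*</gav>
        <cve>CVE-2016-5001</cve>
        <cve>CVE-2017-3161</cve>
        <cve>CVE-2017-3162</cve>
    </suppress>
</suppressions>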



On Sun, Jul 4, 2021 at 1:29 AM Chesnay Schepler  wrote:

> The Kafka one is incorrect because the 1.13.1 connector relies on Kafka
> 2.4.1.
>
> Whether the hadoop-fs ones are relevant for you depends entirely on which
> Hadoop version you are using, because we expect the user to provide Hadoop
> (and you can use later and more secure versions if you wish). IOW, the
> Hadoop 2.4 dependency in flink-hadoop-fs is just a hint to the user that
> this version _can_ be used.
>
> On 7/3/2021 8:03 PM, Debraj Manna wrote:
>
> Thanks for replying.
>
> But I am also observing the following being flagged
>
> *flink-hadoop-fs-1.13.1*
>
>- *CVE-2016-5001
><http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2016-5001>*
>- *CVE-2017-3161
><http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2017-3161>*
>- *CVE-2017-3162
><http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2017-3162>*
>
> *flink-connector-kafka_2.12-1.13.1*
>
>- *CVE-2018-17196
><http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2018-17196>*
>
>
>
>
> On Fri, Jul 2, 2021 at 7:19 PM Chesnay Schepler 
> wrote:
>
>> Its unlikely to be relevant for you since the vulnerability only affects
>> the scaladocs, i.e., documentation.
>>
>> On 7/2/2021 2:10 PM, Debraj Manna wrote:
>>
>> Hi,
>>
>> I was running owasp-dependency-check
>> <https://owasp.org/www-project-dependency-check/> in a java application
>> based on flink-1.13.0 (scala 2.12). scala 2.12.7 was getting flagged for
>> this
>> <https://ossindex.sonatype.org/vulnerability/bd61dd10-4348-45cd-a09e-094e9d588715?component-type=maven&component-name=org.scala-lang.scala-library&utm_source=dependency-check&utm_medium=integration&utm_content=6.1.6>.
>>
>>
>> Relevant Dependency for this -
>>
>> FO] +- org.apache.flink:flink-streaming-java_2.12:jar:1.13.0:provided
>> [INFO] |  +- org.apache.flink:flink-file-sink-common:jar:1.13.0:provided
>> [INFO] |  +- org.apache.flink:flink-runtime_2.12:jar:1.13.0:compile
>> [INFO] |  |  +-
>> org.apache.flink:flink-queryable-state-client-java:jar:1.13.0:compile
>> [INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.13.0:compile
>> [INFO] |  |  +- commons-io:commons-io:jar:2.7:compile
>> [INFO] |  |  +-
>> org.apache.flink:flink-shaded-netty:jar:4.1.49.Final-13.0:compile
>> [INFO] |  |  +-
>> org.apache.flink:flink-shaded-jackson:jar:2.12.1-13.0:compile
>> [INFO] |  |  +-
>> org.apache.flink:flink-shaded-zookeeper-3:jar:3.4.14-13.0:compile
>> [INFO] |  |  +- org.javassist:javassist:jar:3.24.0-GA:compile
>> [INFO] |  |  +- org.scala-lang:scala-library:jar:2.12.7:compile
>>
>> Can anyone suggest if flink app is vulnerable to this or can safely be
>> ignored?
>>
>> Thanks
>>
>>
>>
>


Flink 1.13.1 PartitionNotFoundException

2021-07-14 Thread Debraj Manna
Hi

I am observing that my flink job is failing with the below error

2021-07-14T12:07:00.918Z INFO runtime.executiongraph.Execution
flink-akka.actor.default-dispatcher-29 transitionState:1446
MetricAggregateFunction -> (Sink: LateMetricSink10, Sink: TSDBSink9)
(12/30) (3489393394c13fd1ad85136e11d67deb) switched from RUNNING to FAILED
on container_e12_1626252668574_0001_01_05 @ platform4 (dataPort=34313).

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition
73b5c8ee0c6db91027ce081379f64669#6@7c77e83642e1b954a91dd7b49cf3043b not
found.

at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:280)

at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:198)

at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:515)

at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:845)

at
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)

at
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


The same has been discussed in the old thread linked below, but in my case
there is hardly any load in the cluster. Can someone suggest what can cause
this error?

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PartitionNotFoundException-when-running-in-yarn-session-td16081.html


Thanks,


Re: Flink 1.13.1 PartitionNotFoundException

2021-07-14 Thread Debraj Manna
Yes, I forgot to mention it in my first email. I have tried increasing
taskmanager.network.request-backoff.max to
3 in flink-conf.yaml, but I am getting the same error.

On Wed, Jul 14, 2021 at 7:10 PM Timo Walther  wrote:

> Hi Debraj,
>
> I could find quite a few older emails that were suggesting to play
> around with the `taskmanager.network.request-backoff.max` option. This
> was also recommended in the link that you shared. Have you tried it?
>
> Here is some background:
>
>
> http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Network-PartitionNotFoundException-when-run-on-multi-nodes-td21683.html
>
> https://issues.apache.org/jira/browse/FLINK-9413
>
> Let us know if it helped.
>
> Regards,
> Timo
>
>
>
> On 14.07.21 14:51, Debraj Manna wrote:
> > Hi
> >
> > I am observing my flink jobs is failing with the below error
> >
> > 2021-07-14T12:07:00.918Z INFO runtime.executiongraph.Execution
> > flink-akka.actor.default-dispatcher-29 transitionState:1446
> > MetricAggregateFunction -> (Sink: LateMetricSink10, Sink: TSDBSink9)
> > (12/30) (3489393394c13fd1ad85136e11d67deb) switched from RUNNING to
> > FAILED on container_e12_1626252668574_0001_01_05 @ platform4
> > (dataPort=34313).
> >
> > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition
> > 73b5c8ee0c6db91027ce081379f64669#6@7c77e83642e1b954a91dd7b49cf3043b not
> > found.
> >
> > at
> > org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:280)
> >
> > at
> > org.apache.flink.runtime.io
> .network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:198)
> >
> > at
> > org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:515)
> >
> > at
> > org.apache.flink.runtime.io
> .network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:845)
> >
> > at
> >
> java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
> >
> > at
> >
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> >
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> >
> > at
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
> >
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >
> > at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >
> > at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> >
> > Same has been discussed in the old link. But in my case there is hardly
> > any load in the cluster. Can someone suggest what can cause this error?
> >
> >
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PartitionNotFoundException-when-running-in-yarn-session-td16081.html
> > <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PartitionNotFoundException-when-running-in-yarn-session-td16081.html
> >
> >
> >
> > Thanks,
> >
> >
>
>


Re: Flink 1.13.1 PartitionNotFoundException

2021-07-14 Thread Debraj Manna
I have increased it to 9 and it seems to be running fine now. If I still see the
failure once I add some load, I will post back in this thread.

On Wed, Jul 14, 2021 at 7:19 PM Debraj Manna 
wrote:

> Yes I forgot to mention in my first email. I have tried increasing 
> taskmanager.network.request-backoff.max to
> 3 in flink-conf.yaml. But I am getting the same error.
>
> On Wed, Jul 14, 2021 at 7:10 PM Timo Walther  wrote:
>
>> Hi Debraj,
>>
>> I could find quite a few older emails that were suggesting to play
>> around with the `taskmanager.network.request-backoff.max` option. This
>> was also recommended in the link that you shared. Have you tried it?
>>
>> Here is some background:
>>
>>
>> http://deprecated-apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Network-PartitionNotFoundException-when-run-on-multi-nodes-td21683.html
>>
>> https://issues.apache.org/jira/browse/FLINK-9413
>>
>> Let us know if it helped.
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 14.07.21 14:51, Debraj Manna wrote:
>> > Hi
>> >
>> > I am observing that my flink job is failing with the below error
>> >
>> > 2021-07-14T12:07:00.918Z INFO runtime.executiongraph.Execution
>> > flink-akka.actor.default-dispatcher-29 transitionState:1446
>> > MetricAggregateFunction -> (Sink: LateMetricSink10, Sink: TSDBSink9)
>> > (12/30) (3489393394c13fd1ad85136e11d67deb) switched from RUNNING to
>> > FAILED on container_e12_1626252668574_0001_01_05 @ platform4
>> > (dataPort=34313).
>> >
>> > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>> Partition
>> > 73b5c8ee0c6db91027ce081379f64669#6@7c77e83642e1b954a91dd7b49cf3043b
>> not
>> > found.
>> >
>> > at
>> > org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:280)
>> >
>> > at
>> > org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:198)
>> >
>> > at
>> > org.apache.flink.runtime.io
>> .network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:515)
>> >
>> > at
>> > org.apache.flink.runtime.io
>> .network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:845)
>> >
>> > at
>> >
>> java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
>> >
>> > at
>> >
>> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>> >
>> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> >
>> > at
>> >
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
>> >
>> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >
>> > at
>> >
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >
>> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >
>> > at
>> >
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >
>> >
>> > Same has been discussed in the old link. But in my case there is hardly
>> > any load in the cluster. Can someone suggest what can cause this error?
>> >
>> >
>> >
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PartitionNotFoundException-when-running-in-yarn-session-td16081.html
>> > <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PartitionNotFoundException-when-running-in-yarn-session-td16081.html
>> >
>> >
>> >
>> > Thanks,
>> >
>> >
>>
>>


Flink 1.13.1 - Vulnerabilities CVE-2019-12900 for librocksdbjni

2021-07-16 Thread Debraj Manna
Hi

I am observing that flink-1.13.1 is being flagged for CVE-2019-12900 for
librocksdbjni.

The issue seems to have been fixed in RocksDB in
https://github.com/facebook/rocksdb/issues/6703.

Can someone let me know if Flink is affected by this?

I see this has been discussed a bit on
https://issues.apache.org/jira/browse/FLINK-21411 but it is still open.

Thanks,


Re: Flink 1.13.1 - Vulnerabilities CVE-2019-12900 for librocksdbjni

2021-07-17 Thread Debraj Manna
Does anyone have any thoughts on this?

On Fri, 16 Jul 2021, 15:22 Debraj Manna,  wrote:

> Hi
>
> I am observing flink-1.13.1 is being flagged for CVE-2019-12900 for
> librocksdbjni
>
> The issue seems to have been fixed in RocksDB in
>
> https://github.com/facebook/rocksdb/issues/6703.
>
> Can someone let me know if flink is affected by this?
>
> I see this has been discussed a bit on
> https://issues.apache.org/jira/browse/FLINK-21411 but it is still open.
>
> Thanks,
>
>


How to use RichAsyncFunction with Test harness in flink 1.13.1?

2021-08-04 Thread Debraj Manna
Hi

I am trying to use a RichAsyncFunction with Flink's test harness. My code
looks like below:

final MetadataEnrichment.AsyncFlowLookup fn = new
MetadataEnrichment.AsyncFlowLookup();
final AsyncWaitOperatorFactory> operator = new AsyncWaitOperatorFactory<>(fn, 2000,
1, AsyncDataStream.OutputMode.ORDERED);
final OneInputStreamOperatorTestHarness> harness = new
OneInputStreamOperatorTestHarness<>(operator);
Configuration config = new Configuration();
config.set(StoreOptions.CONFIG_STORE_TYPE,
ConfigStoreFactory.StoreType.MEMORY.name());
harness.getExecutionConfig().setGlobalJobParameters(config);
harness.getExecutionConfig().registerKryoType(GenericMetric.class);
harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
harness.open();

But harness.open() is throwing the below exception

java.lang.IllegalStateException
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
at 
org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
at 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)

Can someone suggest what could be going wrong?


Re: How to use RichAsyncFunction with Test harness in flink 1.13.1?

2021-08-06 Thread Debraj Manna
Thanks, it worked.

Can you please let me know what you are referring to as a user function, and
why it is not recommended to use the harness with it?

On Wed, 4 Aug 2021, 22:43 Arvid Heise,  wrote:

> I would strongly recommend not using the harness for testing user
> functions.
>
> Instead I'd just create an ITCase like this:
>
> @ClassRule
> public static final MiniClusterWithClientResource MINI_CLUSTER =
> new MiniClusterWithClientResource(
> new MiniClusterResourceConfiguration.Builder()
> .setNumberTaskManagers(1)
> .setNumberSlotsPerTaskManager(PARALLELISM)
> .build());
>
> // 
>
> @Test
> public void testAsyncFunction() throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(PARALLELISM);
>
> final DataStream stream = env.fromSequence(1L, 1_000L); // or 
> fromElements
> stream = AsyncDataStream.orderedWait(stream, ...);
>
> final List result = stream.executeAndCollect(1);
> assertThat(result, containsInAnyOrder(LongStream.rangeClosed(1, 
> 1000).boxed().toArray()));
> }
>
> See also
> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L62-L62
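
For readers of the archive, a more complete, self-contained sketch of the ITCase approach
described above could look like the code below. It assumes JUnit 4 and the flink-test-utils
dependency on the test classpath. The DoublingAsyncFunction and all names and values are
made up for illustration; they are not the AsyncFlowLookup discussed in this thread.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.ClassRule;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class AsyncFunctionITCase {

    private static final int PARALLELISM = 2;

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .build());

    /** Toy async function used only in this sketch: doubles each value asynchronously. */
    private static class DoublingAsyncFunction extends RichAsyncFunction<Long, Long> {
        @Override
        public void asyncInvoke(Long input, ResultFuture<Long> resultFuture) {
            CompletableFuture.supplyAsync(() -> input * 2)
                    .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
        }
    }

    @Test
    public void testAsyncFunction() throws Exception {
        // getExecutionEnvironment() is bound to the MiniCluster set up by the ClassRule.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        DataStream<Long> input = env.fromSequence(1L, 100L);
        DataStream<Long> doubled = AsyncDataStream.orderedWait(
                input, new DoublingAsyncFunction(), 10, TimeUnit.SECONDS, 100);

        // Runs the job on the MiniCluster and collects up to 100 results back into the test.
        List<Long> result = new ArrayList<>(doubled.executeAndCollect(100));
        Collections.sort(result);

        assertEquals(
                LongStream.rangeClosed(1, 100).map(i -> i * 2).boxed().collect(Collectors.toList()),
                result);
    }
}

Because the job actually runs on a MiniCluster, this kind of test also exercises the
serialization of the user function and its operator wiring, which a harness-based unit test
would not cover in the same way.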
>
> On Wed, Aug 4, 2021 at 7:00 PM Debraj Manna 
> wrote:
>
>> HI
>>
>> I am trying to use RichAsyncFunction with flink's test harness. My code
>> looks like below
>>
>> final MetadataEnrichment.AsyncFlowLookup fn = new 
>> MetadataEnrichment.AsyncFlowLookup();
>> final AsyncWaitOperatorFactory> EnrichedMinTuple>> operator = new AsyncWaitOperatorFactory<>(fn, 2000, 1, 
>> AsyncDataStream.OutputMode.ORDERED);
>> final OneInputStreamOperatorTestHarness> EnrichedMinTuple>> harness = new 
>> OneInputStreamOperatorTestHarness<>(operator);
>> Configuration config = new Configuration();
>> config.set(StoreOptions.CONFIG_STORE_TYPE, 
>> ConfigStoreFactory.StoreType.MEMORY.name());
>> harness.getExecutionConfig().setGlobalJobParameters(config);
>> harness.getExecutionConfig().registerKryoType(GenericMetric.class);
>> harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
>> harness.open();
>>
>> But harness.open() is throwing the below exception
>>
>> java.lang.IllegalStateException
>>  at 
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
>>  at 
>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
>>  at 
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
>>  at 
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
>>  at 
>> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
>>  at 
>> org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
>>  at 
>> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
>>  at 
>> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
>>  at 
>> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
>>  at 
>> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
>>  at 
>> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)
>>
>> Can someone suggest what could be going wrong?
>>
>>
>>


Re: How to use RichAsyncFunction with Test harness in flink 1.13.1?

2021-08-06 Thread Debraj Manna
Thanks again for the explanation.

On Fri, 6 Aug 2021, 15:39 Arvid Heise,  wrote:

> User function from the point of view of Flink is any function that you as
> a user write. In this case, MetadataEnrichment.AsyncFlowLookup is a
> user-function of the Async Operator.
>
> I don't recommend harness for several reasons:
> - It's not Public API, we will adjust it and we will introduce breaking
> changes with each minor release.
> - You need to know internal implementation details, especially with the
> threading model, to invoke the harness methods in the correct order. We
> will change internals and your test will stop working in newer Flink
> versions.
> - Harness is meant as a way for Flink devs to perform unit tests of
> operators or parts thereof. A unit test for user-defined function should
> not use any Flink classes (e.g. you really just test the logic of some
> methods of your AsyncFlowLookup). If you want to test how it interacts with
> Flink, you get an integration test by definition.
>
>
>
> On Fri, Aug 6, 2021 at 10:11 AM Debraj Manna 
> wrote:
>
>> Thanks it worked.
>>
>> Can you please let me know what are you referring to as user function and
>> why it is not recommended to not use harness with it?
>>
>> On Wed, 4 Aug 2021, 22:43 Arvid Heise,  wrote:
>>
>>> I would strongly recommend not using the harness for testing user
>>> functions.
>>>
>>> Instead I'd just create an ITCase like this:
>>>
>>> @ClassRule
>>> public static final MiniClusterWithClientResource MINI_CLUSTER =
>>> new MiniClusterWithClientResource(
>>> new MiniClusterResourceConfiguration.Builder()
>>> .setNumberTaskManagers(1)
>>> .setNumberSlotsPerTaskManager(PARALLELISM)
>>> .build());
>>>
>>> // 
>>>
>>> @Test
>>> public void testAsyncFunction() throws Exception {
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setParallelism(PARALLELISM);
>>>
>>> final DataStream stream = env.fromSequence(1L, 1_000L); // or 
>>> fromElements
>>> stream = AsyncDataStream.orderedWait(stream, ...);
>>>
>>> final List result = stream.executeAndCollect(1);
>>> assertThat(result, containsInAnyOrder(LongStream.rangeClosed(1, 
>>> 1000).boxed().toArray()));
>>> }
>>>
>>> See also
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L62-L62
>>>
>>> On Wed, Aug 4, 2021 at 7:00 PM Debraj Manna 
>>> wrote:
>>>
>>>> HI
>>>>
>>>> I am trying to use RichAsyncFunction with flink's test harness. My code
>>>> looks like below
>>>>
>>>> final MetadataEnrichment.AsyncFlowLookup fn = new 
>>>> MetadataEnrichment.AsyncFlowLookup();
>>>> final AsyncWaitOperatorFactory>>> EnrichedMinTuple>> operator = new AsyncWaitOperatorFactory<>(fn, 2000, 1, 
>>>> AsyncDataStream.OutputMode.ORDERED);
>>>> final OneInputStreamOperatorTestHarness>>> Tuple2> harness = new 
>>>> OneInputStreamOperatorTestHarness<>(operator);
>>>> Configuration config = new Configuration();
>>>> config.set(StoreOptions.CONFIG_STORE_TYPE, 
>>>> ConfigStoreFactory.StoreType.MEMORY.name());
>>>> harness.getExecutionConfig().setGlobalJobParameters(config);
>>>> harness.getExecutionConfig().registerKryoType(GenericMetric.class);
>>>> harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
>>>> harness.open();
>>>>
>>>> But harness.open() is throwing the below exception
>>>>
>>>> java.lang.IllegalStateException
>>>>at 
>>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>>>>at 
>>>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
>>>>at 
>>>> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
>>>>at 
>>>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
>>>>at 
>>>> org.apache.flink.streaming

Flink 1.13.1 Kafka Producer Error

2021-08-23 Thread Debraj Manna
I am trying to use the Flink Kafka producer like below:

public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        ByteArraySerializer.class.getName());

    return new FlinkKafkaProducer<>(
        "FlinkSdmKafkaTopic",
        new SerializationSchema("FlinkSdmKafkaTopic", 8),
        props,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

private static class SerializationSchema
        implements KafkaSerializationSchema<SelfDescribingMessageDO> {

    final String topic;
    final int numPartitions;

    public SerializationSchema(final String topic, final int numPartitions) {
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(SelfDescribingMessageDO sdm,
                                                    @Nullable Long aLong) {
        return new ProducerRecord<>(topic,
            KafkaPublisher.getPartitionId(sdm.getHashKey(), numPartitions),
            sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
            sdm.toByteArray());
    }
}

I am getting the exception below when trying to deploy the Flink job.
During unit tests I do not get this error.

2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source:
MetricSource -> Filter -> MetricStoreMapper -> (Filter ->
Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11,
Sink: TSDBSink14) (5/8)#0 transitionState:1069 Source: MetricSource ->
Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map
-> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0
(5764a387ede7d6710bcf3ad4e248) switched from INITIALIZING to
FAILED with failure cause: org.apache.kafka.common.KafkaException:
Failed to construct kafka producer
at 
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
at 
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:77)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an
instance of org.apache.kafka.common.serialization.S

Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Debraj Manna
The same query has been asked on Stack Overflow
<https://stackoverflow.com/questions/68895934/flink-1-13-1-kafka-producer-error-bytearrayserializer-is-not-an-instance-of-org>
as well, and there is another related question
<https://stackoverflow.com/questions/62466188/flink-kafka-exactly-once-causing-kafkaexception-bytearrayserializer-is-not-an-in>
there too. Does anyone have any suggestions?

On Mon, Aug 23, 2021 at 9:07 PM Debraj Manna 
wrote:

> I am trying to use flink kafka producer like below
>
> public static FlinkKafkaProducer createProducer()
> {
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "");
> props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> ByteArraySerializer.class.getName());
> props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> ByteArraySerializer.class.getName());
>
> return new FlinkKafkaProducer<>(
> "FlinkSdmKafkaTopic",
> new SerializationSchema("FlinkSdmKafkaTopic", 8),
> props,
> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> }
>
> private static class SerializationSchema implements 
> KafkaSerializationSchema {
> final String topic;
> final int numPartitions;
>
> public SerializationSchema(final String topic, final int numPartitions) {
> this.topic = topic;
> this.numPartitions = numPartitions;
> }
>
> @Override
> public ProducerRecord serialize(SelfDescribingMessageDO 
> sdm, @Nullable Long aLong) {
> return new ProducerRecord<>(topic,
> KafkaPublisher.getPartitionId(sdm.getHashKey(), 
> numPartitions),
> sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
> sdm.toByteArray());
> }
> }
>
> I am getting the below exception when trying to deploy the flink job. During 
> unit tests I am not getting this error.
>
> 2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source: MetricSource 
> -> Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map -> 
> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0 
> transitionState:1069 Source: MetricSource -> Filter -> MetricStoreMapper -> 
> (Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink: 
> FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0 
> (5764a387ede7d6710bcf3ad4e248) switched from INITIALIZING to FAILED with 
> failure cause: org.apache.kafka.common.KafkaException: Failed to construct 
> kafka producer
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:77)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> at 
> org.apache.flink.streaming.ru

Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Debraj Manna
Yes, I initially did not add
`ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG` or
`ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG`.
I was getting the same error, so I tried setting them explicitly.

I ran mvn dependency:tree | grep -i kafka. I do not see any other version of
Kafka among the non-test dependencies, and I am not getting this error during
tests, only when running my Flink application.


[INFO] +- org.apache.flink:flink-connector-kafka_2.12:jar:1.13.1:compile
[INFO] |  +- org.apache.kafka:kafka-clients:jar:2.4.1:compile
[INFO] +- org.apache.kafka:kafka_2.12:jar:2.4.1:test
[INFO] +- org.apache.flink:flink-connector-kafka_2.12:test-jar:tests:1.13.1:test
[INFO] +- net.mguenther.kafka:kafka-junit:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:test:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka-clients:jar:test:2.4.0:test
[INFO] |  +- org.apache.kafka:connect-api:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:connect-json:jar:2.4.0:test
[INFO] |  \- org.apache.kafka:connect-runtime:jar:2.4.0:test
[INFO] | +- org.apache.kafka:kafka-tools:jar:2.4.0:test
[INFO] | |  +- org.apache.kafka:kafka-log4j-appender:jar:2.4.0:test
[INFO] | +- org.apache.kafka:connect-transforms:jar:2.4.0:test


On Tue, Aug 24, 2021 at 5:08 PM Fabian Paul 
wrote:

> Hi Debraj,
>
> The error looks indeed strange. We recommend not setting
> `ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG`
> or `ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG`
> because the connector will take care of it. Can you try to remove these
> calls and check if it makes a difference?
>
> Only looking at the error message it feels like different versions of
> the Kafka dependency are on the class path.
>
> Best,
> Fabian
>
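
For reference, a minimal sketch of what Fabian's suggestion would look like for the producer
shown earlier in this thread: the same types and topic, with only the explicit key/value
serializer properties removed (untested, kept close to the original snippet):

public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "");

    // No KEY_SERIALIZER_CLASS_CONFIG / VALUE_SERIALIZER_CLASS_CONFIG here;
    // the Flink Kafka connector configures the byte-array serializers on the
    // internal Kafka producer itself.
    return new FlinkKafkaProducer<>(
        "FlinkSdmKafkaTopic",
        new SerializationSchema("FlinkSdmKafkaTopic", 8),
        props,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}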


Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Debraj Manna
Fabian

I am running it inside YARN.

Thanks,

On Tue, Aug 24, 2021 at 5:27 PM Fabian Paul 
wrote:

> Hi Debraj
>
> How do you run your application? If you run it from an IDE you can set a
> breakpoint and inspect the serializer class which is used.
>
> Best,
> Fabian


Re: Flink KafkaConsumer metrics in DataDog

2021-08-26 Thread Debraj Manna
Yes, we are also facing the same problem and have not been able to find a solution.

On Thu, Aug 26, 2021 at 5:59 PM Chesnay Schepler  wrote:

> AFAIK this metric is directly forwarded from Kafka as-is; so Flink isn't
> calculating anything.
>
> I suggest to reach out to the Kafka folks.
>
> On 25/08/2021 17:23, Shilpa Shankar wrote:
>
> Hello ,
>
> We have enabled DataDogHTTPReporter to fetch metrics on flink v1.13.1
> running on kubernetes. The metric flink.operator.KafkaConsumer.records_lag_max
> is not displaying accurate values. It also displays 0 most of the time and
> when it does fetch a value, it seems to be wrong when I compare them with
> the Kafka lag broker metrics.
> Could you please let us know how these metrics are calculated? Are there
> any configuration changes that need to be made to support the Kafka
> Consumer metrics?
>
> # Datadog Integration
> metrics.reporter.dghttp.class:
> org.apache.flink.metrics.datadog.DatadogHttpReporter
> metrics.reporter.dghttp.apikey: 
> metrics.reporter.dghttp.maxMetricsPerRequest: 1000
> metrics.reporter.dghttp.tags:
> flink-cluster:flink-noc-cluster,data-center:lab
>
> metrics.scope.jm: flink.jobmanager
> metrics.scope.jm.job: flink.jobmanager.job
> metrics.scope.tm: flink.taskmanager
> metrics.scope.tm.job: flink.taskmanager.job
> metrics.scope.task: flink.task
>
> Thanks,
> Shilpa
>
>
>


Re: Flink KafkaConsumer metrics in DataDog

2021-09-04 Thread Debraj Manna
We disabled the Kafka consumer metrics in Flink by setting
register.consumer.metrics to false on the KafkaSource, and enabled the
Kafka consumer integration in the Datadog agent as explained here
<https://www.datadoghq.com/blog/monitor-kafka-with-datadog/#configure-the-agent>.
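
For reference, a minimal sketch of how disabling that option can look on the KafkaSource
builder in Flink 1.13. The broker address, topic, group id and the String deserializer below
are only illustrative, and env is assumed to be a StreamExecutionEnvironment:

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // illustrative address
        .setTopics("example-topic")           // illustrative topic
        .setGroupId("example-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // stop the connector from registering the Kafka consumer metrics in Flink
        .setProperty("register.consumer.metrics", "false")
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");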

On Fri, Aug 27, 2021 at 8:44 AM Debraj Manna 
wrote:

> Yes we are also facing the same problem and not able to find any solution.
>
> On Thu, Aug 26, 2021 at 5:59 PM Chesnay Schepler 
> wrote:
>
>> AFAIK this metric is directly forwarded from Kafka as-is; so Flink isn't
>> calculating anything.
>>
>> I suggest to reach out to the Kafka folks.
>>
>> On 25/08/2021 17:23, Shilpa Shankar wrote:
>>
>> Hello ,
>>
>> We have enabled DataDogHTTPReporter to fetch metrics on flink v1.13.1
>> running on kubernetes. The metric 
>> flink.operator.KafkaConsumer.records_lag_max
>> is not displaying accurate values. It also displays 0 most of the time and
>> when it does fetch a value, it seems to be wrong when I compare them with
>> the Kafka lag broker metrics.
>> Could you please let us know how these metrics are calculated? Are there
>> any configuration changes that need to be made to support the Kafka
>> Consumer metrics?
>>
>> # Datadog Integration
>> metrics.reporter.dghttp.class:
>> org.apache.flink.metrics.datadog.DatadogHttpReporter
>> metrics.reporter.dghttp.apikey: 
>> metrics.reporter.dghttp.maxMetricsPerRequest: 1000
>> metrics.reporter.dghttp.tags:
>> flink-cluster:flink-noc-cluster,data-center:lab
>>
>> metrics.scope.jm: flink.jobmanager
>> metrics.scope.jm.job: flink.jobmanager.job
>> metrics.scope.tm: flink.taskmanager
>> metrics.scope.tm.job: flink.taskmanager.job
>> metrics.scope.task: flink.task
>>
>> Thanks,
>> Shilpa
>>
>>
>>


Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-21 Thread Debraj Manna
Any idea when we can expect
https://issues.apache.org/jira/browse/FLINK-25375 to be released?

On Mon, Dec 20, 2021 at 8:18 PM Martijn Visser 
wrote:

> Hi,
>
> The status and Flink ticket for upgrading to Log4j 2.17.0 can be tracked
> at https://issues.apache.org/jira/browse/FLINK-25375.
>
> Best regards,
>
> Martijn
>
> On Sat, 18 Dec 2021 at 16:50, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com> wrote:
>
>> Hi,
>>
>>
>>
>> It seems there is a high severity vulnerability in log4j 2.16.0
>> (CVE-2021-45105).
>>
>> Refer : https://logging.apache.org/log4j/2.x/security.html
>>
>> Any update on this please?
>>
>>
>>
>> Regards,
>>
>> Suchithra
>>
>>
>>
>> *From:* Chesnay Schepler 
>> *Sent:* Thursday, December 16, 2021 4:35 PM
>> *To:* Parag Somani 
>> *Cc:* Michael Guterl ; V N, Suchithra (Nokia -
>> IN/Bangalore) ; Richard Deurwaarder <
>> rich...@xeli.eu>; user 
>> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>>
>>
>>
>> We will announce the releases when the binaries are available.
>>
>>
>>
>> On 16/12/2021 05:37, Parag Somani wrote:
>>
>> Thank you Chesnay for expediting this fix...!
>>
>>
>>
>> Can you suggest, when can I get binaries for 1.14.2 flink version?
>>
>>
>>
>> On Thu, Dec 16, 2021 at 5:52 AM Chesnay Schepler 
>> wrote:
>>
>> We will push docker images for all new releases, yes.
>>
>>
>>
>> On 16/12/2021 01:16, Michael Guterl wrote:
>>
>> Will you all be pushing Docker images for the 1.11.6 release?
>>
>>
>>
>> On Wed, Dec 15, 2021 at 3:26 AM Chesnay Schepler 
>> wrote:
>>
>> The current ETA is 40h for an official announcement.
>>
>> We are validating the release today (this concludes in 16h), will publish it
>> tonight, then wait for the mirrors to sync (about a day), and then we will
>> announce it.
>>
>>
>>
>> On 15/12/2021 12:08, V N, Suchithra (Nokia - IN/Bangalore) wrote:
>>
>> Hello,
>>
>>
>>
>> Could you please tell when we can expect Flink 1.12.7 release? We are
>> waiting for the CVE fix.
>>
>>
>>
>> Regards,
>>
>> Suchithra
>>
>>
>>
>>
>>
>> *From:* Chesnay Schepler  
>> *Sent:* Wednesday, December 15, 2021 4:04 PM
>> *To:* Richard Deurwaarder  
>> *Cc:* user  
>> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>>
>>
>>
>> We will also update the docker images.
>>
>>
>>
>> On 15/12/2021 11:29, Richard Deurwaarder wrote:
>>
>> Thanks for picking this up quickly!
>>
>>
>>
>> I saw you've made a second minor upgrade to upgrade to log4j2 2.16 which
>> is perfect.
>>
>>
>>
>> Just to clarify: Will you also push new docker images for these releases
>> as well? In particular flink 1.11.6 (Sorry we must upgrade soon! :()
>>
>>
>>
>> On Tue, Dec 14, 2021 at 2:33 AM narasimha  wrote:
>>
>> Thanks TImo, that was helpful.
>>
>>
>>
>> On Mon, Dec 13, 2021 at 7:19 PM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>> Chesnay Thank you for the clarification.
>>
>>
>>
>> On Mon, Dec 13, 2021 at 6:55 PM Chesnay Schepler 
>> wrote:
>>
>> The flink-shaded-zookeeper jars do not contain log4j.
>>
>>
>>
>> On 13/12/2021 14:11, Prasanna kumar wrote:
>>
>> Does Zookeeper have this vulnerability dependency ? I see references to
>> log4j in Shaded Zookeeper jar included as part of the flink distribution.
>>
>>
>>
>> On Mon, Dec 13, 2021 at 1:40 PM Timo Walther  wrote:
>>
>> While we are working to upgrade the affected dependencies of all
>> components, we recommend users follow the advisory of the Apache Log4j
>> Community. Also Ververica platform can be patched with a similar approach:
>>
>> To configure the JVMs used by Ververica Platform, you can pass custom
>> Java options via the JAVA_TOOL_OPTIONS environment variable. Add the
>> following to your platform values.yaml, or append to the existing value
>> of JAVA_TOOL_OPTIONS if you are using it already there, then redeploy
>> the platform with Helm:
>> env:
>>- name: JAVA_TOOL_OPTIONS
>>  value: -Dlog4j2.formatMsgNoLookups=true
>>
>>
>> For any questions, please contact us via our support portal.
>>
>> Regards,
>> Timo
>>
>> On 11.12.21 06:45, narasimha wrote:
>> > Folks, what about the Ververica platform? Is there any
>> > mitigation around it?
>> >
>> > On Fri, Dec 10, 2021 at 3:32 PM Chesnay Schepler > > > wrote:
>> >
>> > I would recommend to modify your log4j configurations to set
>> > log4j2.formatMsgNoLookups to true.
>> >
>> > As far as I can tell this is equivalent to upgrading log4j, which
>> > just disabled this lookup by default.
>> >
>> > On 10/12/2021 10:21, Richard Deurwaarder wrote:
>> >> Hello,
>> >>
>> >> There has been a log4j2 vulnerability made public
>> >> https://www.randori.com/blog/cve-2021-44228/
>> >>  which is making
>> >> some waves :)
>> >> This post even explicitly mentions Apache Flink:
>> >>
>> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert