Re: Windows on SinkFunctions

2020-03-29 Thread Sidney Feiner
Thanks!
What am I supposed to put in the apply/process function for the sink to be 
invoked on a List of items?





From: tison 
Sent: Sunday, March 22, 2020 4:19 PM
To: Sidney Feiner 
Cc: user@flink.apache.org 
Subject: Re: Windows on SinkFunctions

Hi Sidney,

For this case, you can write exactly:

stream.
  ...
  .window()
  .apply()
  .addSink()

Operator chaining will fuse these operators into one task, so you don't have to 
worry about efficiency.
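
As a rough illustration of what the apply/process step could do so that the sink receives a List: copy the elements of each fired window into one list and emit that list as a single record. The sketch below is plain Java with no Flink dependency; `WindowBatcher` and `toBatch` are hypothetical names. In a Flink window function the `Iterable` would be the window contents and the returned list would be handed to the function's `Collector`, so the sink's input type becomes a `List`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: turns the contents of one fired window into a single batch. */
final class WindowBatcher {

    /** Copies every element of the window into a new list, preserving iteration order. */
    static <T> List<T> toBatch(Iterable<T> windowElements) {
        List<T> batch = new ArrayList<>();
        for (T element : windowElements) {
            batch.add(element);
        }
        return batch;
    }

    public static void main(String[] args) {
        // Stand-in for the elements of one window.
        Iterable<String> window = Arrays.asList("e1", "e2", "e3");
        List<String> batch = toBatch(window);
        // In a window function, this is where the batch would be emitted downstream.
        System.out.println("batch of " + batch.size() + " elements: " + batch);
    }
}
```

With this shape the sink is invoked once per window with the whole batch, instead of once per event.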

Best,
tison.


Sidney Feiner <sidney.fei...@startapp.com> wrote on Sun, Mar 22, 2020 at 10:03 PM:
Hey,
I wanted to know if it's possible to define a SinkFunction as a WindowFunction 
as well.
For example, I would like the sink to be invoked every 5 minutes or once 500 
events have reached the sink.
Is there a way to do this inside the sink implementation? Or do I have to 
create the windows earlier in the pipeline?
Because if I have multiple sinks and need a window for only one of them, the 
second solution might be problematic.

Thanks :)





Re: Kafka - FLink - MongoDB using Scala

2020-03-29 Thread Konstantin Knauf
cc user@f.a.o

Hi Siva,

I am not aware of a Flink MongoDB Connector in either Apache Flink, Apache
Bahir or flink-packages.org. I assume that you are doing idempotent
upserts, and hence do not require a transactional sink to achieve
end-to-end exactly-once results.

To build one yourself, you implement
org.apache.flink.streaming.api.functions.sink.SinkFunction (or better, extend
org.apache.flink.streaming.api.functions.sink.RichSinkFunction).
Roughly speaking, you would instantiate the MongoDB client in the "open"
method and write records via the MongoDB client in the "invoke" method.
Usually, such sinks use some kind of batching to increase write performance.

I suggest you also have a look at the source code of the ElasticSearch or
Cassandra Sink.
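
To make the batching idea a bit more concrete, here is a minimal plain-Java sketch of a buffer that flushes either when it holds enough records or when its oldest record is too old. Everything in it (`CountOrTimeBuffer`, its parameters, the `writer` callback) is hypothetical and not part of Flink or any MongoDB driver; in a real sink, `add` would presumably be called from "invoke", `flush` from "close", and `writer` would perform a bulk write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical helper: buffers records and flushes them by size or by age. */
final class CountOrTimeBuffer<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final LongSupplier clock;        // injectable so tests can control time
    private final Consumer<List<T>> writer;  // e.g. a bulk write to the database
    private final List<T> buffer = new ArrayList<>();
    private long firstArrivalMillis;

    CountOrTimeBuffer(int maxSize, long maxAgeMillis,
                      LongSupplier clock, Consumer<List<T>> writer) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
        this.writer = writer;
    }

    /** Adds one record and flushes if the size or age limit is reached. */
    void add(T record) {
        if (buffer.isEmpty()) {
            firstArrivalMillis = clock.getAsLong();
        }
        buffer.add(record);
        boolean full = buffer.size() >= maxSize;
        boolean old = clock.getAsLong() - firstArrivalMillis >= maxAgeMillis;
        if (full || old) {
            flush();
        }
    }

    /** Writes out whatever is buffered; does nothing when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writer.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        CountOrTimeBuffer<String> buffer = new CountOrTimeBuffer<>(
                2, 60_000L, System::currentTimeMillis,
                batch -> System.out.println("writing " + batch));
        buffer.add("a");
        buffer.add("b"); // reaches maxSize, triggers a write
        buffer.add("c");
        buffer.flush();  // final flush, as a sink would do on close
    }
}
```

Note that this sketch only checks the age when a new record arrives, so a quiet stream could hold records longer than the limit; a production sink would likely also need a timer and a flush on checkpoint.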

Best,

Konstantin

On Sat, Mar 28, 2020 at 1:47 PM Sivapragash Krishnan <
sivapragas...@gmail.com> wrote:

> Hi
>
> I'm working on creating a streaming pipeline which streams data from Kafka
> and stores it in MongoDB using Flink with Scala.
>
> I'm able to successfully stream data from Kafka using Flink with Scala. I have
> not found any support for storing the data in MongoDB; could you please help
> me with a code snippet for storing data in MongoDB?
>
> Thanks
> Siva
>




Re: Testing RichAsyncFunction with TestHarness

2020-03-29 Thread KristoffSC
Hi, 
another update on this one. 
I managed to make the workaround a little bit cleaner. 

The test setup I have now is like this:

// Create the mock environment first so its ExecutionConfig is available below.
MockEnvironment environment = MockEnvironment.builder().build();
ExecutionConfig executionConfig = environment.getExecutionConfig();

// Serialize an empty list of stream edges.
ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream();
ObjectOutputStream oosStreamEdges = new ObjectOutputStream(streamEdgesBytes);
oosStreamEdges.writeObject(Collections.emptyList());

// Serialize the Kryo serializer for the input type.
KryoSerializer<MyMessage> kryoSerializer =
    new KryoSerializer<>(MyMessage.class, executionConfig);
ByteArrayOutputStream kryoSerializerBytes = new ByteArrayOutputStream();
ObjectOutputStream oosKryoSerializer = new ObjectOutputStream(kryoSerializerBytes);
oosKryoSerializer.writeObject(kryoSerializer);

// Store both byte arrays in the task configuration.
Configuration configuration = new Configuration();
configuration.setBytes("edgesInOrder", streamEdgesBytes.toByteArray());
configuration.setBytes("typeSerializer_in_1", kryoSerializerBytes.toByteArray());
environment.getTaskConfiguration().addAll(configuration);

this.testHarness = new OneInputStreamOperatorTestHarness<>(
    new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.UNORDERED),
    environment);

With this setup, this.testHarness.open(); works. 
However, there is another problem. When calling
testHarness.processElement(myMessage, 1L); 
it throws another exception:

java.lang.AssertionError
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:400)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:112)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:107)






[ANNOUNCE] Weekly Community Update 2020/13

2020-03-29 Thread Konstantin Knauf
Dear community,

happy to share this week's Apache Flink community digest with a couple of
threads around the upcoming release of Apache Flink Stateful Functions 2.0,
an update on Flink 1.10.1, two FLIPs to improve Apache
Flink's distributed runtime and the schedule for Flink Forward Virtual
Conference 2020.

Flink Development
=================

* [statefun] A lot has happened this week around the release of Apache
Flink Stateful Functions 2.0.
   * [releases] Gordon announced the feature freeze for Apache Flink
Stateful Functions 2.0.0 last Monday. [1]
   * [releases] Since then he has published three release candidates. The
latest was published today and voting is open for at least 72 hours. [2]
   * [docker] Gordon proposes to create a separate (Apache Flink)
repository for the Stateful Functions Dockerfiles. Only positive feedback
so far. [3]
   * [docs] The documentation has been migrated to Apache infrastructure
and is now available under [4]. In conjunction with this release the
community will also add a dedicated page on Stateful Functions to the
Apache Flink homepage and redirect to it from statefun.io.

* [releases] Yu Li has published an updated list of blockers and critical
issues for the upcoming 1.10.1 release. There are 4 blockers and 2 critical
issues left. [5]

* [development process] Robert has started a discussion on phasing out
our Travis CI usage in Apache Flink in favor of Azure Pipelines. Currently
both are used. Robert proposes that Travis would still be used for Flink
1.10 and earlier, but all Travis-specific files and configuration would be
removed from master. [6]

* [distributed runtime] Yangze Guo has created a FLIP to improve the way
Flink assigns identifiers to graph components as well as its distributed
components to facilitate debugging. The general idea is to give IDs more
meaning (e.g. ResourceID = Pod Name on Kubernetes) and to derive IDs from
each other wherever it makes sense (e.g. ExecutionAttemptID = ExecutionID +
attempt counter). [7,8]

* [distributed runtime] FLIP-119: Based on the previous work to make Flink's
scheduler extensible, Gary proposes to introduce Pipelined Region
Scheduling. The basic idea is to always schedule each pipelined region of a
Flink job as a unit, at the earliest once all of its inputs are ready. This
avoids resource deadlocks for batch jobs, makes batch jobs more tunable
(introduce more blocking exchanges -> smaller pipelined regions -> fewer slots
required), and allows unifying scheduling for streaming and batch jobs. [9,10]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Apache-Flink-Stateful-Functions-2-0-0-tp39163.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-Release-2-0-0-release-candidate-3-tp39424.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Creating-a-new-repo-to-host-Stateful-Functions-Dockerfiles-tp39342.html
[4] https://ci.apache.org/projects/flink/flink-statefun-docs-master/
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-tp38689.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Switch-to-Azure-Pipelines-as-the-primary-CI-tool-switch-off-Travis-tp39177.html
[7]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-118-Improve-Flink-s-ID-system-tp39321.html
[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-119-Pipelined-Region-Scheduling-tp39350.html
[10]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling

Notable Bugs
============

* [FLINK-16638] [1.10.0] [1.9.2] When restoring from a savepoint, Flink
by default checks that there is a matching operator (by OperatorID) for
every state contained in the savepoint. In this check Flink currently
ignores user-defined OperatorIDs (#setUidHash). [11]

* [FLINK-16822] [1.10.0] The SET command currently does not work in the SQL
Client for table configurations. [12]

* [FLINK-16705] [1.10.0] In the LocalExecutor there is a race condition
between cluster shutdown once a job finishes and retrieval of the result by
the JobClient. [13]


[11] https://issues.apache.org/jira/browse/FLINK-16638
[12] https://issues.apache.org/jira/browse/FLINK-16822
[13] https://issues.apache.org/jira/browse/FLINK-16705

Events, Blog Posts, Misc
========================

* The schedule for Flink Forward Virtual Conference 2020 has been published
[14] with keynotes by Cloudera, DellEMC, Splunk and Ververica. Registration
& attendance [15] is free.

* Datadog has published a blog post on how to monitor Apache Flink with
Datadog. [16]

* Alexander Fedulov has published the second part of his series on dynamic
fraud detection with Apache Flink. [17]

* Bowen Li recaps the motivation of integrating Flink with Hive and gi

Re: End to End Latency Tracking in flink

2020-03-29 Thread Zhijiang
Hi Lu,

Besides Congxian's replies, you can also find some further explanation at 
"https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking".

Best,
Zhijiang


--
From:Congxian Qiu 
Send Time:2020 Mar. 28 (Sat.) 11:49
To:Lu Niu 
Cc:user 
Subject:Re: End to End Latency Tracking in flink

Hi
As far as I know, the latency-tracking feature is meant for debugging: you can 
use it to debug, and disable it when running the job in production.
From my side, using $current_processing - $event_time is OK, but keep 
one thing in mind: the event time may not be the time the event was ingested into Flink.
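
As a small illustration of the $current_processing - $event_time idea, the following plain-Java sketch accumulates such differences and reports count, mean and maximum. `LatencyStats` is a hypothetical class, not a Flink API; in a job, `record` would presumably be called from an operator just before the sink, and the values would be exposed through Flink's metric system rather than printed.

```java
/** Hypothetical helper: accumulates (processing time - event time) observations. */
final class LatencyStats {
    private long count;
    private long sumMillis;
    private long maxMillis = Long.MIN_VALUE;

    /** Records one observation; both timestamps are epoch milliseconds. */
    void record(long eventTimeMillis, long processingTimeMillis) {
        long latency = processingTimeMillis - eventTimeMillis;
        count++;
        sumMillis += latency;
        maxMillis = Math.max(maxMillis, latency);
    }

    long count() {
        return count;
    }

    /** Mean latency in milliseconds, or 0.0 if nothing was recorded. */
    double meanMillis() {
        return count == 0 ? 0.0 : (double) sumMillis / count;
    }

    /** Largest latency seen so far, or 0 if nothing was recorded. */
    long maxMillis() {
        return count == 0 ? 0L : maxMillis;
    }

    public static void main(String[] args) {
        LatencyStats stats = new LatencyStats();
        long now = System.currentTimeMillis();
        stats.record(now - 250, now); // an event stamped 250 ms ago
        stats.record(now - 100, now);
        System.out.println("mean=" + stats.meanMillis() + " ms, max=" + stats.maxMillis() + " ms");
    }
}
```

Because the event timestamp is assigned outside Flink, the result includes any delay before ingestion and can even be negative when clocks are skewed.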

Best,
Congxian

Lu Niu wrote on Sat, Mar 28, 2020 at 6:25 AM:

Hi,

I am looking for end-to-end latency monitoring of a Flink job. Based on my study, 
I have two options: 

1. Flink provides a latency tracking feature. However, the documentation says it 
cannot show the actual latency of the business logic, as it bypasses all operators. 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking
Also, the feature can significantly impact performance, so I assume it is 
not meant for production use. What do users use latency tracking for? 
It sounds like only back pressure could affect the latency.  

2. I found a Stack Overflow question on this: 
https://stackoverflow.com/questions/56578919/latency-monitoring-in-flink-application
The answer suggests exposing (current processing time - event time) after the 
source and before the sink for end-to-end latency monitoring. Is this a good 
solution? If not, what is the official solution for end-to-end latency tracking? 
 

Thank you! 

Best
Lu




Re: Testing RichAsyncFunction with TestHarness

2020-03-29 Thread KristoffSC
Hi :) I have finally figured it out :)

On top of the changes from my last email,
in my test method I had to wrap "testHarness.processElement" in a
synchronized block, like this:

  @Test
  public void foo() throws Exception {
    synchronized (this.testHarness.getCheckpointLock()) {
      testHarness.processElement(MyMessage.builder().build(), 1L);
    }
  }

That worked. 
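
A possible explanation for why the synchronized block helps, offered as an assumption rather than something confirmed in this thread: the operator appears to check that the calling thread holds the checkpoint lock before it buffers an element, and that check fails when the test calls it without the lock. The plain-Java sketch below reproduces this pattern with `Thread.holdsLock`; `LockGuardedOperator` is a hypothetical stand-in, not Flink code.

```java
/** Hypothetical stand-in for an operator that must be called under a lock. */
final class LockGuardedOperator {
    private final Object checkpointLock = new Object();
    private int processed;

    Object getCheckpointLock() {
        return checkpointLock;
    }

    /** Rejects the call unless the current thread holds the checkpoint lock. */
    void processElement(String element) {
        if (!Thread.holdsLock(checkpointLock)) {
            throw new IllegalStateException("caller must hold the checkpoint lock");
        }
        processed++;
    }

    int processedCount() {
        return processed;
    }

    public static void main(String[] args) {
        LockGuardedOperator operator = new LockGuardedOperator();
        try {
            operator.processElement("no lock"); // fails the guard
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        synchronized (operator.getCheckpointLock()) {
            operator.processElement("with lock"); // passes the guard
        }
        System.out.println("processed: " + operator.processedCount());
    }
}
```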

I think that this could be added to official documentation in [1]. 


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html





Re: [Third-party Tool] Flink memory calculator

2020-03-29 Thread Yun Tang
Very interesting and convenient tool. Just a quick question: could this tool 
also handle cluster deployment options like "-tm" mixed with configuration in 
`flink-conf.yaml`?

Best
Yun Tang

From: Yangze Guo 
Sent: Friday, March 27, 2020 18:00
To: user ; user...@flink.apache.org 

Subject: [Third-party Tool] Flink memory calculator

Hi, there.

In release-1.10, the memory setup of task managers has changed a lot.
I would like to share a third-party tool that simulates Flink's memory
calculation and shows the resulting memory configuration.

Although there are already a detailed official setup guide [1] and migration
guide [2], the calculator further allows users to:
- Verify whether there is any conflict in their configuration. The
calculator is more lightweight than starting a Flink cluster,
especially when running Flink on Yarn/Kubernetes. Users can make sure
their configuration is correct locally before deploying it to external
resource managers.
- Get all of the memory configurations before deploying. Users may set
taskmanager.memory.task.heap.size and taskmanager.memory.managed.size,
but they also want to know the total memory consumption of Flink. With
this tool, users can get all of the memory configurations they are
interested in. If anything is unexpected, they do not need to
re-deploy a Flink cluster.
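
To give a feeling for what "total memory consumption" means here, the following plain-Java sketch simply adds up the task manager memory components of the 1.10 memory model as I understand it: total Flink memory is the sum of framework heap, task heap, managed, framework off-heap, task off-heap and network memory, and total process memory adds JVM metaspace and JVM overhead on top. `TaskManagerMemory` and the numbers used are hypothetical; the sketch does not derive any component from fractions or defaults, which is exactly the part the real calculator takes from Flink's own BashJavaUtils.

```java
/** Hypothetical value class: task manager memory components, all in megabytes. */
final class TaskManagerMemory {
    final long frameworkHeap;
    final long taskHeap;
    final long managed;
    final long frameworkOffHeap;
    final long taskOffHeap;
    final long network;
    final long jvmMetaspace;
    final long jvmOverhead;

    TaskManagerMemory(long frameworkHeap, long taskHeap, long managed,
                      long frameworkOffHeap, long taskOffHeap, long network,
                      long jvmMetaspace, long jvmOverhead) {
        this.frameworkHeap = frameworkHeap;
        this.taskHeap = taskHeap;
        this.managed = managed;
        this.frameworkOffHeap = frameworkOffHeap;
        this.taskOffHeap = taskOffHeap;
        this.network = network;
        this.jvmMetaspace = jvmMetaspace;
        this.jvmOverhead = jvmOverhead;
    }

    /** Sum of all components that belong to Flink itself. */
    long totalFlinkMemory() {
        return frameworkHeap + taskHeap + managed
                + frameworkOffHeap + taskOffHeap + network;
    }

    /** Total Flink memory plus the JVM's own metaspace and overhead. */
    long totalProcessMemory() {
        return totalFlinkMemory() + jvmMetaspace + jvmOverhead;
    }

    public static void main(String[] args) {
        // Example numbers only; they are not Flink defaults.
        TaskManagerMemory memory =
                new TaskManagerMemory(128, 1024, 512, 128, 0, 128, 96, 224);
        System.out.println("total Flink memory (MB): " + memory.totalFlinkMemory());
        System.out.println("total process memory (MB): " + memory.totalProcessMemory());
    }
}
```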

The repo link of this tool is
https://github.com/KarmaGYZ/flink-memory-calculator. It reuses the
BashJavaUtils.jar of Flink and ensures the calculation result is
exactly the same as your Flink dist. For more details, please take a
look at the README.

Any feedback or suggestions are welcome!

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_migration.html

Best,
Yangze Guo


Re: Log file environment variable 'log.file' is not set.

2020-03-29 Thread Yun Tang
Hi Vitaliy

The 'log.file' property is configured if you have uploaded 'logback.xml' 
or 'log4j.properties' [1].
The file contains the logs of the job manager or the task manager, depending on 
the component itself. And as you can see, this is only a local file path; I am 
afraid it cannot understand HDFS paths.


[1] 
https://github.com/apache/flink/blob/ae3b0ff80b93a83a358ab474060473863d2c30d6/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java#L420

Best
Yun Tang

From: Vitaliy Semochkin 
Sent: Sunday, March 29, 2020 4:32
To: user 
Subject: Log file environment variable 'log.file' is not set.

Hi,

When I launch Flink Application Cluster I keep getting a message
" Log file environment variable 'log.file' is not set."

I use console logging via log4j
and I read logs via yarn logs -applicationId 

What's the purpose of the log.file property?
What will this file contain, and on which host should I search for the log?
Does this property understand HDFS paths?

Regards,
Vitaliy


Re: Log file environment variable 'log.file' is not set.

2020-03-29 Thread Vitaliy Semochkin
Hello Yun,

I see this error reported by:
*org.apache.flink.runtime.webmonitor.WebMonitorUtils*  - *JobManager log
files are unavailable in the web dashboard. Log file location not found in
environment variable 'log.file' or configuration key 'Key: 'web.log.path' ,
default: null (fallback keys: [{key=jobmanager.web.log.path,
isDeprecated=true}])'.*

I wonder where the JobManager log files are stored when running on a YARN
cluster?
Are these logs the same as those I get via yarn logs -applicationId?

Regards,
Vitaliy





Re: [Third-party Tool] Flink memory calculator

2020-03-29 Thread Yangze Guo
Hi, Yun,

Sorry, it currently cannot handle that. But I think it is a
really good idea, and the feature will be added in the next version.

Best,
Yangze Guo



Re: [Third-party Tool] Flink memory calculator

2020-03-29 Thread Xintong Song
Thanks Yangze, I've tried the tool and I think it's very helpful.


Thank you~

Xintong Song





Re: [Third-party Tool] Flink memory calculator

2020-03-29 Thread Jeff Zhang
Hi Yangze,

Does this tool just parse the configuration in flink-conf.yaml? Maybe it
could be done in JobListener [1] (we should enhance it by adding a hook
before job submission), so that it could cover all the cases (e.g. parameters
coming from the command line).

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35




-- 
Best Regards

Jeff Zhang


Re: State & Generics

2020-03-29 Thread Mike Mintz
I was able to get generic types to work when I used GenericTypeInfo and
made sure to wrap the generic in some concrete type. In my case I used
scala.Some as the wrapper. It looks something like this (in Scala):

import org.apache.flink.api.java.typeutils.GenericTypeInfo
val descriptor = new ListStateDescriptor[Some[T]](
  "blah", new GenericTypeInfo(classOf[Some[T]]))

Since the descriptor is Some[T] instead of T, I had to wrap and unwrap it
every time I used it.
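
Some background on the error quoted below, as a sketch under assumptions: a type hint usually captures its type argument through an anonymous subclass and reads it back via reflection, and when that argument still contains a type variable such as T there is no concrete class to recover. The plain-Java example below shows the mechanism with a hypothetical `TypeToken` class; it is not Flink's TypeHint and only mimics the check.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.List;

/** Hypothetical stand-in for a type hint; captures T via an anonymous subclass. */
abstract class TypeToken<T> {
    final Type captured;

    TypeToken() {
        // Works for direct anonymous subclasses such as new TypeToken<Foo>() {}.
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        captured = superType.getActualTypeArguments()[0];
    }

    /** True if the captured type contains no unresolved type variable. */
    boolean isFullySpecified() {
        return !containsVariable(captured);
    }

    // Checks type variables and parameterized types only; arrays and
    // wildcards are ignored to keep the sketch short.
    private static boolean containsVariable(Type type) {
        if (type instanceof TypeVariable) {
            return true;
        }
        if (type instanceof ParameterizedType) {
            for (Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
                if (containsVariable(arg)) {
                    return true;
                }
            }
        }
        return false;
    }
}

final class TypeTokenDemo {
    /** Created inside generic code: T is still a variable at this point. */
    static <T> TypeToken<List<T>> generic() {
        return new TypeToken<List<T>>() {};
    }

    /** Created with a concrete type argument. */
    static TypeToken<List<String>> concrete() {
        return new TypeToken<List<String>>() {};
    }

    public static void main(String[] args) {
        System.out.println("generic:  " + generic().captured
                + " fully specified? " + generic().isFullySpecified());
        System.out.println("concrete: " + concrete().captured
                + " fully specified? " + concrete().isFullySpecified());
    }
}
```

This is why approaches that pass a concrete Class object or concrete type information from the call site tend to work where a hint created inside the generic class does not.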

On Sat, Mar 28, 2020 at 6:02 AM Laurent Exsteens <
laurent.exste...@euranova.eu> wrote:

> Hello,
>
> Using Flink 1.8.1, I'm getting the following error:
>  *The TypeHint is using a generic variable.This is not supported,
> generic types must be fully specified for the TypeHint.*
> when trying to create a ListStateDescriptor with a generic type (full
> sample code in attachment):
>
> public class AND extends RichCoFlatMapFunction Tuple2> {
>
> private transient ListState leftState;
> private transient ListState rightState;
>
> @Override
> public void open(Configuration config) {
> ListStateDescriptor left_descriptor =
> new ListStateDescriptor<>(
> "and_left",
> TypeInformation.of(new TypeHint() {
> }));
> leftState = getRuntimeContext().getListState(left_descriptor);
>
> This gives me the following stack trace:
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>     at eu.euranova.leadcep.Main.main(Main.java:61)
> Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
>     at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:54)
>     at eu.euranova.leadcep.AND$1.<init>(AND.java:22)
>     at eu.euranova.leadcep.AND.open(AND.java:19)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Googling the error didn't give a working solution.
>
> Is there a way to work with state using Generic types? if yes, how?
>
> Thanks in advance for your help!
>
> Best Regards,
>
> Laurent.
>


Re: End to End Latency Tracking in flink

2020-03-29 Thread Lu Niu
Thanks for the replies, @Zhijiang, @Congxian!

@Congxian
$current_processing - $event_time works for event time. How about
processing time? Is there a good way to measure the latency?

Best
Lu



Re: [Third-party Tool] Flink memory calculator

2020-03-29 Thread Yangze Guo
Thanks for your feedback, @Xintong and @Jeff.

@Jeff
I think it is always good to leverage existing logic in Flink, such
as JobListener. However, this calculator does not only aim to check
for conflicts; it also aims to provide the calculation result to the
user before the job is actually deployed, in case there is any
unexpected configuration. It's a good point that we need to parse the
dynamic configs. I prefer to parse the dynamic configs and CLI
commands in bash instead of adding a hook in JobListener.

Best,
Yangze Guo

On Mon, Mar 30, 2020 at 10:32 AM Jeff Zhang  wrote:
>
> Hi Yangze,
>
> Does this tool just parse the configuration in flink-conf.yaml ?  Maybe it 
> could be done in JobListener [1] (we should enhance it via adding hook before 
> job submission), so that it could all the cases (e.g. parameters coming from 
> command line)
>
> [1] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35
>
>
> Yangze Guo  于2020年3月30日周一 上午9:40写道:
>>
>> Hi, Yun,
>>
>> I'm sorry that it currently could not handle it. But I think it is a
>> really good idea and that feature would be added to the next version.
>>
>> Best,
>> Yangze Guo
>>
>> On Mon, Mar 30, 2020 at 12:21 AM Yun Tang  wrote:
>> >
>> > Very interesting and convenient tool, just a quick question: could this 
>> > tool also handle deployment cluster commands like "-tm" mixed with 
>> > configuration in `flink-conf.yaml` ?
>> >
>> > Best
>> > Yun Tang
>> > 
>> > From: Yangze Guo 
>> > Sent: Friday, March 27, 2020 18:00
>> > To: user ; user...@flink.apache.org 
>> > 
>> > Subject: [Third-party Tool] Flink memory calculator
>> >
>> > Hi, there.
>> >
>> > In release-1.10, the memory setup of task managers has changed a lot.
>> > I would like to share a third-party tool that simulates Flink's
>> > memory configuration and shows the calculation result.
>> >
>> > Although there are already a detailed official setup guide[1] and
>> > migration guide[2], the calculator further allows users to:
>> > - Verify whether there is any conflict in their configuration. The
>> > calculator is more lightweight than starting a Flink cluster,
>> > especially when running Flink on Yarn/Kubernetes. Users can make sure
>> > their configuration is correct locally before deploying it to external
>> > resource managers.
>> > - Get all of the memory configurations before deploying. Users may set
>> > taskmanager.memory.task.heap.size and taskmanager.memory.managed.size
>> > but also want to know the total memory consumption of Flink. With
>> > this tool, users can get all of the memory configurations they are
>> > interested in. If anything is unexpected, they do not need to
>> > re-deploy a Flink cluster.
>> >
>> > The repo link of this tool is
>> > https://github.com/KarmaGYZ/flink-memory-calculator. It reuses the
>> > BashJavaUtils.jar of Flink and ensures the calculation result is
>> > exactly the same as your Flink dist. For more details, please take a
>> > look at the README.
>> >
>> > Any feedback or suggestions are welcome!
>> >
>> > [1] 
>> > https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html
>> > [2] 
>> > https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_migration.html
>> >
>> > Best,
>> > Yangze Guo
>
>
>
> --
> Best Regards
>
> Jeff Zhang


Re: [Third-party Tool] Flink memory calculator

2020-03-29 Thread Xintong Song
Hi Jeff,

I think the purpose of this tool is to allow users to play with the memory
configurations without needing to actually deploy a Flink cluster or even
have a job. As for sanity checks, we currently have them in the start-up
scripts (for standalone clusters) and resource managers (on K8s/Yarn/Mesos).

I think it makes sense to do the checks earlier, i.e. on the client side. But
I'm not sure if JobListener is the right place. IIUC, JobListener is
invoked before submitting a specific job, while the mentioned checks
validate Flink's cluster-level configurations. It might be okay for a job
cluster, but it does not cover the scenario of session clusters.
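
As a rough illustration of what a client-side sanity check on cluster-level configuration might look like, here is a deliberately simplified sketch in Python. It is not Flink's actual validation logic: Flink derives several more memory components than the two used here, and the helper names (parse_size, find_conflicts) are hypothetical. It only flags the case where explicitly configured components already exceed the configured total process size (taskmanager.memory.process.size).

```python
# Binary units; a bare number is treated as bytes.
UNITS = {
    "b": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
}


def parse_size(text):
    """Convert strings such as '512m' or '1 g' to a number of bytes."""
    text = text.strip().lower()
    digits = text.rstrip("abcdefghijklmnopqrstuvwxyz ")
    unit = text[len(digits):].strip() or "b"
    return int(digits) * UNITS[unit]


TOTAL_KEY = "taskmanager.memory.process.size"
COMPONENT_KEYS = [
    "taskmanager.memory.task.heap.size",
    "taskmanager.memory.managed.size",
]


def find_conflicts(conf):
    """Return a list of human-readable problems found in conf (a dict of strings)."""
    problems = []
    if TOTAL_KEY in conf:
        total = parse_size(conf[TOTAL_KEY])
        explicit = sum(parse_size(conf[k]) for k in COMPONENT_KEYS if k in conf)
        if explicit > total:
            problems.append(
                "configured components (%d bytes) exceed %s (%d bytes)"
                % (explicit, TOTAL_KEY, total)
            )
    return problems
```

Because the check takes a plain dictionary, it could run before any cluster exists, which is the scenario a job-level hook would not cover for session clusters.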

Thank you~

Xintong Song


