RE: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-23 Thread Partha Mishra
Hi,

None of the operators were renamed or removed. Testing was carried out with 
exactly the same binary on both 1.9 and 1.11.2. A checkpoint saved in 1.9 cannot 
be restored in 1.11.2.


From: Sivaprasanna 
Sent: Friday, October 23, 2020 10:57 AM
To: Partha Mishra 
Cc: user@flink.apache.org
Subject: Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

Hi,

Have you dropped or renamed any operator from the original job? If yes, and you 
are okay with discarding the state of that operator, you can submit the job 
with --allowNonRestoredState or -n. 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
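
For reference, a hedged sketch of what such a submission could look like with the
CLI (the savepoint path and jar name are illustrative):

    ./bin/flink run -s hdfs:///flink/savepoints/savepoint-1bbc5c -n my-job.jar

-n is shorthand for --allowNonRestoredState: state that cannot be mapped to an
operator of the new program is skipped instead of failing the restore.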

-
Sivaprasanna

On Fri, Oct 23, 2020 at 10:48 AM Partha Mishra <partha.mis...@man-es.com> wrote:
Hi,

We are trying to save checkpoints for one of the Flink jobs running on Flink 
version 1.9, and tried to resume the same job on Flink version 1.11.2. We get 
the below error when trying to restore the saved checkpoint in the newer Flink 
version. Can anyone help?

Cannot map checkpoint/savepoint state for operator 
fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is 
not available in the new program.


Complete Stack Trace:
{"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
    ... 10 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
    at com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)
    at com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAcces

Re: Flink 1.8.3 GC issues

2020-10-23 Thread Piotr Nowojski
Hi Josson,

Thanks for the great investigation and for coming back to us. Aljoscha, could you
help us here? It looks like you were involved in this original BEAM-3087
issue.

Best,
Piotrek

On Fri, Oct 23, 2020 at 07:36 Josson Paul wrote:

> @Piotr Nowojski   @Nico Kruber 
>
> An update.
>
> I am able to figure out the problem code. A change in the Apache Beam code
> is causing this problem.
>
>
>
>
>
> Beam introduced a lock around "emit" in the unbounded source. The lock it
> takes is Flink's checkpoint lock. The same lock is used by Flink's timer
> service to emit the watermarks. Flink's timer service is starved waiting
> for the lock and, for some reason, never acquires it. The after-effect of
> this situation is that the watermark is never emitted by Flink's timer
> service. Because there are no watermarks flowing through the system,
> sliding windows are never closed. Data accumulates in the window.
>
>
>
> This problem occurs only if we have external lookup calls (like Redis)
> happen before the data goes to Sliding Window. Something like below.
>
>
>
> KafkaSource -> Transforms (occasional Redis
> lookup) -> SlidingWindow -> Transforms -> Kafka Sink
>
>
>
>
>
>
> https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
> . This is Beam 2.4 and you can see that there is no synchronized block at
> lines 257 and 270.
>
>
>
>
> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
> . This is Beam 2.15. See the synchronized block introduced at lines 264 and
> 280. We are using Beam 2.15 and Flink 1.8.
>
>
>
> Beam introduced this synchronized block because of this bug.
> https://issues.apache.org/jira/browse/BEAM-3087
>
>
>
> After I removed that synchronized keyword everything started working fine
> in my application.
>
>
>
> What do you guys think about this? Why does Beam need a synchronized
> block there?
>
>
>
> Beam is using this lock ->
>
>
> https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282
>
>
>
> Thanks,
>
> Josson
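
A minimal, self-contained Java sketch of the starvation pattern described
above (the class and method names are illustrative, not the actual Beam/Flink
code): a producer that re-acquires a shared lock in a tight loop can keep a
periodic timer thread from ever getting it, analogous to the watermark timer
being starved by a source that emits under the checkpoint lock.

    import java.util.concurrent.TimeUnit;

    // Illustrative only: "checkpointLock" stands in for Flink's checkpoint lock,
    // the source thread for Beam's UnboundedSourceWrapper emitting under that
    // lock, and the timer thread for the watermark-emitting timer service.
    public class CheckpointLockStarvation {
        public static void main(String[] args) throws InterruptedException {
            final Object checkpointLock = new Object();

            Thread source = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (checkpointLock) {
                        // Stands in for emitting a record plus a slow external
                        // (e.g. Redis) lookup performed while the lock is held.
                        busyWork();
                    }
                    // The lock is released only momentarily before being
                    // re-acquired, so a waiting thread may rarely win it.
                }
            });

            Thread timerService = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (checkpointLock) {
                        System.out.println("watermark emitted");
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(200); // periodic watermark interval
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            });

            source.start();
            timerService.start();
            TimeUnit.SECONDS.sleep(5); // observe how rarely "watermark emitted" prints
            source.interrupt();
            timerService.interrupt();
        }

        private static void busyWork() {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10);
            while (System.nanoTime() < deadline) {
                // spin to simulate work done under the lock
            }
        }
    }

Intrinsic Java locks are unfair, so the spinning producer usually barges back
in before the timer thread is scheduled; whether the timer starves completely
is JVM- and load-dependent, which matches the "for some reason it never gets
that lock" observation.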
>
> On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski 
> wrote:
>
>> Hi Josson,
>>
>> The TM logs that you attached are only from a 5 minutes time period. Are
>> you sure they are encompassing the period before the potential failure and
>> after the potential failure? It would be also nice if you would provide the
>> logs matching to the charts (like the one you were providing in the
>> previous messages), to correlate events (spike in latency/GC with some
>> timestamp from the logs).
>>
>> I was not necessarily asking you to upgrade to Java 9, but to an updated/bug
>> fixed version of Java 8 [1].
>>
>> > 1) In Flink 1.4 set up, the data in the Heap is throttled. It never
>> goes out of memory whatever be the ingestion rate. our Windows are 5
>> minutes windows.
>> > 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
>> Full GC doesn't reclaim space.
>>
>> In both cases there is the same mechanism for the backpressure. If a
>> task's output runs out of buffers to put produced records, it will block
>> the task. It can be that between 1.4 and 1.8, with credit based flow
>> control changes, the amount of available buffers for the tasks on your
>> setup has grown, so the tasks are backpressuring later. This in turn can
>> sometimes mean that at any point of time there is more data buffered on the
>> operator's state, like `WindowOperator`. I'm not sure what's the
>> best/easiest way how to check this:
>>
>> 1. the amount of buffered data might be visible via metrics [2][3]
>> 2. if you enable DEBUG logs, it should be visible via:
>>
>> > LOG.debug("Using a local buffer pool with {}-{} buffers",
>> numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
>>
>> entry logged by
>> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.
>>
>> Piotrek
>>
>> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network
>>
>> On Mon, Sep 14, 2020 at 05:04 Josson Paul wrote:
>>
>>> @Piotr Nowojski  @Nico Kruber 
>>> I have attached the  Taskmanager/GC/thread dumps in a zip file.
>>>
>>> I don't see any issues in the TM logs.
>>> Tried to upgrade to Java 9. Flink is on top of another platform which
>>> threw errors while upgrading to Java 9. I can't do much for now. We will
>>> upgrade to Jdk 11 in another 2 months.
>>>
>>> Regarding the Heap size. The new experiment I di

Logging when building and testing Flink

2020-10-23 Thread Juha Mynttinen
Hey there,

I noticed that when building and testing Flink itself, logging seems to be
non-existing or very quiet.

I had a look at the logging conf files (such
as flink-tests/src/test/resources/log4j2-test.properties) and the pattern
seems to be that the logging is turned off in tests. At least it looks like
that to me in the conf files and I couldn't find any logs.

I understand that excessive logging can flood the build logs when logging to
stderr, and that this can be a problem in e.g. the CI environment. Is
there some other rationale why logging is disabled?

It'd be super nice if one could somehow globally enable test logging and
log to e.g. rolling log files. Is there such a mechanism?
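
In the meantime, a hedged sketch of what one could put into such a
log4j2-test.properties to get console output while running tests locally
(plain Log4j2 properties syntax; the appender name and pattern are
illustrative):

    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

A RollingFile appender could be configured the same way, but per module rather
than via a single global switch.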

Missing logs can be a problem when testing locally. If you hit an issue while
running the tests locally that the logs would help to fix, there are no logs,
so you need to enable logging and re-run the tests. This is not great. And if
the bug is a "Heisenbug" you might not reproduce it on the re-run.

Regards,
Juha


Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-23 Thread Till Rohrmann
Hi Longdexin,

thanks for reaching out to the Flink community. I am pulling in Jark who
might be able to help you with this question.

Cheers,
Till

On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:

> From my point of view, the value of the NOW() function in SQL is fixed at the
> time the streaming app is launched and will not change with processing
> time. However, as a new Flink user, I'm not so sure of that. By the way, if
> my intent is to keep the time logic updating all the time, what should I
> do?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Dependency vulnerabilities with flink 1.11.1 version

2020-10-23 Thread Till Rohrmann
Hi Suchithra,

thanks for doing this analysis. I think we should try to upgrade the
affected libraries. I have opened issues to do these changes [1, 2, 3, 4,
5]. In the future, it would be great if you could first reach out to
priv...@flink.apache.org so that we can fix these problems without drawing
attention to them.

[1] https://issues.apache.org/jira/browse/FLINK-19781
[2] https://issues.apache.org/jira/browse/FLINK-19782
[3] https://issues.apache.org/jira/browse/FLINK-19783
[4] https://issues.apache.org/jira/browse/FLINK-19784
[5] https://issues.apache.org/jira/browse/FLINK-19785

Cheers,
Till

On Thu, Oct 22, 2020 at 12:56 PM V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

>
>
> Hello,
>
>
>
> We are using Apache Flink version 1.11.1. During our security scans, the
> following issues were reported by our scan tool.
>
>
>
> *1.Package : commons_codec-1.10*
>
> *Severity: Medium*
>
>
>
> *Description: *
>
> Apache Commons contains a flaw that is due to the Base32 codec decoding
> invalid strings instead of rejecting them. This may allow a remote attacker
> to tunnel additional information via a base 32 string that seems valid.
>
>
>
> *Path:*
>
> /opt/flink/lib/flink-table_2.11-1.11.1.jar:commons-codec
>
> /opt/flink/lib/flink-table-blink_2.11-1.11.1.jar:commons-codec
>
>
>
> *References:*
>
> https://issues.apache.org/jira/browse/CODEC-134
>
> https://issues.apache.org/jira/browse/HTTPCLIENT-2018
>
>
>
> *2. Package : antlr-4.7*
>
> *Severity: Medium*
>
>
>
> *Description: *
>
> ANTLR contains a flaw in
> runtime/Java/src/org/antlr/v4/runtime/atn/ParserATNSimulator.java that is
> triggered as it does not catch exceptions when attempting to access the
> TURN_OFF_LR_LOOP_ENTRY_BRANCH_OPT environment variable. This may allow a
> context-dependent attacker to potentially crash a process linked against
> the library.
>
>
>
> *Path:*
>
> /opt/flink/opt/flink-python_2.11-1.11.1.jar:antlr4-runtime
>
> *References:*
>
> https://github.com/antlr/antlr4/issues/2069
>
>
>
> *3. Package : mesos-1.0.1*
>
> *Severity: Medium*
>
>
>
> *Description: *
>
> Apache Mesos can be configured to require authentication to call the
> Executor HTTP API using JSON Web Token (JWT). In Apache Mesos versions
> pre-1.4.2, 1.5.0, 1.5.1, 1.6.0 the comparison of the generated HMAC value
> against the provided signature in the JWT implementation used is vulnerable
> to a timing attack because instead of a constant-time string comparison
> routine a standard `==` operator has been used. A malicious actor can
> therefore abuse the timing difference of when the JWT validation function
> returns to reveal the correct HMAC value.
>
> *Path:*
>
> /opt/flink/lib/flink-dist_2.11-1.11.1.jar:mesos
>
>
>
> *References:*
>
> https://nvd.nist.gov/vuln/detail/CVE-2018-8023
>
>
>
> *4. Package : okhttp-3.7.0*
>
> *Severity: Medium*
>
>
>
> *Description: *
>
> ** DISPUTED ** CertificatePinner.java in OkHttp 3.x through 3.12.0 allows
> man-in-the-middle attackers to bypass certificate pinning by changing
> SSLContext and the boolean values while hooking the application. NOTE: This
> id is disputed because some parties don't consider this is a vulnerability.
> Their rationale can be found in
> https://github.com/square/okhttp/issues/4967.
>
> *Path:*
>
> /opt/flink/plugins/metrics-datadog/flink-metrics-datadog-1.11.1.jar:okhttp
>
> *References:*
>
> https://nvd.nist.gov/vuln/detail/CVE-2018-20200
>
>
>
> *5. Package : commons_io-2.4*
>
> *Severity: Medium*
>
>
>
> *Description: *
>
> Apache Commons IO contains a flaw that allows traversing outside of a
> restricted path. The issue is due to FileNameUtils.normalize not properly
> sanitizing user input, specifically path traversal style attacks (e.g.
> '../'). With a specially crafted request, a remote attacker can disclose
> arbitrary files.
>
> *Path:*
>
> /opt/flink/lib/flink-dist_2.11-1.11.1.jar:commons-io
>
> /opt/flink/lib/flink-table-blink_2.11-1.11.1.jar:commons-io
>
>
>
> *References:*
>
> https://issues.apache.org/jira/browse/IO-556
>
>
>
> Please let us know your comments on these issues and fix plans.
>
>
>
> Regards,
>
> Suchithra
>


Re: Flink Table SQL and MongoDB connector?

2020-10-23 Thread Till Rohrmann
Hi Dan,

afaik Flink does not have a dedicated MongoDB connector (except for the
DataSet API which is rather old). Hence I believe that the 2nd option seems
to be more promising.

Cheers,
Till

On Thu, Oct 22, 2020 at 6:45 AM Dan Hill  wrote:

> Has anyone connected these two?
>
> Looking through previous emails and the Flink docs, I've seen two mentions
> of how to hook up MongoDB to Flink.
>
> 1) https://github.com/okkam-it/flink-mongodb-test
> 2) Debezium->Kafka->Flink
> https://debezium.io/documentation/reference/1.3/connectors/mongodb.html
>
> The debezium route has more setup and maintenance work but scales better
> and probably has an easier hook for Table SQL (since it's Kafka).
>


Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-23 Thread Timo Walther

Hi Manas,

that is a good point. Feel free to open an issue for this. It is not the 
first time that your question has appeared on the mailing list.


Regards,
Timo

On 23.10.20 07:22, Manas Kale wrote:

Hi Timo,
I figured it out, thanks a lot for your help.
Are there any articles detailing the pre-flight and cluster phases? I 
couldn't find anything on ci.apache.org/projects/flink, and I think this 
behaviour should be documented as a warning/note.



On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <twal...@apache.org> wrote:


Hi Manas,

you can use static variable but you need to make sure that the logic to
fill the static variable is accessible and executed in all JVMs.

I assume `pipeline.properties` is in your JAR that you submit to the
cluster right? Then you should be able to access it through a singleton
pattern instead of a static variable access.

Regards,
Timo


On 22.10.20 14:17, Manas Kale wrote:
 > Sorry, I messed up the code snippet in the earlier mail. The
correct one
 > is :
 >
 > public static void main(String[] args) {
 >     Properties prop = new Properties();
 >
 >     InputStream is =
 > Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
 >     prop.load(is);
 >
 >     HashMap<String, String> strMap = new HashMap<>();
 >     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
 >
 >     new Config(strMap);
 >
 >     ...
 > }
 >
 > public class Config {
 >
 >     public static String CONFIG_TOPIC;
 >
 >     public Config(HashMap<String, String> s) {
 >         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
 >     }
 > }
 >
 > The value of CONFIG_TOPIC in a minicluster is properly loaded but null
 > when run on a cluster.
 >
 >
 > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <manaskal...@gmail.com> wrote:
 >
 >     Hi Timo,
 >     Thank you for the explanation, I can start to see why I was getting
 >     an exception.
 >     Are you saying that I cannot use static variables at all when trying
 >     to deploy to a cluster? I would like the variables to remain static
 >     and not be instance-bound as they are accessed from multiple classes.
 >     Based on my understanding of what you said, I implemented the
 >     following pattern:
 >
 >     public static void main(String[] args) {
 >         Properties prop = new Properties();
 >
 >         InputStream is =
 >     Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
 >         prop.load(is);
 >
 >         strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
 >
 >         new Config(strMap, longMap);
 >
 >         ...
 >     }
 >
 >     public class Config {
 >
 >         public static String CONFIG_TOPIC;
 >         public static String CONFIG_KAFKA;
 >
 >         public Config(HashMap<String, String> s) {
 >             CONFIG_TOPIC = s.get("CONFIG_TOPIC");
 >             CONFIG_KAFKA = s.get("CONFIG_KAFKA");
 >         }
 >     }
 >
 >     This produces the same issue. With the easier solution that you
 >     listed, are you implying I use multiple instances or a singleton
 >     pattern of some sort?
 >
 >     On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <twal...@apache.org> wrote:
 >
 >         Hi Manas,
 >
 >         you need to make sure to differentiate between what Flink calls
 >         "pre-flight phase" and "cluster phase".
 >
 >         The pre-flight phase is where the pipeline is constructed and all
 >         functions are instantiated. They are then later serialized and
 >         sent to the cluster.
 >
 >         If you are reading your properties file in the `main()` method
 >         and store something in static variables, the content is available
 >         locally where the pipeline is constructed (e.g. in the client),
 >         but not when the function instances are sent to the cluster.
 >         Those static variables are fresh (thus empty) in the cluster
 >         JVMs. You need to either make sure that the properties file is
 >         read from each task manager again, or easier: pass the parameters
 >         as constructor parameters into the instances such that they are
 >         shipped together with the function itself.
 >
 >         I hope this helps.
 >
 >         Regards,
 >         Timo
 >
 >
 >         On 22.10.20 09:24, Manas Kale wrote:
 >          > Hi,
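
A minimal sketch of the constructor-parameter approach Timo describes (the
class and field names are illustrative, not from the original code): the value
is read once in main() during the pre-flight phase and travels to the task
managers inside the serialized function instance.

    import org.apache.flink.api.common.functions.RichMapFunction;

    // Sketch: the config value is a final instance field, so it is serialized
    // together with the function and available on every task manager.
    public class ConfigAwareMap extends RichMapFunction<String, String> {
        private final String configTopic;

        public ConfigAwareMap(String configTopic) {
            this.configTopic = configTopic;
        }

        @Override
        public String map(String value) {
            return configTopic + ": " + value;
        }
    }

    // In main(), i.e. the pre-flight phase (prop and stream assumed to exist):
    // String topic = prop.getProperty("CONFIG_TOPIC");
    // stream.map(new ConfigAwareMap(topic));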

Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-10-23 Thread Till Rohrmann
Hi Eleanore,

how much memory did you assign to the JM pod? Maybe the limit is so high
that it takes a bit of time until GC is triggered. Have you tried whether
the same problem also occurs with newer Flink versions?

The difference between checkpoints enabled and disabled is that the JM
needs to do a bit more bookkeeping in order to track the completed
checkpoints. If you are using the HeapStateBackend, then all states smaller
than state.backend.fs.memory-threshold will get inlined, meaning that they
are sent to the JM and stored in the checkpoint meta file. This can
increase the memory usage of the JM process. Depending on
state.checkpoints.num-retained this can grow as large as the number of retained
checkpoints times the checkpoint size. However, I doubt that this adds up
to several GB of additional space.
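
A hedged sketch of the two settings mentioned, in flink-conf.yaml form (the
values are illustrative, not recommendations):

    # States smaller than this threshold are inlined into the checkpoint
    # metadata and therefore held by the JM (default is 1 KB in 1.10).
    state.backend.fs.memory-threshold: 1kb

    # Number of completed checkpoints the JM keeps track of.
    state.checkpoints.num-retained: 1

Keeping the memory threshold small and num-retained low bounds how much inlined
state the JM can accumulate.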

In order to better understand the problem, the debug logs of your JM could
be helpful. Also a heap dump might be able to point us towards the
component which is eating up so much memory.

Cheers,
Till

On Thu, Oct 22, 2020 at 4:56 AM Eleanore Jin  wrote:

> Hi all,
>
> I have a Flink job running version 1.10.2; it simply reads from a Kafka
> topic with 96 partitions, and outputs to another Kafka topic.
>
> It is running in k8s, with 1 JM (not in HA mode), 12 task managers each
> has 4 slots.
> The checkpoint persists the snapshot to azure blob storage, checkpoints
> interval every 3 seconds, with 10 seconds timeout and minimum pause of 1
> second.
>
> I observed that the job manager pod memory usage grows over time. Any
> hints on why this is the case? Also, the memory usage for the JM is significantly
> higher compared to when checkpointing is disabled.
> [image: image.png]
>
> Thanks a lot!
> Eleanore
>


Re: Job Restart Failure

2020-10-23 Thread Till Rohrmann
Hi Navneeth,

sorry for the late reply. To me it looks as
if 
/mnt/checkpoints/150dee2a70cecdd41b63a06b42a95649/chk-52/76363f89-d19f-44aa-aaf9-b33d89ec7c6c
has not been mounted to the EC2 machine you are using to run the job. Could
you try to log in to the machine when the problem occurs and
check whether you can open the checkpointing path? Maybe the EFS
troubleshooting guide might also be of help [1].

[1] https://docs.aws.amazon.com/efs/latest/ug/troubleshooting.html

Cheers,
Till

On Wed, Oct 21, 2020 at 7:46 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> Any feedback on how this can be resolved? This is causing downtime in
> production.
>
> Thanks
>
>
>
> On Tue, Oct 20, 2020 at 4:39 PM Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm facing an issue in our flink application. This happens in version
>> 1.4.0 and 1.7.2. We have both versions and we are seeing this problem on
>> both. We are running flink on ECS and checkpointing enabled to EFS. When
>> the pipeline restarts due to some node failure or any other reason, it just
>> keeps restarting until the retry attempts without this same error message.
>> When I checked the EFS volume I do see the file is still available but for
>> some reason flink is unable to recover the job. Any pointers will help.
>> Thanks
>>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
>> state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(14/18) from 
>> any of the 1 provided restore options.
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:245)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>>  ... 5 more
>> Caused by: java.io.FileNotFoundException: 
>> /mnt/checkpoints/150dee2a70cecdd41b63a06b42a95649/chk-52/76363f89-d19f-44aa-aaf9-b33d89ec7c6c
>>  (No such file or directory)
>>  at java.io.FileInputStream.open0(Native Method)
>>  at java.io.FileInputStream.open(FileInputStream.java:195)
>>  at java.io.FileInputStream.<init>(FileInputStream.java:138)
>>  at 
>> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>>  at 
>> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
>>  at 
>> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>>  at 
>> org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
>>  at 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:286)
>>  at 
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:62)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>  ... 7 more
>>
>>
>> *EFS:*
>>
>>
>> [image: image.png]
>>
>>
>>
>> Thanks
>>
>>


Re: expected behavior when Flink job cluster exhausted all restarts

2020-10-23 Thread Till Rohrmann
Hi Eleanore,

if you want to tolerate JM restarts, then you have to enable HA. W/o HA, a
JM restart is effectively a submission of a new job.
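
A hedged sketch of ZooKeeper-based HA settings in flink-conf.yaml (the quorum,
storage path, and cluster id are illustrative); with these in place a restarted
JM can recover the job id and resume from the latest completed checkpoint:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
    high-availability.storageDir: s3://my-bucket/flink/ha/
    high-availability.cluster-id: /my-job-cluster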

In order to tell you more about the Task submission rejection by the
TaskExecutor, I would need to take a look at the logs of the JM and the
rejecting TaskExecutor. Moreover, which Flink version are you using?

Cheers,
Till

On Wed, Oct 21, 2020 at 6:53 PM Eleanore Jin  wrote:

> Hi experts,
> I am running a flink job cluster, the application jar is packaged together
> with flink in a docker image. The flink job cluster is running in
> kubernetes, the restart strategy is below
>
> restart-strategy: failure-rate
> restart-strategy.failure-rate.max-failures-per-interval: 20
> restart-strategy.failure-rate.failure-rate-interval: 3 min
> restart-strategy.failure-rate.delay: 100 ms
>
> The job manager is not setup in HA mode, so only 1 pod.
>
> What I have observed is that the job manager pod has restarted a few times, and
> when it restarts, it will start as a new Flink job (hence a new Flink job
> id), so it seems it could not restart from the last successful checkpoint;
> the log lines highlighted in yellow below are the evidence.
>
> So I wonder, in this case, should I set the Flink job id to a fixed value (if
> there is a way to set it)? Should I set the restart strategy to retry
> infinitely? Or is there something else I should do?
>
> Thanks a lot!
> Eleanore
>
> {"@timestamp":"2020-10-21T09:45:30.571Z","@version":"1","message":"1 tasks
> should be restarted to recover the failed task
> 6c62b269f2830c09ffe62c59c9f52d9c_19.
> ","logger_name":"org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
>
>
> {"@timestamp":"2020-10-21T09:45:30.572Z","@version":"1","message":"Job
> 9ikvi0743v9rkayb1qof (0b4c1ed9cd2cb47ee99ddb173a9beee5) switched from state
> RESTARTING to
> FAILING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2,"stack_trace":"org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException:
> Could not submit task because there is no JobManager associated for the
> job 0b4c1ed9cd2cb47ee99ddb173a9beee5.\n\tat
> org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:475)
>


Re: savepoint failure

2020-10-23 Thread Till Rohrmann
Hi Rado,

it is hard to tell the reason w/o a bit more details. Could you share with
us the complete logs of the problematic run? The job you are
running, the types of state you are storing in RocksDB, and the types you
use as events in your job are also very important. In the linked SO question, the
problem was a type whose hashcode was not immutable.

Cheers,
Till

On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
radoslav.smilya...@smule.com> wrote:

> Hello all,
>
> I am running a Flink job that performs data enrichment. My job has 7 kafka
> consumers that receive messages for dml statements performed for 7 db
> tables.
>
> Job setup:
>
>- Flink is run in k8s in a similar way as it is described here.
>- 1 job manager and 2 task managers
>- parallelism is set to 4 and 2 task slots
>- rocksdb as state backend
>- protobuf for serialization
>
> Whenever I try to trigger a savepoint after my state is bootstrapped I get
> the following error for different operators:
>
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>
> Note: key group might vary.
>
> I found this article
> in Stackoverflow which relates to such an exception (btw my job graph looks
> similar to the one described in the article except that my job has more
> joins). I double checked my hashcodes and I think that they are fine.
>
> I tried to reduce the parallelism to 1 with 1 task slot per task manager
> and this configuration seems to work. This leads me to a direction that it
> might be some concurrency issue.
>
> I would like to understand what is causing the savepoint failure. Do you
> have any suggestions what I might be missing?
>
> Thanks in advance!
>
> Best Regards,
> Rado
>


[SURVEY] Remove Mesos support

2020-10-23 Thread Robert Metzger
Hi all,

I wanted to discuss if it makes sense to remove support for Mesos in Flink.
It seems that nobody is actively maintaining that component (except for
necessary refactorings because of interfaces we are changing), and there
are almost no users reporting issues or asking for features.

The Apache Mesos project itself seems very inactive: There has been only
one message on the dev@ list in the last 3 months.

In 2020, I found only 3 users mentioning that they are using Mesos on the
user@ list.

Maybe it makes sense to add a prominent log warning into the Mesos code in
the Flink 1.12 release, that we are planning to remove Mesos support. Users
will then have enough opportunity to raise concerns or discuss with us.

Best,
Robert


Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Konstantin Knauf
Hi Robert,

+1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
would still support it in Flink 1.12- with bug fixes for some time so that
users have time to move on.

It would certainly be very interesting to hear from current Flink on Mesos
users, on how they see the evolution of this part of the ecosystem.

Best,

Konstantin


Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Xintong Song
+1 for adding a warning in 1.12 about planning to remove Mesos support.


With my developer hat on, removing the Mesos support would
definitely reduce the maintaining overhead for the deployment and resource
management related components. On the other hand, the Flink on Mesos users'
voices definitely matter a lot for this community. Either way, it would be
good to draw users attention to this discussion early.


Thank you~

Xintong Song



On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf  wrote:

> Hi Robert,
>
> +1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
> would still support it in Flink 1.12- with bug fixes for some time so that
> users have time to move on.
>
> It would certainly be very interesting to hear from current Flink on Mesos
> users, on how they see the evolution of this part of the ecosystem.
>
> Best,
>
> Konstantin
>


Re: KryoException UnsupportedOperationException when writing Avro GenericRecords to Parquet

2020-10-23 Thread Till Rohrmann
Hi Averell,

it looks as if the org.apache.avro.Schema$Field contains a field which is
an unmodifiable collection. The Kryo serializer will try to deserialize
this field by creating an unmodifiable collection and then trying to add
the elements into it. This will fail.

I would recommend using the AvroSerializer for serializing GenericRecords.
You have to add org.apache.flink:flink-avro as a dependency to your job and
then tell the system that you would like to use
the GenericRecordAvroTypeInfo via

DataStream<GenericRecord> sourceStream =
    env.addSource(new AvroGenericSource())
        .returns(new GenericRecordAvroTypeInfo(schema));

You can find more information about it here [1].

[1]
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro

Cheers,
Till

On Wed, Oct 21, 2020 at 1:48 PM Averell  wrote:

> Hi,
>
> I'm trying to convert a stream of JSON strings to a stream of Avro
> GenericRecords, and write these to parquet files, but I get the exception
> KryoException: java.lang.UnsupportedOperationException.
> The exception is thrown at the line out.collect(genericRecord). If there's no
> sink then there's no error.
>
> My code is as follows:
>
> val parquetSink: StreamingFileSink[GenericRecord] =
>   StreamingFileSink
>     .forBulkFormat(new Path(path),
>       ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(schemaString)))
>     .build()
>
> val parquetStream = inputStream.process(new ProcessFunction[String, GenericRecord] {
>   @transient private var schema: Schema = _
>   @transient private var reader: GenericDatumReader[GenericRecord] = _
>
>   override def processElement(value: String,
>                               ctx: ProcessFunction[String, GenericRecord]#Context,
>                               out: Collector[GenericRecord]): Unit = {
>     if (reader == null) {
>       schema = new Schema.Parser().parse(schemaString)
>       reader = new GenericDatumReader[GenericRecord](schema)
>     }
>     try {
>       val genericRecord = reader.read(null, DecoderFactory.get.jsonDecoder(schema, value))
>       out.collect(genericRecord)
>     } catch {
>       case e: Throwable =>
>         LOG.warn(s"Error decoding JSON string: $e\nRaw string: `$value`")
>         throw e
>     }
>   }
> })
> parquetStream.addSink(parquetSink)
>
> The schema is a simple one where all fields are strings.
> I tried with both Flink 1.10.0 and 1.11.0, and am currently stuck at this.
> Could you please help?
>
> Thanks and regards,
> Averell
>
>
> 
> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>

Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Till Rohrmann
Thanks for starting this survey Robert! I second Konstantin and Xintong in
the sense that our Mesos users' opinions should matter most here. If our
community is no longer using the Mesos integration, then I would be +1 for
removing it in order to decrease the maintenance burden.

Cheers,
Till

On Fri, Oct 23, 2020 at 2:03 PM Xintong Song  wrote:

> +1 for adding a warning in 1.12 about planning to remove Mesos support.
>
>
> With my developer hat on, removing the Mesos support would
> definitely reduce the maintaining overhead for the deployment and resource
> management related components. On the other hand, the Flink on Mesos users'
> voices definitely matter a lot for this community. Either way, it would be
> good to draw users attention to this discussion early.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf 
> wrote:
>
>> Hi Robert,
>>
>> +1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
>> would still support it in Flink 1.12- with bug fixes for some time so that
>> users have time to move on.
>>
>> It would certainly be very interesting to hear from current Flink on Mesos
>> users, on how they see the evolution of this part of the ecosystem.
>>
>> Best,
>>
>> Konstantin
>>
>


Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Kostas Kloudas
+1 for adding a warning about the removal of Mesos support and I would
also propose to state explicitly in the warning the version that we
are planning to actually remove it (e.g. 1.13 or even 1.14 if we feel
it is too aggressive).

This will help as a reminder to users and devs about the upcoming
removal and it will avoid future, potentially endless, discussions.

Cheers,
Kostas

On Fri, Oct 23, 2020 at 2:03 PM Xintong Song  wrote:
>
> +1 for adding a warning in 1.12 about planning to remove Mesos support.
>
>
> With my developer hat on, removing the Mesos support would definitely reduce 
> the maintaining overhead for the deployment and resource management related 
> components. On the other hand, the Flink on Mesos users' voices definitely 
> matter a lot for this community. Either way, it would be good to draw users 
> attention to this discussion early.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf  wrote:
>>
>> Hi Robert,
>>
>> +1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
>> would still support it in Flink 1.12- with bug fixes for some time so that
>> users have time to move on.
>>
>> It would certainly be very interesting to hear from current Flink on Mesos
>> users, on how they see the evolution of this part of the ecosystem.
>>
>> Best,
>>
>> Konstantin


Re: savepoint failure

2020-10-23 Thread Till Rohrmann
Glad to hear that you solved your problem. Afaik Flink should not read the
fields of messages and call hashCode on them.

Cheers,
Till

On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
radoslav.smilya...@smule.com> wrote:

> Hi Till,
>
> I found my problem. It was indeed related to a mutable hashcode.
>
> I was using a protobuf message in the key selector function and one of the
> protobuf fields was enum. I checked the implementation of the hashcode of
> the generated message and it is using the int value field of the protobuf
> message so I assumed that it is ok and it's immutable.
>
> I replaced the key selector function to use Tuple[Long, Int] (since my
> protobuf message has only these two fields where the int parameter stands
> for the enum value field). After changing my code to use the Tuple it
> worked.
>
> I am not sure if Flink somehow reads the protobuf message fields and uses
> the hashcode of the fields directly since the generated protobuf enum
> indeed has a mutable hashcode (Enum.hashcode).
>
> Nevertheless it's ok with the Tuple key.
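
A hedged Java sketch of that workaround (the protobuf type and getter names
are hypothetical): key by an immutable Tuple2 built from the message's
primitive fields, using the enum's numeric value rather than the enum itself.

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Assumed: MyProtoMessage with a long id and an enum whose numeric value
    // is exposed via getTypeValue(), as protobuf-generated classes do.
    KeySelector<MyProtoMessage, Tuple2<Long, Integer>> stableKey =
        new KeySelector<MyProtoMessage, Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<Long, Integer> getKey(MyProtoMessage msg) {
                // A Tuple2 of Long and Integer has a stable hashCode, unlike
                // keys that (transitively) hash a Java enum.
                return Tuple2.of(msg.getId(), msg.getTypeValue());
            }
        };

    // usage: stream.keyBy(stableKey)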
>
> Thanks for your response!
>
> Best Regards,
> Rado
>
>
> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann 
> wrote:
>
>> Hi Rado,
>>
>> it is hard to tell the reason w/o a bit more details. Could you share
>> with us the complete logs of the problematic run? Also the job you are
>> running and the types of the state you are storing in RocksDB and use as
>> events in your job are very important. In the linked SO question, the
>> problem was a type whose hashcode was not immutable.
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>> radoslav.smilya...@smule.com> wrote:
>>
>>> Hello all,
>>>
>>> I am running a Flink job that performs data enrichment. My job has 7
>>> kafka consumers that receive messages for dml statements performed for 7 db
>>> tables.
>>>
>>> Job setup:
>>>
>>>- Flink is run in k8s in a similar way as it is described here.
>>>- 1 job manager and 2 task managers
>>>- parallelism is set to 4 and 2 task slots
>>>- rocksdb as state backend
>>>- protobuf for serialization
>>>
>>> Whenever I try to trigger a savepoint after my state is bootstrapped I
>>> get the following error for different operators:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
>>> KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>> at
>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>> at
>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>
>>> Note: key group might vary.
>>>
>>> I found this article
>>> in Stackoverflow which relates to such an exception (btw my job graph looks
>>> similar to the one described in the article except that my job has more
>>> joins). I double checked my hashcodes and I think that they are fine.
>>>
>>> I tried to reduce the parallelism to 1 with 1 task slot per task manager
>>> and this configuration seems to work. This leads me to a direction that it
>>> might be some concurrency issue.
>>>
>>> I would like to understand what is causing the savepoint failure. Do you
>>> have any suggestions what I might be missing?
>>>
>>> Thanks in advance!
>>>
>>> Best Regards,
>>> Rado
>>>
>>


A group window expects a time attribute for grouping in a stream environment.

2020-10-23 Thread ??????
I'm learning GroupBy Window Aggregation from the
document https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html

My code is:
https://paste.ubuntu.com/p/GQqR4cqdp6/
The POJO is:
https://paste.ubuntu.com/p/CF4yttTGQ4/

I got:
"A group window expects a time attribute for grouping in a stream environment."

Could anyone tell me how to fix this?
I don't know the right way to define the time attribute. Thanks.
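
This error typically means the column the window is defined on(...) is not
declared as a time attribute. A hedged Java sketch of declaring a
processing-time attribute when converting a stream and then windowing on it
(tEnv, orderStream, and the field names are illustrative; based on the Flink
1.11-style Table API):

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;

    // "proctime" is appended as a processing-time attribute during conversion;
    // group windows must be defined on such a time attribute.
    Table orders = tEnv.fromDataStream(
        orderStream, $("product"), $("amount"), $("proctime").proctime());

    Table result = orders
        .window(Tumble.over(lit(10).seconds()).on($("proctime")).as("w"))
        .groupBy($("w"), $("product"))
        .select($("product"), $("amount").sum().as("total"));

For event time, the stream would instead need timestamps/watermarks assigned
and a field declared with .rowtime().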

Re: Building Flink on VirtualBox VM failing

2020-10-23 Thread Juha Mynttinen
I'm trying to run the tests again, now with four cores (previously five)
and 12 GB RAM (previously 8 GB). I'm still being hit by the OOM killer.

The command I'm running is:

mvn -Dflink.forkCount=1 -Dflink.forkCountTestPackage=1 clean verify

[INFO] BUILD FAILURE
[INFO]

[INFO] Total time: 01:17 h
[INFO] Finished at: 2020-10-23T15:36:50+03:00
[INFO] Final Memory: 180M/614M
[INFO]

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-surefire-plugin:2.22.1:test
(integration-tests) on project flink-tests: There are test failures.
[ERROR]
[ERROR] Please refer to
/home/juha/git/flink/flink-tests/target/surefire-reports for the individual
test results.
[ERROR] Please refer to dump files (if any exist) [date].dump,
[date]-jvmRun[N].dump and [date].dumpstream.
[ERROR] ExecutionException The forked VM terminated without properly saying
goodbye. VM crash or System.exit called?
[ERROR] Command was /bin/sh -c cd /home/juha/git/flink/flink-tests/target
&& /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xms2048m -Xmx2048m
-Dmvn.forkNumber=1 -XX:+UseG1GC -jar
/home/juha/git/flink/flink-tests/target/surefire/surefirebooter15842756015305201470.jar
/home/juha/git/flink/flink-tests/target/surefire
2020-10-23T14-19-18_685-jvmRun1 surefire394592676817174474tmp
surefire_117413817767116882164827tmp
[ERROR] Error occurred in starting fork, check output in log
[ERROR] Process Exit Code: 137
[ERROR] Crashed tests:
[ERROR]
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase
[ERROR] org.apache.maven.surefire.booter.SurefireBooterForkException:
ExecutionException The forked VM terminated without properly saying
goodbye. VM crash or System.exit called?
[ERROR] Command was /bin/sh -c cd /home/juha/git/flink/flink-tests/target
&& /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xms2048m -Xmx2048m
-Dmvn.forkNumber=1 -XX:+UseG1GC -jar
/home/juha/git/flink/flink-tests/target/surefire/surefirebooter15842756015305201470.jar
/home/juha/git/flink/flink-tests/target/surefire
2020-10-23T14-19-18_685-jvmRun1 surefire394592676817174474tmp
surefire_117413817767116882164827tmp
[ERROR] Error occurred in starting fork, check output in log
[ERROR] Process Exit Code: 137
[ERROR] Crashed tests:
[ERROR]
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase
[ERROR] at
org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:510)
[ERROR] at
org.apache.maven.plugin.surefire.booterclient.ForkStarter.runSuitesForkPerTestSet(ForkStarter.java:457)
[ERROR] at
org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:298)
[ERROR] at
org.apache.maven.plugin.surefire.booterclient.ForkStarter.run(ForkStarter.java:246)
[ERROR] at
org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeProvider(AbstractSurefireMojo.java:1183)
[ERROR] at
org.apache.maven.plugin.surefire.AbstractSurefireMojo.executeAfterPreconditionsChecked(AbstractSurefireMojo.java:1011)
[ERROR] at
org.apache.maven.plugin.surefire.AbstractSurefireMojo.execute(AbstractSurefireMojo.java:857)
[ERROR] at
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
[ERROR] at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
[ERROR] at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
[ERROR] at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
[ERROR] at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
[ERROR] at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
[ERROR] at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
[ERROR] at
org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
[ERROR] at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355)
[ERROR] at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155)
[ERROR] at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)
[ERROR] at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:216)
[ERROR] at org.apache.maven.cli.MavenCli.main(MavenCli.java:160)
[ERROR] at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
[ERROR] at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[ERROR] at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[ERROR] at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[ERROR] at
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
[ERROR] at
org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
[ERROR] at
org.codehaus.plexus.classworlds.launcher.Launche

how to register TableAggregateFunction?

2020-10-23 Thread ??????
I'm learning the documentation,
the Flat Aggregate part.

My code is:
https://paste.ubuntu.com/p/HmB4q2WJSb/

Could you tell me how to register a TableAggregateFunction?

Thanks for your help
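
For reference, a hedged Java sketch of defining and registering a table
aggregate function (modeled on the Top2 example from the Flink docs; on a
1.11-era StreamTableEnvironment, registerFunction is one way to register it):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableAggregateFunction;
    import org.apache.flink.util.Collector;

    public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Acc> {
        public static class Acc {
            public Integer first = Integer.MIN_VALUE;   // largest value seen
            public Integer second = Integer.MIN_VALUE;  // second largest
        }

        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        // Called for every input row to update the accumulator.
        public void accumulate(Acc acc, Integer v) {
            if (v > acc.first) {
                acc.second = acc.first;
                acc.first = v;
            } else if (v > acc.second) {
                acc.second = v;
            }
        }

        // Emits up to two rows (value, rank) per group.
        public void emitValue(Acc acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) out.collect(Tuple2.of(acc.first, 1));
            if (acc.second != Integer.MIN_VALUE) out.collect(Tuple2.of(acc.second, 2));
        }
    }

    // Registration and use with flatAggregate (tEnv/orders assumed to exist):
    // tEnv.registerFunction("top2", new Top2());
    // orders.groupBy($("key"))
    //       .flatAggregate(call("top2", $("a")).as("v", "rank"))
    //       .select($("key"), $("v"), $("rank"));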

Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Piyush Narang
Hi folks,

We at Criteo are active users of the Flink on Mesos resource management 
component. We are pretty heavy users of Mesos for scheduling workloads on our 
edge datacenters and we do want to continue to be able to run some of our Flink 
topologies (to compute machine learning short term features) on those DCs. If 
possible our vote would be not to drop Mesos support as that will tie us to an 
old release / have to maintain a fork as we’re not planning to migrate off 
Mesos anytime soon. Is the burden something that can be helped with by the 
community? (Or are you referring to having to ensure PRs handle the Mesos piece 
as well when they touch the resource managers?)

Thanks,

-- Piyush


From: Till Rohrmann 
Date: Friday, October 23, 2020 at 8:19 AM
To: Xintong Song 
Cc: dev , user 
Subject: Re: [SURVEY] Remove Mesos support

Thanks for starting this survey Robert! I second Konstantin and Xintong in the 
sense that our Mesos user's opinions should matter most here. If our community 
is no longer using the Mesos integration, then I would be +1 for removing it in 
order to decrease the maintenance burden.

Cheers,
Till

On Fri, Oct 23, 2020 at 2:03 PM Xintong Song <tonysong...@gmail.com> wrote:
+1 for adding a warning in 1.12 about planning to remove Mesos support.



With my developer hat on, removing the Mesos support would definitely reduce 
the maintaining overhead for the deployment and resource management related 
components. On the other hand, the Flink on Mesos users' voices definitely 
matter a lot for this community. Either way, it would be good to draw users 
attention to this discussion early.



Thank you~

Xintong Song


On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf <kna...@apache.org> wrote:
Hi Robert,

+1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
would still support it in Flink 1.12- with bug fixes for some time so that
users have time to move on.

It would certainly be very interesting to hear from current Flink on Mesos
users, on how they see the evolution of this part of the ecosystem.

Best,

Konstantin


Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Kostas Kloudas
Thanks Piyush for the message.
After this, I revoke my +1. I agree with the previous opinions that we
cannot drop code that is actively used by users, especially if it is
something as deep in the stack as support for a cluster management
framework.

Cheers,
Kostas

On Fri, Oct 23, 2020 at 4:15 PM Piyush Narang  wrote:
>
> Hi folks,
>
>
>
> We at Criteo are active users of the Flink on Mesos resource management 
> component. We are pretty heavy users of Mesos for scheduling workloads on our 
> edge datacenters and we do want to continue to be able to run some of our 
> Flink topologies (to compute machine learning short term features) on those 
> DCs. If possible our vote would be not to drop Mesos support as that will tie 
> us to an old release / have to maintain a fork as we’re not planning to 
> migrate off Mesos anytime soon. Is the burden something that can be helped 
> with by the community? (Or are you referring to having to ensure PRs handle 
> the Mesos piece as well when they touch the resource managers?)
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>
>
> From: Till Rohrmann 
> Date: Friday, October 23, 2020 at 8:19 AM
> To: Xintong Song 
> Cc: dev , user 
> Subject: Re: [SURVEY] Remove Mesos support
>
>
>
> Thanks for starting this survey Robert! I second Konstantin and Xintong in 
> the sense that our Mesos user's opinions should matter most here. If our 
> community is no longer using the Mesos integration, then I would be +1 for 
> removing it in order to decrease the maintenance burden.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Oct 23, 2020 at 2:03 PM Xintong Song  wrote:
>
> +1 for adding a warning in 1.12 about planning to remove Mesos support.
>
>
>
> With my developer hat on, removing the Mesos support would definitely reduce 
> the maintaining overhead for the deployment and resource management related 
> components. On the other hand, the Flink on Mesos users' voices definitely 
> matter a lot for this community. Either way, it would be good to draw users 
> attention to this discussion early.
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf  wrote:
>
> Hi Robert,
>
> +1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
> would still support it in Flink 1.12- with bug fixes for some time so that
> users have time to move on.
>
> It would certainly be very interesting to hear from current Flink on Mesos
> users, on how they see the evolution of this part of the ecosystem.
>
> Best,
>
> Konstantin


Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Piyush Narang
Thanks Kostas. If there are items we can help with, I'm sure we'd be able to find 
folks who would be excited to contribute / help in any way. 

-- Piyush
 

On 10/23/20, 10:25 AM, "Kostas Kloudas"  wrote:

Thanks Piyush for the message.
After this, I revoke my +1. I agree with the previous opinions that we
cannot drop code that is actively used by users, especially if it
something that deep in the stack as support for cluster management
framework.

Cheers,
Kostas

On Fri, Oct 23, 2020 at 4:15 PM Piyush Narang  wrote:
>
> Hi folks,
>
>
>
> We at Criteo are active users of the Flink on Mesos resource management 
component. We are pretty heavy users of Mesos for scheduling workloads on our 
edge datacenters and we do want to continue to be able to run some of our Flink 
topologies (to compute machine learning short term features) on those DCs. If 
possible our vote would be not to drop Mesos support as that will tie us to an 
old release / have to maintain a fork as we’re not planning to migrate off 
Mesos anytime soon. Is the burden something that can be helped with by the 
community? (Or are you referring to having to ensure PRs handle the Mesos piece 
as well when they touch the resource managers?)
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>
>
> From: Till Rohrmann 
> Date: Friday, October 23, 2020 at 8:19 AM
> To: Xintong Song 
> Cc: dev , user 
> Subject: Re: [SURVEY] Remove Mesos support
>
>
>
> Thanks for starting this survey Robert! I second Konstantin and Xintong 
in the sense that our Mesos users' opinions should matter most here. If our 
community is no longer using the Mesos integration, then I would be +1 for 
removing it in order to decrease the maintenance burden.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Oct 23, 2020 at 2:03 PM Xintong Song  
wrote:
>
> +1 for adding a warning in 1.12 about planning to remove Mesos support.
>
>
>
> With my developer hat on, removing the Mesos support would definitely 
reduce the maintenance overhead for the deployment and resource management 
related components. On the other hand, the Flink on Mesos users' voices 
definitely matter a lot for this community. Either way, it would be good to 
draw users' attention to this discussion early.
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf  
wrote:
>
> Hi Robert,
>
> +1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
> would still support it in Flink 1.12- with bug fixes for some time so that
> users have time to move on.
>
> It would certainly be very interesting to hear from current Flink on Mesos
> users, on how they see the evolution of this part of the ecosystem.
>
> Best,
>
> Konstantin




Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Robert Metzger
Hey Piyush,
thanks a lot for raising this concern. I believe we should keep Mesos in
Flink for the foreseeable future, then.
Your offer to help is much appreciated. We'll let you know once there is
something.

On Fri, Oct 23, 2020 at 4:28 PM Piyush Narang  wrote:

> Thanks Kostas. If there are items we can help with, I'm sure we'd be able to
> find folks who would be excited to contribute / help in any way.
>
> -- Piyush
>
>
> On 10/23/20, 10:25 AM, "Kostas Kloudas"  wrote:
>
> Thanks Piyush for the message.
> After this, I revoke my +1. I agree with the previous opinions that we
> cannot drop code that is actively used by users, especially if it is
> something as deep in the stack as support for a cluster management
> framework.
>
> Cheers,
> Kostas
>
> On Fri, Oct 23, 2020 at 4:15 PM Piyush Narang 
> wrote:
> >
> > Hi folks,
> >
> >
> >
> > We at Criteo are active users of the Flink on Mesos resource
> management component. We are pretty heavy users of Mesos for scheduling
> workloads on our edge datacenters and we do want to continue to be able to
> run some of our Flink topologies (to compute machine learning short term
> features) on those DCs. If possible our vote would be not to drop Mesos
> support as that will tie us to an old release / force us to maintain a fork, as
> we’re not planning to migrate off Mesos anytime soon. Is the burden
> something that can be helped with by the community? (Or are you referring
> to having to ensure PRs handle the Mesos piece as well when they touch the
> resource managers?)
> >
> >
> >
> > Thanks,
> >
> >
> >
> > -- Piyush
> >
> >
> >
> >
> >
> > From: Till Rohrmann 
> > Date: Friday, October 23, 2020 at 8:19 AM
> > To: Xintong Song 
> > Cc: dev , user 
> > Subject: Re: [SURVEY] Remove Mesos support
> >
> >
> >
> > Thanks for starting this survey Robert! I second Konstantin and
> Xintong in the sense that our Mesos users' opinions should matter most
> here. If our community is no longer using the Mesos integration, then I
> would be +1 for removing it in order to decrease the maintenance burden.
> >
> >
> >
> > Cheers,
> >
> > Till
> >
> >
> >
> > On Fri, Oct 23, 2020 at 2:03 PM Xintong Song 
> wrote:
> >
> > +1 for adding a warning in 1.12 about planning to remove Mesos
> support.
> >
> >
> >
> > With my developer hat on, removing the Mesos support would
> definitely reduce the maintenance overhead for the deployment and resource
> management related components. On the other hand, the Flink on Mesos users'
> voices definitely matter a lot for this community. Either way, it would be
> good to draw users' attention to this discussion early.
> >
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> >
> >
> > On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf 
> wrote:
> >
> > Hi Robert,
> >
> > +1 to the plan you outlined. If we were to drop support in Flink
> 1.13+, we
> > would still support it in Flink 1.12- with bug fixes for some time
> so that
> > users have time to move on.
> >
> > It would certainly be very interesting to hear from current Flink on
> Mesos
> > users, on how they see the evolution of this part of the ecosystem.
> >
> > Best,
> >
> > Konstantin
>
>
>


Running flink in a Local Execution Environment for Production Workloads

2020-10-23 Thread Joseph Lorenzini




Hi all,
 
I plan to run flink jobs as docker containers in AWS Elastic Container Service. I will have checkpointing enabled, with state stored in an s3 bucket. Each deployment will run in per-job mode. Are there any non-obvious downsides to running these jobs with a local execution environment, so that the deployment turns into deploying a single java application?
 
The obvious downside is that you don't get any horizontal scalability. That's a given, and I'd have to scale up, not out, in this mode. I'd like to discover if there are any other negatives with this approach.
 
Thanks,
Joe  
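
A local execution environment just means the whole runtime lives in one JVM, so the job becomes a plain java application. A minimal sketch of such a setup, assuming Flink 1.10/1.11 APIs; the bucket name, parallelism, and job name are placeholders, not from this message:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalJob {
    public static void main(String[] args) throws Exception {
        // JobManager and TaskManager both run inside this single JVM.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4);
        // Checkpoint every 3 seconds; state goes to s3 (placeholder bucket).
        env.enableCheckpointing(3000);
        env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints"));
        // ... build the actual pipeline here ...
        env.execute("local-job");
    }
}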





Re: expected behavior when Flink job cluster exhausted all restarts

2020-10-23 Thread Eleanore Jin
Hi Till,

thanks a lot for the explanation. I'm using Flink 1.10.2 with Java 11.

Thanks!
Eleanore

On Fri, Oct 23, 2020 at 4:31 AM Till Rohrmann  wrote:

> Hi Eleanore,
>
> if you want to tolerate JM restarts, then you have to enable HA. W/o HA, a
> JM restart is effectively a submission of a new job.
>
> In order to tell you more about the Task submission rejection by the
> TaskExecutor, I would need to take a look at the logs of the JM and the
> rejecting TaskExecutor. Moreover, which Flink version are you using?
>
> Cheers,
> Till
>
> On Wed, Oct 21, 2020 at 6:53 PM Eleanore Jin 
> wrote:
>
>> Hi experts,
>> I am running a flink job cluster, the application jar is packaged
>> together with flink in a docker image. The flink job cluster is running in
>> kubernetes, the restart strategy is below
>>
>> restart-strategy: failure-rate
>> restart-strategy.failure-rate.max-failures-per-interval: 20
>> restart-strategy.failure-rate.failure-rate-interval: 3 min
>> restart-strategy.failure-rate.delay: 100 ms
>>
>> The job manager is not setup in HA mode, so only 1 pod.
>>
>> What I have observed is that the job manager pod has restarted a few times,
>> and when it restarts, it starts as a new flink job (hence a new flink
>> job id), so it seems it could not restart from the last successful
>> checkpoint; the evidence is highlighted in yellow below.
>>
>> So I wonder, in this case, should I set the flink job id to a fixed value
>> (if there is a way to set it)? Or should I set the restart strategy to
>> retry infinitely? Or is there something else I should do?
>>
>> Thanks a lot!
>> Eleanore
>>
>> {"@timestamp":"2020-10-21T09:45:30.571Z","@version":"1","message":"1
>> tasks should be restarted to recover the failed task
>> 6c62b269f2830c09ffe62c59c9f52d9c_19.
>> ","logger_name":"org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2}
>>
>>
>> {"@timestamp":"2020-10-21T09:45:30.572Z","@version":"1","message":"Job
>> 9ikvi0743v9rkayb1qof (0b4c1ed9cd2cb47ee99ddb173a9beee5) switched from state
>> RESTARTING to
>> FAILING.","logger_name":"org.apache.flink.runtime.executiongraph.ExecutionGraph","thread_name":"flink-akka.actor.default-dispatcher-4","level":"INFO","level_value":2,"stack_trace":"org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException:
>> Could not submit task because there is no JobManager associated for the
>> job 0b4c1ed9cd2cb47ee99ddb173a9beee5.\n\tat
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:475)
>>
>
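
As a side note on the HA point above: with a single JM and no HA services, a JM restart is effectively a new job submission, as Till explains. A minimal ZooKeeper-based HA sketch for flink-conf.yaml, where the quorum address, bucket, and cluster id are placeholders not taken from this thread:

# flink-conf.yaml -- ZooKeeper-based HA (all values are placeholders)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: s3://my-bucket/flink/ha/
high-availability.cluster-id: /my-job-cluster

With this in place, a restarted JM can recover the running job and its latest checkpoint instead of starting a fresh job id.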


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-10-23 Thread Eleanore Jin
Hi Till,
Thanks a lot for the prompt response, please see the information below.

1. how much memory assign to JM pod?
6g for container memory limit, 5g for jobmanager.heap.size, I think this is
the only available jm memory configuration for flink 1.10.2

2. Have you tried with newer Flink versions?
I am actually using Apache Beam, so the latest version they support for
Flink is 1.10

3. What statebackend is used?
FsStateBackend, and the checkpoint size is around 12MB from checkpoint
metrics, so I think it does not get inlined

4. What is state.checkpoints.num-retained?
I did not configure this explicitly, so by default only 1 should be retained

5. Anything suspicious from JM log?
There is no Exception nor Error; the only thing I see is the below log,
which keeps repeating

{"@timestamp":"2020-10-23T16:05:20.350Z","@version":"1","message":"Disabling
threads for Delete operation as thread count 0 is <=
1","logger_name":"org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor","thread_name":"jobmanager-future-thread-4","level":"WARN","level_value":3}

6. JVM args obtained via jcmd

-Xms5120m -Xmx5120m -XX:MaxGCPauseMillis=20 -XX:-OmitStackTraceInFastThrow


7. Heap info returned by jcmd <pid> GC.heap_info

it suggested only about 1G of the heap is used

garbage-first heap   total 5242880K, used 1123073K [0x00000006c0000000,
0x0000000800000000)

  region size 2048K, 117 young (239616K), 15 survivors (30720K)

 Metaspace   used 108072K, capacity 110544K, committed 110720K,
reserved 1146880K

  class spaceused 12963K, capacity 13875K, committed 13952K, reserved
1048576K


8. top -p <pid>

it suggested for flink job manager java process 4.8G of physical memory is
consumed

PID USER  PR  NIVIRTRESSHR S  %CPU %MEM TIME+ COMMAND



1 root  20   0 13.356g 4.802g  22676 S   6.0  7.6  37:48.62 java




Thanks a lot!
Eleanore


On Fri, Oct 23, 2020 at 4:19 AM Till Rohrmann  wrote:

> Hi Eleanore,
>
> how much memory did you assign to the JM pod? Maybe the limit is so high
> that it takes a bit of time until GC is triggered. Have you tried whether
> the same problem also occurs with newer Flink versions?
>
> The difference between checkpoints enabled and disabled is that the JM
> needs to do a bit more bookkeeping in order to track the completed
> checkpoints. If you are using the HeapStateBackend, then all states smaller
> than state.backend.fs.memory-threshold will get inlined, meaning that they
> are sent to the JM and stored in the checkpoint meta file. This can
> increase the memory usage of the JM process. Depending on
> state.checkpoints.num-retained this can grow as large as number retained
> checkpoints times the checkpoint size. However, I doubt that this adds up
> to several GB of additional space.
>
> In order to better understand the problem, the debug logs of your JM could
> be helpful. Also a heap dump might be able to point us towards the
> component which is eating up so much memory.
>
> Cheers,
> Till
>
> On Thu, Oct 22, 2020 at 4:56 AM Eleanore Jin 
> wrote:
>
>> Hi all,
>>
>> I have a flink job running version 1.10.2; it simply reads from a kafka
>> topic with 96 partitions and outputs to another kafka topic.
>>
>> It is running in k8s, with 1 JM (not in HA mode) and 12 task managers,
>> each with 4 slots.
>> Checkpointing persists the snapshot to azure blob storage, with a checkpoint
>> interval of 3 seconds, a 10 second timeout, and a minimum pause of 1
>> second.
>>
>> I observed that the job manager pod memory usage grows over time; any
>> hints on why this is the case? Also, the memory usage for the JM is
>> significantly higher compared to when checkpointing is disabled.
>> [image: image.png]
>>
>> Thanks a lot!
>> Eleanore
>>
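
For readers hitting the same issue: the heap dump Till asks for can be taken with standard JDK tooling against the JM process; in a container the JVM PID is often 1, as the top output in this thread suggests. A sketch, assuming jmap/jcmd are available in the image:

# dump only live objects from the JobManager JVM (PID assumed to be 1)
jmap -dump:live,format=b,file=/tmp/jobmanager.hprof 1
# equivalent with jcmd
jcmd 1 GC.heap_dump /tmp/jobmanager.hprof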
>


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-10-23 Thread Eleanore Jin
Hi Till,

please see the screenshot of heap dump: https://ibb.co/92Hzrpr

Thanks!
Eleanore

On Fri, Oct 23, 2020 at 9:25 AM Eleanore Jin  wrote:

> Hi Till,
> Thanks a lot for the prompt response, please see the information below.
>
> 1. how much memory assign to JM pod?
> 6g for container memory limit, 5g for jobmanager.heap.size, I think this
> is the only available jm memory configuration for flink 1.10.2
>
> 2. Have you tried with newer Flink versions?
> I am actually using Apache Beam, so the latest version they support for
> Flink is 1.10
>
> 3. What statebackend is used?
> FsStateBackend, and the checkpoint size is around 12MB from checkpoint
> metrics, so I think it does not get inlined
>
> 4. What is state.checkpoints.num-retained?
> I did not configure this explicitly, so by default only 1 should be
> retained
>
> 5. Anything suspicious from JM log?
> There is no Exception nor Error; the only thing I see is the below log,
> which keeps repeating
>
> {"@timestamp":"2020-10-23T16:05:20.350Z","@version":"1","message":"Disabling
> threads for Delete operation as thread count 0 is <=
> 1","logger_name":"org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor","thread_name":"jobmanager-future-thread-4","level":"WARN","level_value":3}
>
> 6. JVM args obtained via jcmd
>
> -Xms5120m -Xmx5120m -XX:MaxGCPauseMillis=20 -XX:-OmitStackTraceInFastThrow
>
>
> 7. Heap info returned by jcmd <pid> GC.heap_info
>
> it suggested only about 1G of the heap is used
>
> garbage-first heap   total 5242880K, used 1123073K [0x00000006c0000000,
> 0x0000000800000000)
>
>   region size 2048K, 117 young (239616K), 15 survivors (30720K)
>
>  Metaspace   used 108072K, capacity 110544K, committed 110720K,
> reserved 1146880K
>
>   class spaceused 12963K, capacity 13875K, committed 13952K, reserved
> 1048576K
>
>
> 8. top -p <pid>
>
> it suggested for flink job manager java process 4.8G of physical memory is
> consumed
>
> PID USER  PR  NIVIRTRESSHR S  %CPU %MEM TIME+ COMMAND
>
>
>
> 1 root  20   0 13.356g 4.802g  22676 S   6.0  7.6  37:48.62 java
>
>
>
>
> Thanks a lot!
> Eleanore
>
>
> On Fri, Oct 23, 2020 at 4:19 AM Till Rohrmann 
> wrote:
>
>> Hi Eleanore,
>>
>> how much memory did you assign to the JM pod? Maybe the limit is so high
>> that it takes a bit of time until GC is triggered. Have you tried whether
>> the same problem also occurs with newer Flink versions?
>>
>> The difference between checkpoints enabled and disabled is that the JM
>> needs to do a bit more bookkeeping in order to track the completed
>> checkpoints. If you are using the HeapStateBackend, then all states smaller
>> than state.backend.fs.memory-threshold will get inlined, meaning that they
>> are sent to the JM and stored in the checkpoint meta file. This can
>> increase the memory usage of the JM process. Depending on
>> state.checkpoints.num-retained this can grow as large as number retained
>> checkpoints times the checkpoint size. However, I doubt that this adds up
>> to several GB of additional space.
>>
>> In order to better understand the problem, the debug logs of your JM
>> could be helpful. Also a heap dump might be able to point us towards the
>> component which is eating up so much memory.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 22, 2020 at 4:56 AM Eleanore Jin 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a flink job running version 1.10.2; it simply reads from a kafka
>>> topic with 96 partitions and outputs to another kafka topic.
>>>
>>> It is running in k8s, with 1 JM (not in HA mode) and 12 task managers,
>>> each with 4 slots.
>>> Checkpointing persists the snapshot to azure blob storage, with a checkpoint
>>> interval of 3 seconds, a 10 second timeout, and a minimum pause of 1
>>> second.
>>>
>>> I observed that the job manager pod memory usage grows over time; any
>>> hints on why this is the case? Also, the memory usage for the JM is
>>> significantly higher compared to when checkpointing is disabled.
>>> [image: image.png]
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>


Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Lasse Nedergaard
Hi

At Trackunit we have been using Mesos for a long time but have now moved to k8s. 

Med venlig hilsen / Best regards
Lasse Nedergaard


> Den 23. okt. 2020 kl. 17.01 skrev Robert Metzger :
> 
> 
> Hey Piyush,
> thanks a lot for raising this concern. I believe we should keep Mesos in 
> Flink for the foreseeable future, then.
> Your offer to help is much appreciated. We'll let you know once there is 
> something.
> 
>> On Fri, Oct 23, 2020 at 4:28 PM Piyush Narang  wrote:
>> Thanks Kostas. If there are items we can help with, I'm sure we'd be able to 
>> find folks who would be excited to contribute / help in any way. 
>> 
>> -- Piyush
>> 
>> 
>> On 10/23/20, 10:25 AM, "Kostas Kloudas"  wrote:
>> 
>> Thanks Piyush for the message.
>> After this, I revoke my +1. I agree with the previous opinions that we
>> cannot drop code that is actively used by users, especially if it is
>> something as deep in the stack as support for a cluster management
>> framework.
>> 
>> Cheers,
>> Kostas
>> 
>> On Fri, Oct 23, 2020 at 4:15 PM Piyush Narang  
>> wrote:
>> >
>> > Hi folks,
>> >
>> >
>> >
>> > We at Criteo are active users of the Flink on Mesos resource 
>> management component. We are pretty heavy users of Mesos for scheduling 
>> workloads on our edge datacenters and we do want to continue to be able to 
>> run some of our Flink topologies (to compute machine learning short term 
>> features) on those DCs. If possible our vote would be not to drop Mesos 
>> support as that will tie us to an old release / force us to maintain a fork, as 
>> we’re not planning to migrate off Mesos anytime soon. Is the burden 
>> something that can be helped with by the community? (Or are you referring to 
>> having to ensure PRs handle the Mesos piece as well when they touch the 
>> resource managers?)
>> >
>> >
>> >
>> > Thanks,
>> >
>> >
>> >
>> > -- Piyush
>> >
>> >
>> >
>> >
>> >
>> > From: Till Rohrmann 
>> > Date: Friday, October 23, 2020 at 8:19 AM
>> > To: Xintong Song 
>> > Cc: dev , user 
>> > Subject: Re: [SURVEY] Remove Mesos support
>> >
>> >
>> >
>> > Thanks for starting this survey Robert! I second Konstantin and 
>> Xintong in the sense that our Mesos users' opinions should matter most here. 
>> If our community is no longer using the Mesos integration, then I would be 
>> +1 for removing it in order to decrease the maintenance burden.
>> >
>> >
>> >
>> > Cheers,
>> >
>> > Till
>> >
>> >
>> >
>> > On Fri, Oct 23, 2020 at 2:03 PM Xintong Song  
>> wrote:
>> >
>> > +1 for adding a warning in 1.12 about planning to remove Mesos support.
>> >
>> >
>> >
>> > With my developer hat on, removing the Mesos support would definitely 
>> reduce the maintenance overhead for the deployment and resource management 
>> related components. On the other hand, the Flink on Mesos users' voices 
>> definitely matter a lot for this community. Either way, it would be good to 
>> draw users' attention to this discussion early.
>> >
>> >
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> >
>> >
>> >
>> > On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf  
>> wrote:
>> >
>> > Hi Robert,
>> >
>> > +1 to the plan you outlined. If we were to drop support in Flink 
>> 1.13+, we
>> > would still support it in Flink 1.12- with bug fixes for some time so 
>> that
>> > users have time to move on.
>> >
>> > It would certainly be very interesting to hear from current Flink on 
>> Mesos
>> > users, on how they see the evolution of this part of the ecosystem.
>> >
>> > Best,
>> >
>> > Konstantin
>> 
>> 


Re: Trying to run Flink tests

2020-10-23 Thread Dan Hill
Downgrading to maven 3.2 shows an error. It seems like I'm hitting
flaky tests. I hit one error and then a different error when running again.

I'm not blocked now.  My diff was already merged and the related tests
pass.  Neither of these failures look related to my diff.

 <<< FAILURE! - in org.apache.flink.orc.OrcColumnarRowSplitReaderTest

[ERROR]
testReadFileWithTypes(org.apache.flink.orc.OrcColumnarRowSplitReaderTest)  Time
elapsed: 1.833 s  <<< FAILURE!

org.junit.ComparisonFailure: expected:<19[69-12-3]1> but was:<19[70-01-0]1>

at org.junit.Assert.assertEquals(Assert.java:115)

at org.junit.Assert.assertEquals(Assert.java:144)

at
org.apache.flink.orc.OrcColumnarRowSplitReaderTest.testReadFileWithTypes(OrcColumnarRowSplitReaderTest.java:371)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)

at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)

at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)

at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)

at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)

at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)

at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)

at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)

at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)

at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)

at
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)

at org.junit.rules.RunRules.evaluate(RunRules.java:20)

at org.junit.runners.ParentRunner.run(ParentRunner.java:363)

at
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

at
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)

at
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)

at
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)

at
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)

at
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)

at
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)

On my next run, this test fails. The test doesn't exist anymore, so I'm
guessing it might be an issue with my HEAD.

Expected: a value less than or equal to <160290L>

 but: <336174L> was greater than <160290L>

at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)

at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)

at
org.apache.flink.streaming.runtime.io.AlternatingCheckpointBarrierHandlerTest.assertMetrics(AlternatingCheckpointBarrierHandlerTest.java:212)

at
org.apache.flink.streaming.runtime.io.AlternatingCheckpointBarrierHandlerTest.testMetricsAlternation(AlternatingCheckpointBarrierHandlerTest.java:120)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)

at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)

at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)

at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)

at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)

at org.junit.runners.BlockJUnit4ClassRu

On Thu, Oct 22, 2020 at 8:49 PM Xintong Song  wrote:

> Hi Dan,
>
> I tried with the PR you pointed out, and cannot reproduce the problem. So
> it should not be related to the PR codes.
>
> I'm running with maven 3.2.5, which is the same version that we use for
> running ci tests on AZP for PRs. Your maven log suggests the maven version
> on your machine is 3.6.3
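
As an aside, a single suspected-flaky test can be re-run in isolation with surefire's test filter; the module path below is an assumption based on the current flink repository layout:

# re-run only the failing ORC test in its module
mvn test -pl flink-formats/flink-orc -Dtest=OrcColumnarRowSplitReaderTest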

FLINK 1.11 Graphite Metrics

2020-10-23 Thread Vijayendra Yadav
Hi Team,

For Flink 1.11 Graphite metrics, I see the following error in the log.
Any suggestions?

2020-10-23 21:55:14,652 ERROR
org.apache.flink.runtime.metrics.ReporterSetup - Could
not instantiate metrics reporter grph. Metrics might not be
exposed/reported.
java.lang.ClassNotFoundException:
org.apache.flink.metrics.graphite.GraphiteReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:313)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:274)
at 
org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:235)
at 
org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:316)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:270)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:208)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)


Regards,
Vijay
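
The ClassNotFoundException above typically means the Graphite reporter jar is not on the classpath. In the Flink 1.11 distribution the jar ships under opt/ and is meant to be loaded as a plugin; a sketch of the setup, where the jar name and reporter settings follow the 1.11 distribution layout and the host/port values are placeholders:

# copy the reporter from opt/ into a plugin folder of the distribution
mkdir -p plugins/metrics-graphite
cp opt/flink-metrics-graphite-1.11.2.jar plugins/metrics-graphite/

# flink-conf.yaml
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: graphite-host
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

Since the stack trace shows a YarnJobClusterEntrypoint, make sure the plugins/ directory of the client distribution contains the jar, as it is shipped to the cluster from there.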


Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-10-23 Thread Eleanore Jin
I also tried enabling native memory tracking; via jcmd, here is the memory
breakdown: https://ibb.co/ssrZB4F

Since the job manager memory configuration for flink 1.10.2 only has
jobmanager.heap.size, and it only translates to heap settings, should I
also set -XX:MaxDirectMemorySize and -XX:MaxMetaspaceSize for the job manager?
And any recommendations?

Thanks a lot!
Eleanore

On Fri, Oct 23, 2020 at 9:28 AM Eleanore Jin  wrote:

> Hi Till,
>
> please see the screenshot of heap dump: https://ibb.co/92Hzrpr
>
> Thanks!
> Eleanore
>
> On Fri, Oct 23, 2020 at 9:25 AM Eleanore Jin 
> wrote:
>
>> Hi Till,
>> Thanks a lot for the prompt response, please see the information below.
>>
>> 1. how much memory assign to JM pod?
>> 6g for container memory limit, 5g for jobmanager.heap.size, I think this
>> is the only available jm memory configuration for flink 1.10.2
>>
>> 2. Have you tried with newer Flink versions?
>> I am actually using Apache Beam, so the latest version they support for
>> Flink is 1.10
>>
>> 3. What statebackend is used?
>> FsStateBackend, and the checkpoint size is around 12MB from checkpoint
>> metrics, so I think it does not get inlined
>>
>> 4. What is state.checkpoints.num-retained?
>> I did not configure this explicitly, so by default only 1 should be
>> retained
>>
>> 5. Anything suspicious from JM log?
>> There is no Exception nor Error; the only thing I see is the below log,
>> which keeps repeating
>>
>> {"@timestamp":"2020-10-23T16:05:20.350Z","@version":"1","message":"Disabling
>> threads for Delete operation as thread count 0 is <=
>> 1","logger_name":"org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor","thread_name":"jobmanager-future-thread-4","level":"WARN","level_value":3}
>>
>> 6. JVM args obtained via jcmd
>>
>> -Xms5120m -Xmx5120m -XX:MaxGCPauseMillis=20 -XX:-OmitStackTraceInFastThrow
>>
>>
>> 7. Heap info returned by jcmd <pid> GC.heap_info
>>
>> it suggested only about 1G of the heap is used
>>
>> garbage-first heap   total 5242880K, used 1123073K [0x00000006c0000000,
>> 0x0000000800000000)
>>
>>   region size 2048K, 117 young (239616K), 15 survivors (30720K)
>>
>>  Metaspace   used 108072K, capacity 110544K, committed 110720K,
>> reserved 1146880K
>>
>>   class spaceused 12963K, capacity 13875K, committed 13952K,
>> reserved 1048576K
>>
>>
>> 8. top -p <pid>
>>
>> it suggested for flink job manager java process 4.8G of physical memory
>> is consumed
>>
>> PID USER  PR  NIVIRTRESSHR S  %CPU %MEM TIME+ COMMAND
>>
>>
>>
>> 1 root  20   0 13.356g 4.802g  22676 S   6.0  7.6  37:48.62 java
>>
>>
>>
>>
>> Thanks a lot!
>> Eleanore
>>
>>
>> On Fri, Oct 23, 2020 at 4:19 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> how much memory did you assign to the JM pod? Maybe the limit is so high
>>> that it takes a bit of time until GC is triggered. Have you tried whether
>>> the same problem also occurs with newer Flink versions?
>>>
>>> The difference between checkpoints enabled and disabled is that the JM
>>> needs to do a bit more bookkeeping in order to track the completed
>>> checkpoints. If you are using the HeapStateBackend, then all states smaller
>>> than state.backend.fs.memory-threshold will get inlined, meaning that they
>>> are sent to the JM and stored in the checkpoint meta file. This can
>>> increase the memory usage of the JM process. Depending on
>>> state.checkpoints.num-retained this can grow as large as number retained
>>> checkpoints times the checkpoint size. However, I doubt that this adds up
>>> to several GB of additional space.
>>>
>>> In order to better understand the problem, the debug logs of your JM
>>> could be helpful. Also a heap dump might be able to point us towards the
>>> component which is eating up so much memory.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Oct 22, 2020 at 4:56 AM Eleanore Jin 
>>> wrote:
>>>
 Hi all,

 I have a flink job running version 1.10.2; it simply reads from a kafka
 topic with 96 partitions and outputs to another kafka topic.

 It is running in k8s, with 1 JM (not in HA mode) and 12 task managers,
 each with 4 slots.
 Checkpointing persists the snapshot to azure blob storage, with a checkpoint
 interval of 3 seconds, a 10 second timeout, and a minimum pause of 1
 second.

 I observed that the job manager pod memory usage grows over time; any
 hints on why this is the case? Also, the memory usage for the JM is
 significantly higher compared to when checkpointing is disabled.
 [image: image.png]

 Thanks a lot!
 Eleanore

>>>
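
On the question of -XX:MaxDirectMemorySize and -XX:MaxMetaspaceSize above: Flink 1.10 has no dedicated JM options for these (the reworked JM memory model only arrived in 1.11), but extra JVM flags can be passed through env.java.opts.jobmanager. A sketch with placeholder sizes that would need tuning, not recommendations:

# flink-conf.yaml (sizes are illustrative only)
env.java.opts.jobmanager: -XX:MaxDirectMemorySize=1g -XX:MaxMetaspaceSize=256m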


Re: Trying to run Flink tests

2020-10-23 Thread Xintong Song
Hi Dan,

I think these are unstable test cases. As we are approaching the feature
freeze date for release 1.12.0, people have been busy merging new features
recently, which leads to test instability.

I'm not aware of any issue reported on the `OrcColumnarRowSplitReaderTest`.
From what you described, this does not always happen on your machine. You
may want to file an issue for it if you still have the complete maven logs.

For `AlternatingCheckpointBarrierHandlerTest`, there is already an issue
[1] reported on its instability.

As long as you do not run into the VM crash problems, I think it should be
fine.

Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-19665

On Sat, Oct 24, 2020 at 5:56 AM Dan Hill  wrote:

> Downgrading to maven 3.2 shows an error. It seems like I'm hitting
> flaky tests. I hit one error and then a different error when running again.
>
> I'm not blocked now.  My diff was already merged and the related tests
> pass.  Neither of these failures look related to my diff.
>
>  <<< FAILURE! - in org.apache.flink.orc.OrcColumnarRowSplitReaderTest
>
> [ERROR]
> testReadFileWithTypes(org.apache.flink.orc.OrcColumnarRowSplitReaderTest)
> Time elapsed: 1.833 s  <<< FAILURE!
>
> org.junit.ComparisonFailure: expected:<19[69-12-3]1> but was:<19[70-01-0]1>
>
> at org.junit.Assert.assertEquals(Assert.java:115)
>
> at org.junit.Assert.assertEquals(Assert.java:144)
>
> at
> org.apache.flink.orc.OrcColumnarRowSplitReaderTest.testReadFileWithTypes(OrcColumnarRowSplitReaderTest.java:371)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>
> at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>
> at
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>
> at
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>
> at
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>
> at
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>
> at
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>
> On my next run, this test fails. The test doesn't exist anymore, so I'm
> guessing it might be an issue with my HEAD.
>
> Expected: a value less than or equal to <160290L>
>
>  but: <336174L> was greater than <160290L>
>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>
> at
> org.apache.flink.streaming.runtime.io.AlternatingCheckpointBarrierHandlerTest.assertMetrics(AlternatingCheckpointBarrierHandlerTest.java:212)
>
> at
> org.apache.flink.streaming.runtime.io.AlternatingCheckpointBarrierHandlerTest.testMetricsAlternation(AlternatingCheckpointBarrierHandlerTest.java:120)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.junit.run