Re: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout

2022-11-03 Thread Ori Popowski
Martijn, as I said, the problem is with GC; the network issue is just a
symptom.

I just wanted to say that after a lot of troubleshooting which didn't yield
any insight, we decided to use the YARN Node Labels feature to run the
job only on Google Dataproc's secondary workers. The problem went away
completely, and my only conclusion is that indeed, the YARN daemons on the
primary workers were the culprit. We will let Google Cloud know about this.
Unfortunately, with the current configuration we run two redundant
machines (the two mandatory primary workers, which don't do anything), so
this is only a temporary fix until we discover the real issue.


On Tue, Oct 4, 2022 at 8:23 PM Martijn Visser 
wrote:

> Hi Ori,
>
> Thanks for reaching out! I do fear that there's not much that we can help
> out with. As you mentioned, it looks like there's a network issue which
> would be on the Google side of issues. I'm assuming that the mentioned
> Flink version corresponds with Flink 1.12 [1], which isn't supported in the
> Flink community anymore. Are you restarting the job from a savepoint or
> starting fresh without state at all?
>
> Best regards,
>
> Martijn
>
> [1]
> https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
>
> On Sun, Oct 2, 2022 at 3:38 AM Ori Popowski  wrote:
>
>> Hi,
>>
>> We're using Flink 2.10.2 on Google Dataproc.
>>
>> Lately we've been experiencing a very unusual problem: the job fails, and
>> when it tries to recover we get this error:
>>
>> Slot request bulk is not fulfillable! Could not allocate the required
>> slot within slot request timeout
>>
>> I investigated what happened and saw that the failure was caused by a
>> heartbeat timeout to one of the containers. I looked at the container's
>> logs and saw something unusual:
>>
>>1. Eight minutes before the heartbeat timeout the logs show
>>connection problems to the Confluent Kafka topic and also to Datadog, which
>>means there's a network issue with the whole node or just the specific
>>container.
>>2. The container logs disappear at this point, but the node logs show
>>multiple Garbage Collection pauses, ranging from 10 seconds to 215 (!)
>>seconds.
>>
>> It looks like right after the network issue the node itself gets into an
>> endless GC phase, and my theory is that the slot requests are not fulfillable
>> because the node becomes unavailable due to that endless GC.
>>
>> I want to note that we've been running this job for months without any
>> issues. The issues started one month ago, seemingly arbitrarily, not following
>> a Flink version upgrade, a job code change, a change in the amount or type of
>> data being processed, or a Dataproc image version change.
>>
>> Attached are job manager logs, container logs, and node logs.
>>
>> How can we recover from this issue?
>>
>> Thanks!
>>
>>


Re: Adjusted frame length exceeds 2147483647

2022-03-18 Thread Ori Popowski
I am not aware that we use Nessus; we are running vanilla Google Cloud
Dataproc. On the other hand, Flink restarted from a checkpoint, reread the
Kafka offsets, and didn't encounter this error again, so if it were something
in the input I would expect it to happen again when the Kafka record that
caused the problem is reprocessed.

If it happens again I will enable TLS as described in the ticket.
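
For anyone who lands on this thread later: a small illustration of why stray,
non-Flink traffic on the data port (for example the port scanning mentioned in
the quoted reply below) can produce exactly this error. The assumption here is
that the frame decoder reads a 4-byte length prefix as an unsigned integer;
four arbitrary bytes routinely decode to a "length" far above the 2147483647
(Int.MaxValue) cap. The byte values below are made up, chosen only so the
result matches the number in the stack trace.

object FrameLengthSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical bytes at the position where the length prefix is expected
    val bytes = Array(0xB1, 0x01, 0xD1, 0x01).map(_.toByte)
    // decode them the way a length-field decoder would read an unsigned 32-bit value
    val decodedLength = bytes.foldLeft(0L)((acc, b) => (acc << 8) | (b & 0xFF))
    println(decodedLength)                // 2969686273, as in the error message
    println(decodedLength > Int.MaxValue) // true -> frame is rejected as too long
  }
}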


On Fri, Mar 18, 2022 at 8:57 AM Matthias Pohl  wrote:

> One other pointer: Martijn mentioned in FLINK-24923 [1] tools like Nessus
> could generate traffic while scanning for ports. It's just the size of the
> request that is suspicious.
>
> [1] https://issues.apache.org/jira/browse/FLINK-24923
>
> On Thu, Mar 17, 2022 at 5:29 PM Ori Popowski  wrote:
>
>> This issue did not repeat, so it may be a network issue
>>
>> On Thu, Mar 17, 2022 at 6:12 PM Matthias Pohl  wrote:
>>
>>> Hi Ori,
>>> that looks odd. The message seems to exceed the maximum size
>>> of 2147483647 bytes (2GB). I couldn't find anything similar in the ML or in
>>> Jira that supports a bug in Flink. Could it be that there was some network
>>> issue?
>>>
>>> Matthias
>>>
>>> On Tue, Mar 15, 2022 at 6:52 AM Ori Popowski  wrote:
>>>
>>>> I have been running a production job for at least a year, and today I got
>>>> this error:
>>>>
>>>>
>>>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>>> Adjusted frame length exceeds 2147483647: 2969686273 - discarded
>>>> (connection to
>>>> 'flink-session-playback-prod-1641716499-sw-6q8p.c.data-prod-292614.internal/
>>>> 10.208.65.38:40737')
>>>>
>>>> Nothing was changed in the code for a long time. What's causing this
>>>> error and how to fix it? I am running Flink 1.10.3 on YARN.
>>>>
>>>> This is the full stack trace:
>>>>
>>>> 2022-03-15 03:22:13
>>>> org.apache.flink.runtime.io.network.netty.exception.
>>>> LocalTransportException: Adjusted frame length exceeds 2147483647:
>>>> 2969686273 - discarded (connection to
>>>> 'flink-session-playback-prod-1641716499-sw-6q8p.c.data-prod-292614.internal/
>>>> 10.208.65.38:40737')
>>>> at org.apache.flink.runtime.io.network.netty.
>>>> CreditBasedPartitionRequestClientHandler.exceptionCaught(
>>>> CreditBasedPartitionRequestClientHandler.java:165)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.invokeExceptionCaught(
>>>> AbstractChannelHandlerContext.java:297)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.invokeExceptionCaught(
>>>> AbstractChannelHandlerContext.java:276)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.fireExceptionCaught(
>>>> AbstractChannelHandlerContext.java:268)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> ChannelInboundHandlerAdapter.exceptionCaught(
>>>> ChannelInboundHandlerAdapter.java:143)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.invokeExceptionCaught(
>>>> AbstractChannelHandlerContext.java:297)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.notifyHandlerException(
>>>> AbstractChannelHandlerContext.java:831)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.invokeChannelRead(
>>>> AbstractChannelHandlerContext.java:376)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.invokeChannelRead(
>>>> AbstractChannelHandlerContext.java:360)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.fireChannelRead(
>>>> AbstractChannelHandlerContext.java:352)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline
>>>> .java:1421)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannelHandlerContext.invokeChannelRead(
>>>> AbstractChannelHandlerContext.java:374)
>>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> Abst

Re: Adjusted frame length exceeds 2147483647

2022-03-17 Thread Ori Popowski
This issue did not repeat, so it may have been a network issue.

On Thu, Mar 17, 2022 at 6:12 PM Matthias Pohl  wrote:

> Hi Ori,
> that looks odd. The message seems to exceed the maximum size of 2147483647
> bytes (2GB). I couldn't find anything similar in the ML or in Jira that
> supports a bug in Flink. Could it be that there was some network issue?
>
> Matthias
>
> On Tue, Mar 15, 2022 at 6:52 AM Ori Popowski  wrote:
>
>> I have been running a production job for at least a year, and today I got
>> this error:
>>
>>
>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> Adjusted frame length exceeds 2147483647: 2969686273 - discarded
>> (connection to
>> 'flink-session-playback-prod-1641716499-sw-6q8p.c.data-prod-292614.internal/
>> 10.208.65.38:40737')
>>
>> Nothing was changed in the code for a long time. What's causing this
>> error and how to fix it? I am running Flink 1.10.3 on YARN.
>>
>> This is the full stack trace:
>>
>> 2022-03-15 03:22:13
>> org.apache.flink.runtime.io.network.netty.exception.
>> LocalTransportException: Adjusted frame length exceeds 2147483647:
>> 2969686273 - discarded (connection to
>> 'flink-session-playback-prod-1641716499-sw-6q8p.c.data-prod-292614.internal/
>> 10.208.65.38:40737')
>> at org.apache.flink.runtime.io.network.netty.
>> CreditBasedPartitionRequestClientHandler.exceptionCaught(
>> CreditBasedPartitionRequestClientHandler.java:165)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeExceptionCaught(
>> AbstractChannelHandlerContext.java:297)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeExceptionCaught(
>> AbstractChannelHandlerContext.java:276)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.fireExceptionCaught(
>> AbstractChannelHandlerContext.java:268)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter
>> .java:143)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeExceptionCaught(
>> AbstractChannelHandlerContext.java:297)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.notifyHandlerException(
>> AbstractChannelHandlerContext.java:831)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:376)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:360)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.fireChannelRead(
>> AbstractChannelHandlerContext.java:352)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline
>> .java:1421)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:374)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannelHandlerContext.invokeChannelRead(
>> AbstractChannelHandlerContext.java:360)
>> at org.apache.flink.shaded.netty4.io.netty.channel.
>> DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>> AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163
>> )
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .processSelectedKey(NioEventLoop.java:697)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .processSelectedKeysOptimized(NioEventLoop.java:632)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .processSelectedKeys(NioEventLoop.java:549)
>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .run(NioEventLoop.java:511)
>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>> at org.apache.flink.shaded.netty4.io.netty.util.internal.
>> ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.shaded.netty4.io.netty.handler.codec.
>> TooLongFrameException: Adjusted frame length exceed

Adjusted frame length exceeds 2147483647

2022-03-14 Thread Ori Popowski
I have been running a production job for at least a year, and today I got this
error:


org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Adjusted frame length exceeds 2147483647: 2969686273 - discarded
(connection to
'flink-session-playback-prod-1641716499-sw-6q8p.c.data-prod-292614.internal/
10.208.65.38:40737')

Nothing has changed in the code for a long time. What's causing this error,
and how can I fix it? I am running Flink 1.10.3 on YARN.

This is the full stack trace:

2022-03-15 03:22:13
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Adjusted frame length exceeds 2147483647: 2969686273 - discarded
(connection to
'flink-session-playback-prod-1641716499-sw-6q8p.c.data-prod-292614.internal/
10.208.65.38:40737')
at org.apache.flink.runtime.io.network.netty.
CreditBasedPartitionRequestClientHandler.exceptionCaught(
CreditBasedPartitionRequestClientHandler.java:165)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:297)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:276)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireExceptionCaught(
AbstractChannelHandlerContext.java:268)
at org.apache.flink.shaded.netty4.io.netty.channel.
ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter
.java:143)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:297)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.notifyHandlerException(
AbstractChannelHandlerContext.java:831)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:376)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
1421)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.
AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKey(NioEventLoop.java:697)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKeysOptimized(NioEventLoop.java:632)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
.processSelectedKeys(NioEventLoop.java:549)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(
NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at org.apache.flink.shaded.netty4.io.netty.util.internal.
ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.netty4.io.netty.handler.codec.
TooLongFrameException: Adjusted frame length exceeds 2147483647: 2969686273
- discarded
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:513)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder
.java:491)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
LengthFieldBasedFrameDecoder.exceededFrameLength(
LengthFieldBasedFrameDecoder.java:378)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:421)
at org.apache.flink.runtime.io.network.netty.
NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:214)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:334)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder
.java:505)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
Byte

Re: Huge backpressure when using AggregateFunction with Session Window

2021-10-21 Thread Ori Popowski
Thanks for taking the time to answer this.

   - You're correct that the SimpleAggregator is not used in the job setup.
   I didn't copy the correct piece of code.
   - I understand the overhead involved, but I don't agree with the O(n^2)
   complexity. Are you implying that Vector append is O(n) by itself?
   - I understand your points regarding ProcessFunction, except for the
   "without touching the previously stored event" part. Also with
   AggregateFunction + concatenation I don't touch any elements other than
   the new element. By the way, I forgot to mention that the issue also
   reproduces with Lists, which should be much faster for appends and concats.

Could overhead by itself account for the backpressure?
From this job, the only conclusion is that Flink just cannot do aggregating
operations which collect values, only simple operations which produce scalar
values (like sum/avg). It seems weird to me that Flink would be so limited in
such a way.
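
As a side note on the complexity question: the quadratic cost in the quoted
reply below comes from the state round-trip rather than from Vector append
itself. If the accumulator is read from and written back to (RocksDB-backed)
window state for every incoming event, the serialized bytes per session grow
with the square of the session size, even when each append is cheap. A rough
back-of-the-envelope sketch, with an assumed per-entry size:

object AccumulatorCostSketch {
  def main(args: Array[String]): Unit = {
    val eventsPerSession = 10000L // assumed session size
    val bytesPerEntry    = 64L    // assumed serialized size of one accumulated value
    // the i-th event rewrites an accumulator that already holds roughly i entries
    val totalBytes = (1L to eventsPerSession).map(_ * bytesPerEntry).sum
    println(s"~${totalBytes / (1L << 20)} MiB (de)serialized for a single session")
  }
}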



On Wed, Oct 20, 2021 at 7:03 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Ori,
>
>
>
> Just a couple of comments (some code is missing for a concise explanation):
>
>- SimpleAggregator is not used in the job setup below (assuming
>another job setup)
>- SimpleAggregator is called for each event that goes into a specific
>session window, however
>   - The scala vectors will ever grow with the number of events that
>   end up in a single window, hence
>   - Your BigO complexity will be O(n^2), n: number of events in
>   window (or worse)
>   - For each event the accumulator is retrieved from window state and
>   stored to window state (and serialized, if on RocksDB Backend)
>- On the other hand when you use a process function
>   - Flink keeps a list state of events belonging to the session
>   window, and
>   - Only when the window is triggered (on session gap timeout) all
>   events are retrieved from window state and processed
>   - On RocksDbBackend the new events added to the window are appended
>   to the existing window state key without touching the previously stored
>   events, hence
>   - Serialization is only done once per incoming event, and
>   - BigO complexity is around O(n)
>
>
>
> … much simplified
>
>
>
> When I started with similar questions I spent quite some time in the
> debugger, breaking into the windowing functions and going up the call
> stack, in order to understand how Flink works … time well spent
>
>
>
>
>
> I hope this helps …
>
>
>
> I won’t be able to follow up for the next 1 ½ weeks, unless you try to
> meet me on FlinkForward conference …
>
>
>
> Thias
>
>
>
> *From:* Ori Popowski 
> *Sent:* Mittwoch, 20. Oktober 2021 16:17
> *To:* user 
> *Subject:* Huge backpressure when using AggregateFunction with Session
> Window
>
>
>
> I have a simple Flink application with a simple keyBy, a SessionWindow,
> and I use an AggregateFunction to incrementally aggregate a result, and
> write to a Sink.
>
>
>
> Some of the requirements involve accumulating lists of fields from the
> events (for example, all URLs), so not all the values in the end should be
> primitives (although some are, like total number of events, and session
> duration).
>
>
>
> This job is experiencing a huge backpressure 40 minutes after launching.
>
>
>
> I've found out that the append and concatenate operations in the logic of
> my AggregateFunction's add() and merge() functions are what's ruining the
> job (i.e. causing the backpressure).
>
>
>
> I've managed to create a reduced version of my job, where I just append
> and concatenate some of the event values and I can confirm that a
> backpressure starts just 40 minutes after launching the job:
>
>
>
> class SimpleAggregator extends AggregateFunction[Event, Accumulator, Session] with LazyLogging {
>
>   override def createAccumulator(): Accumulator = (
>     Vector.empty,
>     Vector.empty,
>     Vector.empty,
>     Vector.empty,
>     Vector.empty
>   )
>
>   override def add(value: Event, accumulator: Accumulator): Accumulator = {
>     (
>       accumulator._1 :+ value.getEnvUrl,
>       accumulator._2 :+ value.getCtxVisitId,
>       accumulator._3 :+ value.getVisionsSId,
>       accumulator._4 :+ value.getTime.longValue(),
>       accumulator._5 :+ value.getTime.longValue()
>     )
>   }
>
>   override def merge(a: Accumulator, b: Accumulator): Accumulator = {
>     (
>

Re: Huge backpressure when using AggregateFunction with Session Window

2021-10-21 Thread Ori Popowski
I didn't try to reproduce it locally since this job reads 14K events per
second.
I am using Flink version 1.12.1 and RocksDB state backend. It also happens
with Flink 1.10.

I tried to profile with JVisualVM and didn't see any bottleneck. The user
functions took almost no CPU time.
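
For what it's worth, a local reproduction along the lines Timo suggests below
doesn't need the real 14K events/s feed; a small throttled synthetic source is
usually enough to exercise the window and aggregation path. A minimal sketch,
where the event factory and the rate are placeholders to adapt:

import org.apache.flink.streaming.api.functions.source.SourceFunction

// A throttled synthetic source for local profiling. `makeEvent` stands in for
// however a fake Event is constructed; it is not part of the original job.
class FakerSource[T](makeEvent: Long => T, eventsPerSecond: Int)
    extends SourceFunction[T] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    var i = 0L
    while (running) {
      ctx.collect(makeEvent(i))
      i += 1
      if (i % eventsPerSecond == 0) Thread.sleep(1000) // crude rate limiting
    }
  }

  override def cancel(): Unit = running = false
}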

On Wed, Oct 20, 2021 at 6:50 PM Timo Walther  wrote:

> Hi Ori,
>
> this sounds indeed strange. Can you also reproduce this behavior locally
> with a faker source? We should definitely add a profiler and see where
> the bottleneck lies.
>
> Which Flink version and state backend are you using?
>
> Regards,
> Timo
>
> On 20.10.21 16:17, Ori Popowski wrote:
> > I have a simple Flink application with a simple keyBy, a SessionWindow,
> > and I use an AggregateFunction to incrementally aggregate a result, and
> > write to a Sink.
> >
> > Some of the requirements involve accumulating lists of fields from the
> > events (for example, all URLs), so not all the values in the end should
> > be primitives (although some are, like total number of events, and
> > session duration).
> >
> > This job is experiencing a huge backpressure 40 minutes after launching.
> >
> > I've found out that the append and concatenate operations in the logic
> > of my AggregateFunction's add() and merge() functions are what's ruining
> > the job (i.e. causing the backpressure).
> >
> > I've managed to create a reduced version of my job, where I just append
> > and concatenate some of the event values and I can confirm that a
> > backpressure starts just 40 minutes after launching the job:
> >
> > class SimpleAggregator extends AggregateFunction[Event, Accumulator, Session] with LazyLogging {
> >
> >   override def createAccumulator(): Accumulator = (
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty
> >   )
> >
> >   override def add(value: Event, accumulator: Accumulator): Accumulator = {
> >     (
> >       accumulator._1 :+ value.getEnvUrl,
> >       accumulator._2 :+ value.getCtxVisitId,
> >       accumulator._3 :+ value.getVisionsSId,
> >       accumulator._4 :+ value.getTime.longValue(),
> >       accumulator._5 :+ value.getTime.longValue()
> >     )
> >   }
> >
> >   override def merge(a: Accumulator, b: Accumulator): Accumulator = {
> >     (
> >       a._1 ++ b._1,
> >       a._2 ++ b._2,
> >       a._3 ++ b._3,
> >       a._4 ++ b._4,
> >       a._5 ++ b._5
> >     )
> >   }
> >
> >   override def getResult(accumulator: Accumulator): Session = {
> >     Session.newBuilder()
> >       .setSessionDuration(1000)
> >       .setSessionTotalEvents(1000)
> >       .setSId("-" + UUID.randomUUID().toString)
> >       .build()
> >   }
> > }
> >
> >
> > This is the job overall (simplified version):
> >
> > class App(
> >   source: SourceFunction[Event],
> >   sink: SinkFunction[Session]
> > ) {
> >
> >   def run(config: Config): Unit = {
> >     val senv = StreamExecutionEnvironment.getExecutionEnvironment
> >     senv.setMaxParallelism(256)
> >     val dataStream = senv.addSource(source).uid("source")
> >     dataStream
> >       .assignAscendingTimestamps(_.getTime)
> >       .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
> >       .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
> >       .allowedLateness(0.seconds.asFlinkTime)
> >       .process(new ProcessFunction).uid("process-session")
> >       .addSink(sink).uid("sink")
> >
> >     senv.execute("session-aggregation")
> >   }
> > }
> >
> >
> > After 3 weeks of grueling debugging, profiling, checking the
> > serialization and more I couldn't solve the backpressure issue.
> > However, I got an idea and used Flink's ProcessWindowFunction which just
> > aggregates all the events behind the scenes and just gives them to me as
> > an iterator, where I can then do all my calculations.
> > Surprisingly, there's no backpressure. So even though the
> > ProcessWindowFunction actually aggregates more data, and also does
> > concatenations and appends, for some reason there's no backpressure.
> >
> > To finish this long post, what I'm trying to understand here is why when
> > I collected the events using an AggregateFunction there was a
> > backpressure, and when Flink does this for me with ProcessWindowFunction
> > there's no backpressure? It seems to me something is fundamentally wrong
> > here, since it means I cannot do any non-reducing operations without
> > creating backpressure. I think it shouldn't cause the backpressure I
> > experienced. I'm trying to understand what I did wrong here.
> >
> > Thanks!
>
>


Huge backpressure when using AggregateFunction with Session Window

2021-10-20 Thread Ori Popowski
I have a simple Flink application with a simple keyBy, a SessionWindow, and
I use an AggregateFunction to incrementally aggregate a result, and write
to a Sink.

Some of the requirements involve accumulating lists of fields from the
events (for example, all URLs), so not all the values in the end should be
primitives (although some are, like total number of events, and session
duration).

This job is experiencing huge backpressure 40 minutes after launching.

I've found out that the append and concatenate operations in the logic of
my AggregateFunction's add() and merge() functions are what's ruining the
job (i.e. causing the backpressure).

I've managed to create a reduced version of my job, where I just append and
concatenate some of the event values and I can confirm that a backpressure
starts just 40 minutes after launching the job:

class SimpleAggregator extends AggregateFunction[Event,
Accumulator, Session] with LazyLogging {

  override def createAccumulator(): Accumulator = (
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty,
Vector.empty
  )

  override def add(value: Event, accumulator: Accumulator): Accumulator = {
(
  accumulator._1 :+ value.getEnvUrl,
  accumulator._2 :+ value.getCtxVisitId,
  accumulator._3 :+ value.getVisionsSId,
  accumulator._4 :+ value.getTime.longValue(),
  accumulator._5 :+ value.getTime.longValue()
)
  }

  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
(
  a._1 ++ b._1,
  a._2 ++ b._2,
  a._3 ++ b._3,
  a._4 ++ b._4,
  a._5 ++ b._5
)
  }

  override def getResult(accumulator: Accumulator): Session = {
Session.newBuilder()
  .setSessionDuration(1000)
  .setSessionTotalEvents(1000)
  .setSId("-" + UUID.randomUUID().toString)
  .build()
  }
}


This is the job overall (simplified version):

class App(
  source: SourceFunction[Event],
  sink: SinkFunction[Session]
) {

  def run(config: Config): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setMaxParallelism(256)
val dataStream = senv.addSource(source).uid("source")
dataStream
  .assignAscendingTimestamps(_.getTime)
  .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
  .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
  .allowedLateness(0.seconds.asFlinkTime)
  .process(new ProcessFunction).uid("process-session")
  .addSink(sink).uid("sink")

senv.execute("session-aggregation")
  }
}


After 3 weeks of grueling debugging, profiling, checking the serialization
and more I couldn't solve the backpressure issue.
However, I got an idea and used Flink's ProcessWindowFunction, which simply
collects all the events behind the scenes and gives them to me as an
iterator, where I can then do all my calculations.
Surprisingly, there's no backpressure. So even though the
ProcessWindowFunction actually aggregates more data, and also does
concatenations and appends, for some reason there's no backpressure.

To finish this long post, what I'm trying to understand is why there was
backpressure when I collected the events using an AggregateFunction, but
there's no backpressure when Flink does this for me with a
ProcessWindowFunction. It seems to me something is fundamentally wrong here,
since it means I cannot do any non-reducing operations without creating
backpressure. I don't think this should cause the backpressure I experienced,
and I'm trying to understand what I did wrong.

Thanks!
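
For completeness, a minimal sketch of the ProcessWindowFunction variant
described above, reusing Event and Session from the code in this post (the
fields set on Session are simplified). It would be wired in exactly where the
job above calls .process(...):

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class SessionProcessFunction
    extends ProcessWindowFunction[Event, Session, String, TimeWindow] {

  override def process(
      key: String,
      context: Context,
      elements: Iterable[Event],
      out: Collector[Session]): Unit = {
    // All events of the session arrive here once, when the window fires, so
    // there is no per-event accumulator round-trip through the state backend.
    // Fields like "all URLs" can be computed here from `elements`, e.g.
    // elements.map(_.getEnvUrl).toVector
    out.collect(
      Session.newBuilder()
        .setSessionTotalEvents(elements.size)
        .setSId("-" + java.util.UUID.randomUUID().toString)
        .build())
  }
}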


Re: Exception: SequenceNumber is treated as a generic type

2021-10-18 Thread Ori Popowski
Got that, thanks. I'll try

On Mon, Oct 18, 2021 at 11:50 AM Arvid Heise  wrote:

> If you submit a fat jar to Flink, it contains the Kinesis connector. Dawid
> was suggesting to also add the SequenceNumber to your src with the original
> package name such that you effectively overwrite the class of Kinesis while
> creating the fat jar (there should be warning and you should double-check
> that your SequenceNumber wins).
>
> On Thu, Oct 14, 2021 at 3:22 PM Ori Popowski  wrote:
>
>> Thanks for answering.
>>
>> Not sure I understood the hack suggestion. If I copy SequenceNumber over
>> to my job, how the original Flink Kinesis lib will use that class? It's
>> fixed on a specific package (in this case
>> org.apache.flink.streaming.connectors.kinesis.model. Unless, you meant to
>> somehow hack the JAR itself and replace the class with an annotated class?
>>
>> About the backpressure - I eliminated almost everything by now, so I
>> don't know what it could be. I've ran out of ideas so I'm starting to look
>> into serialization. The job is very, very simple. No algorithms. Most
>> operations are just list/set concatenations, and still getting
>> backpressure, no matter how big a cluster I use. I know where the
>> backpressure is, I also started profiling and there's not a single function
>> which is slow. GC is also looking good, no long pauses.
>>
>> On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hey Ori,
>>>
>>> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug.
>>> In the current state one can not use kinesis consumer with the
>>> pipeline.generic-types=false. The problem is because we use the
>>> TypeInformation.of(SequenceNumber.class) method, which will in this case
>>> always fallback to GenericTypeInfo (The GenericTypeInfo is the one that
>>> uses KryoSerializer. That is why it does not help to register a Kryo
>>> serializer, it is still a generic type).
>>>
>>> A dirty hack for you to try, could be to copy over the SequenceNumber
>>> over to your job and annotate it with TypeInfo where you provide a factory
>>> that would create something other than GenericTypeInfo (you could even use
>>> a copy of GenericTypeInfo, but with a removed check for the
>>> pipeline.generic-types flag). I know it is a really dirty hack.
>>>
>>> Ad. 2 Unfortunately I can't think of a better way.
>>>
>>> I have created FLINK-24549 to track the kinesis issue.[1]
>>>
>>> On the backpressure note, are you sure the issue is in the
>>> serialization? Have you tried identifying the slow task first?[2]
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-24549
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>>> On 14/10/2021 12:41, Ori Popowski wrote:
>>>
>>> I'd appreciate if someone could advice on this issue.
>>>
>>> Thanks
>>>
>>> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski  wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a large backpressure in a somewhat simple Flink application in
>>>> Scala. Using Flink version 1.12.1.
>>>>
>>>> To find the source of the problem, I want to eliminate all classes with
>>>> generic serialization, so I set
>>>> pipeline.generic-types=false
>>>>
>>>> in order to spot those classes and write a serializer for them.
>>>>
>>>> However, for some reason, I get the stracktrace attached below.
>>>>
>>>>1. It looks suspicious that one of Flink's own classes doesn't have
>>>>a serializer and should fallback to generic serialization. Is this a 
>>>> bug?
>>>>2. I want to get a list of all classes which fallback to generic
>>>>serialization. How can I do it other than setting
>>>>pipeline.generic-types=false and eliminating those classes one by
>>>>one?
>>>>3. I defined a custom Kryo serializer for this class using both
>>>>addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…)
>>>>and I still get the same error message. How can I provide Flink with 
>>>> custom
>>>>serialization so it stops complaining about this?
>>>>
>>>>
>>>>
>>

Re: Exception: SequenceNumber is treated as a generic type

2021-10-14 Thread Ori Popowski
Thanks for answering.

Not sure I understood the hack suggestion. If I copy SequenceNumber over to
my job, how will the original Flink Kinesis lib use that class? It's fixed
to a specific package (in this case
org.apache.flink.streaming.connectors.kinesis.model). Unless you meant to
somehow hack the JAR itself and replace the class with an annotated class?

About the backpressure - I've eliminated almost everything by now, so I don't
know what it could be. I've run out of ideas, so I'm starting to look into
serialization. The job is very, very simple. No algorithms. Most operations
are just list/set concatenations, and I'm still getting backpressure, no matter
how big a cluster I use. I know where the backpressure is, and I also started
profiling; there's not a single function which is slow. GC is also looking
good, with no long pauses.

On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz 
wrote:

> Hey Ori,
>
> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug. In
> the current state one can not use kinesis consumer with the
> pipeline.generic-types=false. The problem is because we use the
> TypeInformation.of(SequenceNumber.class) method, which will in this case
> always fallback to GenericTypeInfo (The GenericTypeInfo is the one that
> uses KryoSerializer. That is why it does not help to register a Kryo
> serializer, it is still a generic type).
>
> A dirty hack for you to try, could be to copy over the SequenceNumber over
> to your job and annotate it with TypeInfo where you provide a factory that
> would create something other than GenericTypeInfo (you could even use a
> copy of GenericTypeInfo, but with a removed check for the
> pipeline.generic-types flag). I know it is a really dirty hack.
>
> Ad. 2 Unfortunately I can't think of a better way.
>
> I have created FLINK-24549 to track the kinesis issue.[1]
>
> On the backpressure note, are you sure the issue is in the serialization?
> Have you tried identifying the slow task first?[2]
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-24549
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
> On 14/10/2021 12:41, Ori Popowski wrote:
>
> I'd appreciate it if someone could advise on this issue.
>
> Thanks
>
> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski  wrote:
>
>> Hi,
>>
>> I have a large backpressure in a somewhat simple Flink application in
>> Scala. Using Flink version 1.12.1.
>>
>> To find the source of the problem, I want to eliminate all classes with
>> generic serialization, so I set
>> pipeline.generic-types=false
>>
>> in order to spot those classes and write a serializer for them.
>>
>> However, for some reason, I get the stracktrace attached below.
>>
>>1. It looks suspicious that one of Flink's own classes doesn't have a
>>serializer and should fallback to generic serialization. Is this a bug?
>>2. I want to get a list of all classes which fallback to generic
>>serialization. How can I do it other than setting
>>pipeline.generic-types=false and eliminating those classes one by one?
>>3. I defined a custom Kryo serializer for this class using both
>>addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…) and
>>I still get the same error message. How can I provide Flink with custom
>>serialization so it stops complaining about this?
>>
>>
>>
>> java.lang.UnsupportedOperationException: Generic types have been disabled
>> in the ExecutionConfig and type
>> org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is
>> treated as a generic type.
>> at
>> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>> at
>> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>> at
>> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>> at
>> org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>> at
>> org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>> at
>> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunc

Re: Exception: SequenceNumber is treated as a generic type

2021-10-14 Thread Ori Popowski
I'd appreciate it if someone could advise on this issue.

Thanks

On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski  wrote:

> Hi,
>
> I have a large backpressure in a somewhat simple Flink application in
> Scala. Using Flink version 1.12.1.
>
> To find the source of the problem, I want to eliminate all classes with
> generic serialization, so I set
> pipeline.generic-types=false
>
> in order to spot those classes and write a serializer for them.
>
> However, for some reason, I get the stracktrace attached below.
>
>1. It looks suspicious that one of Flink's own classes doesn't have a
>serializer and should fallback to generic serialization. Is this a bug?
>2. I want to get a list of all classes which fallback to generic
>serialization. How can I do it other than setting
>pipeline.generic-types=false and eliminating those classes one by one?
>3. I defined a custom Kryo serializer for this class using both
>addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…) and
>I still get the same error message. How can I provide Flink with custom
>serialization so it stops complaining about this?
>
>
>
> java.lang.UnsupportedOperationException: Generic types have been disabled
> in the ExecutionConfig and type
> org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is
> treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
> at
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
> at
> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
> at
> org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
> at
> org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
> at
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
>
>
>


Exception: SequenceNumber is treated as a generic type

2021-10-12 Thread Ori Popowski
Hi,

I have large backpressure in a somewhat simple Flink application written in
Scala. I'm using Flink version 1.12.1.

To find the source of the problem, I want to eliminate all classes with
generic serialization, so I set
pipeline.generic-types=false

in order to spot those classes and write a serializer for them.

However, for some reason, I get the stack trace attached below.

   1. It looks suspicious that one of Flink's own classes doesn't have a
   serializer and has to fall back to generic serialization. Is this a bug?
   2. I want to get a list of all classes which fall back to generic
   serialization. How can I do it other than setting
   pipeline.generic-types=false and eliminating those classes one by one?
   3. I defined a custom Kryo serializer for this class using both
   addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…) and I
   still get the same error message. How can I provide Flink with custom
   serialization so it stops complaining about this?
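
For reference, a minimal sketch of the setup this post describes: disabling
generic types programmatically (the equivalent of pipeline.generic-types=false)
and registering a Kryo serializer for a class as in point 3. MyPojo and its
serializer are placeholders, and, as the replies above in this thread explain,
such a registration does not help for classes that still resolve to
GenericTypeInfo (like the Kinesis SequenceNumber here).

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// placeholder type and serializer, purely to show the wiring
class MyPojo(var id: String) { def this() = this("") }

class MyPojoKryoSerializer extends Serializer[MyPojo] {
  override def write(kryo: Kryo, output: Output, obj: MyPojo): Unit =
    output.writeString(obj.id)
  override def read(kryo: Kryo, input: Input, tpe: Class[MyPojo]): MyPojo =
    new MyPojo(input.readString())
}

object GenericTypesConfigSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // same effect as pipeline.generic-types=false: fail fast on Kryo fallback
    env.getConfig.disableGenericTypes()
    // register a Kryo serializer for a class we control
    // (registerTypeWithKryoSerializer takes the same kind of arguments)
    env.getConfig.addDefaultKryoSerializer(classOf[MyPojo], classOf[MyPojoKryoSerializer])
    // ... build and execute the pipeline as usual
  }
}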



java.lang.UnsupportedOperationException: Generic types have been disabled
in the ExecutionConfig and type
org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is
treated as a generic type.
at
org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
at
org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
at
org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)


Table API, accessing nested fields

2020-11-10 Thread Ori Popowski
How can I access nested fields e.g. in select statements?

For example, this won't work:

 val table = tenv
  .fromDataStream(stream)
  .select($"context.url", $"name")

What is the correct way?

Thanks.
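
In case it helps someone finding this in the archives: assuming context is
mapped as a composite (ROW/POJO) type, one commonly used approach is the get()
expression for composite types rather than a dotted name, along the lines of
the sketch below (exact behavior may depend on the Flink version and how the
type is registered):

val table = tenv
  .fromDataStream(stream)
  .select($"context".get("url").as("url"), $"name")

Alternatively, the flatten() expression can expand all nested fields of a
composite column into top-level columns.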


Re: SQL aggregation functions inside the Table API

2020-11-09 Thread Ori Popowski
Thanks


On Mon, Nov 9, 2020 at 4:50 PM Timo Walther  wrote:

> Hi Ori,
>
> we might support SQL expressions soon in Table API. However, we might
> not support aggregate functions immediately. I would recommend to use
> `sqlQuery` for now.
>
> The following is supported:
>
> val table = tenv.fromDataStream(stream)
>
> val newTable = tenv.sqlQuery(s"SELECT ... FROM $table")
>
> So switching between Table API and SQL can be done fluently.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 09.11.20 14:33, Ori Popowski wrote:
> > Hi,
> >
> > Some functions only exist in the SQL interface and are missing from the
> > Table API. For example LAST_VALUE(expression) [1]
> >
> > I still want to use this function in my aggregation, and I don't want to
> > implement a user-defined function. Can I combine an SQL expression
> > inside my Table API?
> >
> > For example:
> >
> > val table = tenv
> >.fromDataStream(stream)
> >.groupBy($"name")
> >.select($"name", $"products".count(), $"LAST_VALUE(age)")
> >
> > If not - how can I get the last value of a column inside an aggregation?
> >
> > Thanks.
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#aggregate-functions
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#aggregate-functions
> >
>
>


SQL aggregation functions inside the Table API

2020-11-09 Thread Ori Popowski
Hi,

Some functions only exist in the SQL interface and are missing from the
Table API. For example LAST_VALUE(expression) [1]

I still want to use this function in my aggregation, and I don't want to
implement a user-defined function. Can I combine an SQL expression inside
my Table API?

For example:

val table = tenv
  .fromDataStream(stream)
  .groupBy($"name")
  .select($"name", $"products".count(), $"LAST_VALUE(age)")

If not - how can I get the last value of a column inside an aggregation?

Thanks.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#aggregate-functions
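
For reference, a minimal sketch of the sqlQuery-based workaround suggested in
the reply above, using the column names from the example (LAST_VALUE
availability depends on the planner in use):

val table = tenv.fromDataStream(stream)

val result = tenv.sqlQuery(
  s"""
     |SELECT name, COUNT(products) AS product_count, LAST_VALUE(age) AS last_age
     |FROM $table
     |GROUP BY name
     |""".stripMargin)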


Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
- I will increase the jvm-overhead
- I don't have any failovers or restarts until it starts happening
- If it happens again even with the changes, I'll post the NMT output

On Fri, Oct 30, 2020 at 3:54 AM Xintong Song  wrote:

> Hi Ori,
>
> I'm not sure about where the problem comes from. There are several things
> that might be worth a try.
> - Further increasing the `jvm-overhead`. Your `ps` result suggests that
> the Flink process uses 120+GB, while `process.size` is configured 112GB. So
> I think 2GB `jvm-overhead` might not be enough. I would suggest to tune
> `managed.fraction` back to 0.4 and increase `jvm-overhead` to around 12GB.
> This should give you roughly the same `process.size` as before, while
> leaving more unmanaged native memory space.
> - During the 7-10 job running days, are there any failovers/restarts? If
> yes, you might want to look into this comment [1] in FLINK-18712.
> - If neither of the above actions helps, we might need to leverage tools
> (e.g., JVM NMT [2]) to track the native memory usages and see where exactly
> the leak comes from.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-18712?focusedCommentId=17189138&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17189138
>
> [2]
> https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html
>
> On Thu, Oct 29, 2020 at 7:51 PM Ori Popowski  wrote:
>
>>
>> Hi Xintong,
>>
>> Unfortunately I cannot upgrade to 1.10.2, because EMR has either 1.10.0
>> or 1.11.0.
>>
>> About the overhead - turns out I already configured
>> taskmanager.memory.jvm-overhead.max to 2 gb instead of the default 1 gb.
>> Should I increase it further?
>>
>> state.backend.rocksdb.memory.managed is already not explicitly
>> configured.
>>
>> Is there anything else I can do?
>>
>>
>>
>> On Thu, Oct 29, 2020 at 1:24 PM Xintong Song 
>> wrote:
>>
>>> Hi Ori,
>>>
>>> RocksDB also uses managed memory. If the memory overuse indeed comes
>>> from RocksDB, then increasing managed memory fraction will not help.
>>> RocksDB will try to use as much memory as the configured managed memory
>>> size. Therefore increasing managed memory fraction also makes RocksDB try
>>> to use more memory. That is why I suggested increasing `jvm-overhead`
>>> instead.
>>>
>>> Please also make sure the configuration option
>>> `state.backend.rocksdb.memory.managed` is either not explicitly configured,
>>> or configured to `true`.
>>>
>>> In addition, I noticed that you are using Flink 1.10.0. You might want
>>> to upgrade to 1.10.2, to include the latest bug fixes on the 1.10 release.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Oct 29, 2020 at 4:41 PM Ori Popowski  wrote:
>>>
>>>> Hi,
>>>>
>>>> PID 20331 is indeed the Flink process, specifically the TaskManager
>>>> process.
>>>>
>>>> - Workload is a streaming workload reading from Kafka and writing to S3
>>>> using a custom Sink
>>>> - RockDB state backend is used with default settings
>>>> - My external dependencies are:
>>>> -- logback
>>>> -- jackson
>>>> -- flatbuffers
>>>> -- jaxb-api
>>>> -- scala-java8-compat
>>>> -- apache commons-io
>>>> -- apache commons-compress
>>>> -- software.amazon.awssdk s3
>>>> - What do you mean by UDFs? I've implemented several operators like
>>>> KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.
>>>>
>>>> We use a SessionWindow with 30 minutes of gap, and a watermark with 10
>>>> minutes delay.
>>>>
>>>> We did confirm we have some keys in our job which keep receiving
>>>> records indefinitely, but I'm not sure why it would cause a managed memory
>>>> leak, since this should be flushed to RocksDB and free the memory used. We
>>>> have a guard against this, where we keep the overall size of all the
>>>> records for each key, and when it reaches 300mb, we don't move the records
>>>> downstream, which causes them to create a session and go through the sink.
>>>>
>>>> About what you suggested - I kind of did this by increasing the managed
>>>> memory fraction to 0.5. And it did postpone the occurrence of the problem
>>>> (meaning, the TMs started 

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
Hi Xintong,

Unfortunately I cannot upgrade to 1.10.2, because EMR has either 1.10.0 or
1.11.0.

About the overhead - turns out I already configured
taskmanager.memory.jvm-overhead.max to 2 gb instead of the default 1 gb.
Should I increase it further?

state.backend.rocksdb.memory.managed is already not explicitly configured.

Is there anything else I can do?



On Thu, Oct 29, 2020 at 1:24 PM Xintong Song  wrote:

> Hi Ori,
>
> RocksDB also uses managed memory. If the memory overuse indeed comes from
> RocksDB, then increasing managed memory fraction will not help. RocksDB
> will try to use as much memory as the configured managed memory size.
> Therefore increasing managed memory fraction also makes RocksDB try to use
> more memory. That is why I suggested increasing `jvm-overhead` instead.
>
> Please also make sure the configuration option
> `state.backend.rocksdb.memory.managed` is either not explicitly configured,
> or configured to `true`.
>
> In addition, I noticed that you are using Flink 1.10.0. You might want to
> upgrade to 1.10.2, to include the latest bug fixes on the 1.10 release.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Oct 29, 2020 at 4:41 PM Ori Popowski  wrote:
>
>> Hi,
>>
>> PID 20331 is indeed the Flink process, specifically the TaskManager
>> process.
>>
>> - Workload is a streaming workload reading from Kafka and writing to S3
>> using a custom Sink
>> - RockDB state backend is used with default settings
>> - My external dependencies are:
>> -- logback
>> -- jackson
>> -- flatbuffers
>> -- jaxb-api
>> -- scala-java8-compat
>> -- apache commons-io
>> -- apache commons-compress
>> -- software.amazon.awssdk s3
>> - What do you mean by UDFs? I've implemented several operators like
>> KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.
>>
>> We use a SessionWindow with 30 minutes of gap, and a watermark with 10
>> minutes delay.
>>
>> We did confirm we have some keys in our job which keep receiving records
>> indefinitely, but I'm not sure why it would cause a managed memory leak,
>> since this should be flushed to RocksDB and free the memory used. We have a
>> guard against this, where we keep the overall size of all the records for
>> each key, and when it reaches 300mb, we don't move the records downstream,
>> which causes them to create a session and go through the sink.
>>
>> About what you suggested - I kind of did this by increasing the managed
>> memory fraction to 0.5. And it did postpone the occurrence of the problem
>> (meaning, the TMs started crashing after 10 days instead of 7 days). It
>> looks like anything I'll do on that front will only postpone the problem
>> but not solve it.
>>
>> I am attaching the full job configuration.
>>
>>
>>
>> On Thu, Oct 29, 2020 at 10:09 AM Xintong Song 
>> wrote:
>>
>>> Hi Ori,
>>>
>>> It looks like Flink indeed uses more memory than expected. I assume the
>>> first item with PID 20331 is the flink process, right?
>>>
>>> It would be helpful if you can briefly introduce your workload.
>>> - What kind of workload are you running? Streaming or batch?
>>> - Do you use RocksDB state backend?
>>> - Any UDFs or 3rd party dependencies that might allocate significant
>>> native memory?
>>>
>>> Moreover, if the metrics show only 20% heap usage, I would suggest
>>> configuring a smaller `task.heap.size`, leaving more memory to off-heap. The
>>> reduced heap size does not necessarily all go to the managed memory. You
>>> can also try increasing the `jvm-overhead`, simply to leave more native
>>> memory in the container in case there are other significant native
>>> memory usages.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Oct 28, 2020 at 5:53 PM Ori Popowski  wrote:
>>>
>>>> Hi Xintong,
>>>>
>>>> See here:
>>>>
>>>> # Top memory users
>>>> ps auxwww --sort -rss | head -10
>>>> USER   PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
>>>> yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
>>>> /etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
>>>> root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
>>>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>>>> hadoop5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 15

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
Hi,

PID 20331 is indeed the Flink process, specifically the TaskManager process.

- Workload is a streaming workload reading from Kafka and writing to S3
using a custom Sink
- RockDB state backend is used with default settings
- My external dependencies are:
-- logback
-- jackson
-- flatbuffers
-- jaxb-api
-- scala-java8-compat
-- apache commons-io
-- apache commons-compress
-- software.amazon.awssdk s3
- What do you mean by UDFs? I've implemented several operators like
KafkaDeserializationSchema, FlatMap, Map, ProcessFunction.

We use a SessionWindow with a 30-minute gap, and a watermark with a 10-minute
delay.

We did confirm we have some keys in our job which keep receiving records
indefinitely, but I'm not sure why it would cause a managed memory leak,
since this should be flushed to RocksDB and free the memory used. We have a
guard against this, where we keep the overall size of all the records for
each key, and when it reaches 300mb, we don't move the records downstream,
which causes them to create a session and go through the sink.
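
Roughly, the guard is a keyed flatMap with a per-key byte counter. Below is a
simplified sketch of the idea, not the actual job code; sizeOf stands in for
however record size is actually measured.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Sketch of a per-key size guard: accumulate the byte size of records seen
// for the current key and stop forwarding once the cap (e.g. 300 MB) is hit.
class SizeGuard[T](capBytes: Long, sizeOf: T => Long)
    extends RichFlatMapFunction[T, T] {

  @transient private var seenBytes: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit =
    seenBytes = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("seen-bytes", classOf[java.lang.Long]))

  override def flatMap(value: T, out: Collector[T]): Unit = {
    val current = Option(seenBytes.value()).map(_.longValue()).getOrElse(0L)
    if (current < capBytes) {
      seenBytes.update(current + sizeOf(value))
      out.collect(value) // still under the cap: forward downstream
    }
    // over the cap: drop silently so the per-key session stays bounded
  }
}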

About what you suggested - I kind of did this by increasing the managed
memory fraction to 0.5. And it did postpone the occurrence of the problem
(meaning, the TMs started crashing after 10 days instead of 7 days). It
looks like anything I'll do on that front will only postpone the problem
but not solve it.

I am attaching the full job configuration.



On Thu, Oct 29, 2020 at 10:09 AM Xintong Song  wrote:

> Hi Ori,
>
> It looks like Flink indeed uses more memory than expected. I assume the
> first item with PID 20331 is the flink process, right?
>
> It would be helpful if you can briefly introduce your workload.
> - What kind of workload are you running? Streaming or batch?
> - Do you use RocksDB state backend?
> - Any UDFs or 3rd party dependencies that might allocate significant
> native memory?
>
> Moreover, if the metrics show only 20% heap usage, I would suggest
> configuring a smaller `task.heap.size`, leaving more memory to off-heap. The
> reduced heap size does not necessarily all go to the managed memory. You
> can also try increasing the `jvm-overhead`, simply to leave more native
> memory in the container in case there are other significant native
> memory usages.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Oct 28, 2020 at 5:53 PM Ori Popowski  wrote:
>
>> Hi Xintong,
>>
>> See here:
>>
>> # Top memory users
>> ps auxwww --sort -rss | head -10
>> USER   PID %CPU %MEM    VSZ   RSS TTY  STAT START   TIME COMMAND
>> yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
>> /etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
>> root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>> hadoop    5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 153:22
>> /etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
>> yarn 26857  0.3  0.2 4214784 341464 ?  Sl   Sep17 198:43
>> /etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
>> root  5519  0.0  0.2 5658624 269344 ?  Sl   Jul30  45:21
>> /usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
>> root  1781  0.0  0.0 172644  8096 ?Ss   Jul30   2:06
>> /usr/lib/systemd/systemd-journald
>> root  4801  0.0  0.0 2690260 4776 ?Ssl  Jul30   4:42
>> /usr/bin/amazon-ssm-agent
>> root  6566  0.0  0.0 164672  4116 ?R00:30   0:00 ps
>> auxwww --sort -rss
>> root  6532  0.0  0.0 183124  3592 ?S00:30   0:00
>> /usr/sbin/CROND -n
>>
>> On Wed, Oct 28, 2020 at 11:34 AM Xintong Song 
>> wrote:
>>
>>> Hi Ori,
>>>
>>> The error message suggests that there's not enough physical memory on
>>> the machine to satisfy the allocation. This does not necessarily mean a
>>> managed memory leak. Managed memory leak is only one of the possibilities.
>>> There are other potential reasons, e.g., another process/container on the
>>> machine used more memory than expected, Yarn NM is not configured with
>>> enough memory reserved for the system processes, etc.
>>>
>>> I would suggest to first look into the machine memory usages, see
>>> whether the Flink process indeed uses more memory than expected. This could
>>> be achieved via:
>>> - Run the `top` command
>>> - Look into the `/proc/meminfo` file
>>> - Any container memory usage metrics that are available to your Yarn
>>> cluster
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-28 Thread Ori Popowski
Hi Xintong,

See here:

# Top memory users
ps auxwww --sort -rss | head -10
USER   PID %CPU %MEM    VSZ   RSS TTY  STAT START   TIME COMMAND
yarn 20339 35.8 97.0 128600192 126672256 ? Sl   Oct15 5975:47
/etc/alternatives/jre/bin/java -Xmx54760833024 -Xms54760833024 -XX:Max
root  5245  0.1  0.4 5580484 627436 ?  Sl   Jul30 144:39
/etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
hadoop    5252  0.1  0.4 7376768 604772 ?  Sl   Jul30 153:22
/etc/alternatives/jre/bin/java -Xmx1024m -XX:+ExitOnOutOfMemoryError -X
yarn 26857  0.3  0.2 4214784 341464 ?  Sl   Sep17 198:43
/etc/alternatives/jre/bin/java -Dproc_nodemanager -Xmx2048m -XX:OnOutOf
root  5519  0.0  0.2 5658624 269344 ?  Sl   Jul30  45:21
/usr/bin/java -Xmx1500m -Xms300m -XX:+ExitOnOutOfMemoryError -XX:MinHea
root  1781  0.0  0.0 172644  8096 ?Ss   Jul30   2:06
/usr/lib/systemd/systemd-journald
root  4801  0.0  0.0 2690260 4776 ?Ssl  Jul30   4:42
/usr/bin/amazon-ssm-agent
root  6566  0.0  0.0 164672  4116 ?R00:30   0:00 ps auxwww
--sort -rss
root  6532  0.0  0.0 183124  3592 ?S00:30   0:00
/usr/sbin/CROND -n

On Wed, Oct 28, 2020 at 11:34 AM Xintong Song  wrote:

> Hi Ori,
>
> The error message suggests that there's not enough physical memory on the
> machine to satisfy the allocation. This does not necessarily mean a managed
> memory leak. Managed memory leak is only one of the possibilities. There
> are other potential reasons, e.g., another process/container on the machine
> used more memory than expected, Yarn NM is not configured with enough
> memory reserved for the system processes, etc.
>
> I would suggest to first look into the machine memory usages, see whether
> the Flink process indeed uses more memory than expected. This could be
> achieved via:
> - Run the `top` command
> - Look into the `/proc/meminfo` file
> - Any container memory usage metrics that are available to your Yarn
> cluster
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Oct 27, 2020 at 6:21 PM Ori Popowski  wrote:
>
>> After the job has been running for 10 days in production, TaskManagers start
>> failing with:
>>
>> Connection unexpectedly closed by remote task manager
>>
>> Looking in the machine logs, I can see the following error:
>>
>> = Java processes for user hadoop =
>> OpenJDK 64-Bit Server VM warning: INFO:
>> os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error='Cannot
>> allocate memory' (err
>> #
>> # There is insufficient memory for the Java Runtime Environment to
>> continue.
>> # Native memory allocation (mmap) failed to map 1006567424 bytes for
>> committing reserved memory.
>> # An error report file with more information is saved as:
>> # /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
>> === End java processes for user hadoop ===
>>
>> In addition, the metrics for the TaskManager show very low Heap memory
>> consumption (20% of Xmx).
>>
>> Hence, I suspect there is a memory leak in the TaskManager's Managed
>> Memory.
>>
>> This is my TaskManager's memory breakdown:
>> flink process 112g
>> framework.heap.size 0.2g
>> task.heap.size 50g
>> managed.size 54g
>> framework.off-heap.size 0.5g
>> task.off-heap.size 1g
>> network 2g
>> XX:MaxMetaspaceSize 1g
>>
>> As you can see, the managed memory is 54g, so it's already high (my
>> managed.fraction is set to 0.5).
>>
>> I'm running Flink 1.10. Full job details attached.
>>
>> Can someone advise what would cause a managed memory leak?
>>
>>
>>


Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-27 Thread Ori Popowski
After the job has been running for 10 days in production, TaskManagers start
failing with:

Connection unexpectedly closed by remote task manager

Looking in the machine logs, I can see the following error:

= Java processes for user hadoop =
OpenJDK 64-Bit Server VM warning: INFO:
os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error='Cannot
allocate memory' (err
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1006567424 bytes for
committing reserved memory.
# An error report file with more information is saved as:
# /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
=== End java processes for user hadoop ===

In addition, the metrics for the TaskManager show very low Heap memory
consumption (20% of Xmx).

Hence, I suspect there is a memory leak in the TaskManager's Managed Memory.

This is my TaskManager's memory breakdown:
flink process 112g
framework.heap.size 0.2g
task.heap.size 50g
managed.size 54g
framework.off-heap.size 0.5g
task.off-heap.size 1g
network 2g
XX:MaxMetaspaceSize 1g

As you can see, the managed memory is 54g, so it's already high (my
managed.fraction is set to 0.5).

I'm running Flink 1.10. Full job details attached.

Can someone advise what would cause a managed memory leak?

Starting YARN TaskExecutor runner (Version: 1.10.0, Rev:, 
Date:)
OS current user: yarn
Current Hadoop/Kerberos user: hadoop
JVM: OpenJDK 64-Bit Server VM - Amazon.com Inc. - 1.8/25.252-b09
Maximum heap size: 52224 MiBytes
JAVA_HOME: /etc/alternatives/jre
Hadoop version: 2.8.5-amzn-6
JVM Options:
   -Xmx54760833024
   -Xms54760833024
   -XX:MaxDirectMemorySize=3758096384
   -XX:MaxMetaspaceSize=1073741824
   -XX:+UseG1GC
   
-Dlog.file=/var/log/hadoop-yarn/containers/application_1600334141629_0011/container_1600334141629_0011_01_02/taskmanager.log
   -Dlog4j.configuration=file:./log4j.properties
Program Arguments:
   -D taskmanager.memory.framework.off-heap.size=536870912b
   -D taskmanager.memory.network.max=2147483648b
   -D taskmanager.memory.network.min=2147483648b
   -D taskmanager.memory.framework.heap.size=134217728b
   -D taskmanager.memory.managed.size=58518929408b
   -D taskmanager.cpu.cores=7.0
   -D taskmanager.memory.task.heap.size=54626615296b
   -D taskmanager.memory.task.off-heap.size=1073741824b
   --configDir .
   -Djobmanager.rpc.address=ip-***.us-west-2.compute.internal
   -Dweb.port=0
   -Dweb.tmpdir=/tmp/flink-web-ad601f25-685f-42e5-aa93-9658233031e4
   -Djobmanager.rpc.port=35435
   -Drest.address=ip-***.us-west-2.compute.internal


Re: Is it possible that late events are processed before the window?

2020-10-07 Thread Ori Popowski
Thanks

On Wed, Oct 7, 2020 at 7:06 PM Till Rohrmann  wrote:

> Hi Ori,
>
> you are right. Events are being sent down the side output for late events
> if the event's timestamp + the allowed lateness is smaller than the current
> watermark. These events are directly seen by downstream operators which
> consume the side output for late events.
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 2:32 PM Ori Popowski  wrote:
>
>> After creating a toy example I think that I've got the concept of
>> lateDataOutput wrong.
>>
>> It seems that the lateDataSideOutput has nothing to do with windowing;
>> when events arrive late they'll just go straight to the side output, and
>> there can never be any window firing of the main flow for that specific key.
>>
>> On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski  wrote:
>>
>>> I've made an experiment where I use an evictor on the main window (not
>>> the late one), only to write a debug file when the window fires (I don't
>>> actually evict events, I've made it so I can write a debug object the
>>> moment the window finishes).
>>>
>>> I can see that indeed the late data window fires before the main window,
>>> since the mentioned debug file does not exist, but late events _do_ exist
>>> in the destination.
>>>
>>> Writing this debug object in the evictor eliminates potential problems
>>> that might be due to logic in the process function, and it proves that the
>>> window of the late events indeed fires before the main window.
>>>
>>> Here's an outline of my job:
>>>
>>> val windowedStream = senv
>>>   .addSource(kafkaSource)
>>>   ... // some operators
>>>   // like BoundedOutOfOrderness but ignore future timestamps
>>>   .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
>>>   ... // some more operators
>>>   .keyBy { case (meta, _) => meta.toPath }
>>>   .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main"
>>> window
>>>   .sideOutputLateData(lateDataTag)
>>>   .process(new ProcessSession(sessionPlayback, config))
>>> windowedStream
>>>   .map(new SerializeSession(sessionPlayback))
>>>   .addSink(sink)
>>> windowedStream
>>>   .getSideOutput(lateDataTag)
>>>   .keyBy { case (meta, _) => meta.toPath }
>>>   .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late"
>>> window
>>>   .process(new ProcessSession(sessionPlayback, config, true))
>>>   .map(new SerializeSession(sessionPlayback, late = true))
>>>
>>> So, to repeat the question, is that normal? And if not - how can I fix
>>> this?
>>>
>>> Thanks
>>>
>>> On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski  wrote:
>>>
>>>>
>>>> I have a job with event-time session window of 30 minutes.
>>>>
>>>> I output late events to side output, where I have a tumbling processing
>>>> time window of 30 minutes.
>>>>
>>>> I observe that the late events are written to storage before the "main"
>>>> events.
>>>>
>>>> I wanted to know if it's normal before digging into the code and
>>>> debugging the problem.
>>>>
>>>> Thanks
>>>>
>>>


Re: Is it possible that late events are processed before the window?

2020-10-07 Thread Ori Popowski
After creating a toy example I think that I've got the concept of
lateDataOutput wrong.

It seems that the lateDataSideOutput has nothing to do with windowing; when
events arrive late they'll just go straight to the side output, and there
can never be any window firing of the main flow for that specific key.

On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski  wrote:

> I've made an experiment where I use an evictor on the main window (not the
> late one), only to write a debug file when the window fires (I don't
> actually evict events, I've made it so I can write a debug object the
> moment the window finishes).
>
> I can see that indeed the late data window fires before the main window,
> since the mentioned debug file does not exist, but late events _do_ exist
> in the destination.
>
> Writing this debug object in the evictor eliminates potential problems
> that might be due to logic in the process function, and it proves that the
> window of the late events indeed fires before the main window.
>
> Here's an outline of my job:
>
> val windowedStream = senv
>   .addSource(kafkaSource)
>   ... // some operators
>   // like BoundedOutOfOrderness but ignore future timestamps
>   .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
>   ... // some more operators
>   .keyBy { case (meta, _) => meta.toPath }
>   .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main"
> window
>   .sideOutputLateData(lateDataTag)
>   .process(new ProcessSession(sessionPlayback, config))
> windowedStream
>   .map(new SerializeSession(sessionPlayback))
>   .addSink(sink)
> windowedStream
>   .getSideOutput(lateDataTag)
>   .keyBy { case (meta, _) => meta.toPath }
>   .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late"
> window
>   .process(new ProcessSession(sessionPlayback, config, true))
>   .map(new SerializeSession(sessionPlayback, late = true))
>
> So, to repeat the question, is that normal? And if not - how can I fix
> this?
>
> Thanks
>
> On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski  wrote:
>
>>
>> I have a job with event-time session window of 30 minutes.
>>
>> I output late events to side output, where I have a tumbling processing
>> time window of 30 minutes.
>>
>> I observe that the late events are written to storage before the "main"
>> events.
>>
>> I wanted to know if it's normal before digging into the code and
>> debugging the problem.
>>
>> Thanks
>>
>


Re: Is it possible that late events are processed before the window?

2020-10-07 Thread Ori Popowski
I've made an experiment where I use an evictor on the main window (not the
late one), only to write a debug file when the window fires (I don't
actually evict events, I've made it so I can write a debug object the
moment the window finishes).

I can see that indeed the late data window fires before the main window,
since the mentioned debug file does not exist, but late events _do_ exist
in the destination.

Writing this debug object in the evictor eliminates potential problems that
might be due to logic in the process function, and it proves that the
window of the late events indeed fires before the main window.

Here's an outline of my job:

val windowedStream = senv
  .addSource(kafkaSource)
  ... // some operators
  // like BoundedOutOfOrderness but ignore future timestamps
  .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
  ... // some more operators
  .keyBy { case (meta, _) => meta.toPath }
  .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main"
window
  .sideOutputLateData(lateDataTag)
  .process(new ProcessSession(sessionPlayback, config))
windowedStream
  .map(new SerializeSession(sessionPlayback))
  .addSink(sink)
windowedStream
  .getSideOutput(lateDataTag)
  .keyBy { case (meta, _) => meta.toPath }
  .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late"
window
  .process(new ProcessSession(sessionPlayback, config, true))
  .map(new SerializeSession(sessionPlayback, late = true))

So, to repeat the question, is that normal? And if not - how can I fix this?

Thanks

On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski  wrote:

>
> I have a job with event-time session window of 30 minutes.
>
> I output late events to side output, where I have a tumbling processing
> time window of 30 minutes.
>
> I observe that the late events are written to storage before the "main"
> events.
>
> I wanted to know if it's normal before digging into the code and debugging
> the problem.
>
> Thanks
>


Is it possible that late events are processed before the window?

2020-10-06 Thread Ori Popowski
I have a job with event-time session window of 30 minutes.

I output late events to side output, where I have a tumbling processing
time window of 30 minutes.

I observe that the late events are written to storage before the "main"
events.

I wanted to know if it's normal before digging into the code and debugging
the problem.

Thanks


How can I drop events which are late by more than X hours/days?

2020-09-24 Thread Ori Popowski
I need to drop elements which are delayed by more than a certain amount of
time from the current watermark.

I wanted to create a FilterFunction where I get the current watermark, and
if the difference between the watermark and my element's timestamp is
greater than X - drop the element.

However, I do not have access to the current watermark inside any of
Flink's operators/functions including FilterFunction.

How can such functionality be achieved?
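
One possible sketch of what I mean, using the watermark that a ProcessFunction
exposes through its timer service (names like DropTooLate and maxLagMillis are
illustrative, and this assumes timestamps are assigned upstream):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Illustrative sketch: forward an element only if its event timestamp is not
// behind the current watermark by more than maxLagMillis. The watermark is
// read from the ProcessFunction's timer service; ctx.timestamp() is the
// element's assigned event timestamp (null if none was assigned upstream).
class DropTooLate[T](maxLagMillis: Long) extends ProcessFunction[T, T] {
  override def processElement(value: T,
                              ctx: ProcessFunction[T, T]#Context,
                              out: Collector[T]): Unit = {
    val watermark = ctx.timerService().currentWatermark()
    val timestamp = ctx.timestamp()
    if (timestamp == null || timestamp.longValue() >= watermark - maxLagMillis) {
      out.collect(value)
    }
  }
}

It would be used as .process(new DropTooLate[MyEvent](maxLagMillis)) somewhere
after timestamps and watermarks are assigned.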


Watermark advancement in late side output

2020-09-21 Thread Ori Popowski
Let's say I have an event-time stream with a window and a side output for
late data, and in the side output of the late data, I further assign
timestamps and do windowing - what is the watermark situation here?

The main stream has its own watermark advancement, but does the side output
have its own? Do they maintain separate watermarks, or do they intermingle?

Thanks


Re: sideOutputLateData doesn't work with map()

2020-09-17 Thread Ori Popowski
Turns out that this is the way to solve this problem:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
  (1 to 10090).foreach(ctx.collect)
  Thread.sleep(1000)
  (20 to 30).foreach(ctx.collect)
}
override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
  .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int,
TimeWindow] {
override def process(key: Int, context: Context, elements:
Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
  out.collect(elements.map(_._1).toList)
}
  })
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()
stream
  .map(list => list :+ 42)
  .print()

senv.execute()

On Thu, Sep 17, 2020 at 3:32 PM Ori Popowski  wrote:

> Hi,
>
> I have this simple flow:
>
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tag = OutputTag[Tuple1[Int]]("late")
> val stream = senv
>   .addSource(new SourceFunction[Int] {
> override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
>   (1 to 10090).foreach(ctx.collect)
>   Thread.sleep(1000)
>   (20 to 30).foreach(ctx.collect)
> }
> override def cancel(): Unit = {}
>   })
>   .map(x => Tuple1(x))
>   .assignAscendingTimestamps(_._1)
>   .keyBy(_ => 1)
>   .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
>   .sideOutputLateData(tag)
>   .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int,
> TimeWindow] {
> override def process(key: Int, context: Context, elements:
> Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
>   out.collect(elements.map(_._1).toList)
> }
>   })
> stream
>   .print()
> stream
>   .getSideOutput(tag)
>   .map(a => s"late: $a")
>   .print()
>
> senv.execute()
>
> This is a simple stream which uses a session window on integers and then
> uses process(…) to just collect them into a list. There's also side
> output for late data.
> When I run this job I can see printing to stdout of the late messages
> without any problem.
>
> However, when I add a map(…) after process(…), the late data isn't
> getting into the sideoutput and I cannot see the printing to stdout:
> …
> .sideOutputLateData(tag)
> .process(…)
> .map(list => list :+ 42)
> …
>
> Is this a bug or is it working as intended? If it's not a bug - does it
> mean I cannot add any operator after process(…)?
>
> Thanks
>


sideOutputLateData doesn't work with map()

2020-09-17 Thread Ori Popowski
Hi,

I have this simple flow:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
  (1 to 10090).foreach(ctx.collect)
  Thread.sleep(1000)
  (20 to 30).foreach(ctx.collect)
}
override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
  .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int,
TimeWindow] {
override def process(key: Int, context: Context, elements:
Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
  out.collect(elements.map(_._1).toList)
}
  })
stream
  .print()
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()

senv.execute()

This is a simple stream which uses a session window on integers and then
uses process(…) to just collect them into a list. There's also side output
for late data.
When I run this job I can see printing to stdout of the late messages
without any problem.

However, when I add a map(…) after process(…), the late data isn't getting
into the sideoutput and I cannot see the printing to stdout:
…
.sideOutputLateData(tag)
.process(…)
.map(list => list :+ 42)
…

Is this a bug or is it working as intended? If it's not a bug - does it
mean I cannot add any operator after process(…)?

Thanks


Re: How Flink distinguishes between late and in-time events?

2020-08-20 Thread Ori Popowski
That makes sense. Thanks

On Thu, Aug 20, 2020 at 7:45 PM Piotr Nowojski  wrote:

> Hi Ori,
>
> No. Flink does it differently. Operators that keep track of late
> events remember the latest watermark. If a new element arrives with an
> event time lower than the latest watermark, it is marked as a late
> event [1]
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html#lateness
>
> czw., 20 sie 2020 o 17:13 Ori Popowski  napisał(a):
>
>> In the documentation
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness>
>> it states that:
>>
>> *[…], Flink keeps the state of windows until their allowed lateness
>> expires. Once this happens, Flink removes the window and deletes its state,
>> as also described in the Window Lifecycle
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#window-lifecycle>
>> section.*
>>
>> However, something doesn't make sense to me.
>>
>> If Flink deletes the window state, then how can it know that subsequent
>> events are late? i.e. if the state is deleted, then Flink has no way of
>> knowing that an event is late, because it might think it's just a new event,
>> unless it keeps track of which keyed windows are closed forever.
>>
>> Does Flink remember which keyed windows are closed forever?
>>
>> Thanks.
>>
>
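
For clarity, the check described above boils down to roughly the following; a
sketch of the idea, not Flink's actual code:

// Rough sketch of the lateness test: an element counts as late when its
// timestamp plus the allowed lateness is still behind the current watermark.
def isElementLate(elementTimestamp: Long,
                  allowedLateness: Long,
                  currentWatermark: Long): Boolean =
  elementTimestamp + allowedLateness <= currentWatermark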


How Flink distinguishes between late and in-time events?

2020-08-20 Thread Ori Popowski
In the documentation
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#allowed-lateness>
it states that:

*[…], Flink keeps the state of windows until their allowed lateness
expires. Once this happens, Flink removes the window and deletes its state,
as also described in the Window Lifecycle
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#window-lifecycle>
section.*

However, something doesn't make sense to me.

If Flink deletes the window state, then how can it know that subsequent
events are late? i.e. if the state is deleted, then Flink has no way of
knowing that an event is late, because it might think it's just a new event,
unless it keeps track of which keyed windows are closed forever.

Does Flink remember which keyed windows are closed forever?

Thanks.


Re: Key group is not in KeyGroupRange

2020-07-22 Thread Ori Popowski
The problem was caused by concurrent access to the ValueState by another
thread. Thanks to Yun Tang
<https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yunta> for
pointing this out. Discussion is in FLINK-18637
<https://issues.apache.org/jira/browse/FLINK-18637>
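
For anyone hitting the same error: the anti-pattern boils down to touching
keyed state off the operator's task thread. An illustrative sketch (not the
actual job code) of what that looks like:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative anti-pattern: keyed state must only be accessed from the
// operator's task thread. Updating it from another thread (here, a Future)
// can corrupt the backend's key bookkeeping and later fail snapshots with
// errors like the one above.
class UnsafeStateAccess extends KeyedProcessFunction[String, String, String] {
  @transient private var lastSeen: ValueState[String] = _

  override def open(parameters: Configuration): Unit =
    lastSeen = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("last-seen", classOf[String]))

  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, String]#Context,
                              out: Collector[String]): Unit = {
    Future { lastSeen.update(value) } // unsafe: runs off the task thread
    out.collect(value)
  }
}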

On Tue, Jul 21, 2020 at 1:46 PM Ori Popowski  wrote:

> I should have mentioned, I've opened a bug for it
> https://issues.apache.org/jira/browse/FLINK-18637. So the discussion
> moved there.
>
> On Tue, Jul 14, 2020 at 2:03 PM Ori Popowski  wrote:
>
>> I'm getting this error when creating a savepoint. I've read in
>> https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by
>> unstable hashcode or equals on the key, or improper use of
>> reinterpretAsKeyedStream.
>>
>> My key is a string and I don't use reinterpretAsKeyedStream, so what's
>> going on?
>>
>> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
>> 962fc8e984e7ca1ed65a038aa62ce124 failed.
>> at
>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.CompletionException:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
>> at
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>> at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(Completab

Re: Key group is not in KeyGroupRange

2020-07-21 Thread Ori Popowski
I should have mentioned, I've opened a bug for it
https://issues.apache.org/jira/browse/FLINK-18637. So the discussion moved
there.

On Tue, Jul 14, 2020 at 2:03 PM Ori Popowski  wrote:

> I'm getting this error when creating a savepoint. I've read in
> https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by
> unstable hashcode or equals on the key, or improper use of
> reinterpretAsKeyedStream.
>
> My key is a string and I don't use reinterpretAsKeyedStream, so what's
> going on?
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 962fc8e984e7ca1ed65a038aa62ce124 failed.
> at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.ja

Key group is not in KeyGroupRange

2020-07-14 Thread Ori Popowski
I'm getting this error when creating a savepoint. I've read in
https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by
unstable hashcode or equals on the key, or improper use of
reinterpretAsKeyedStream.

My key is a string and I don't use reinterpretAsKeyedStream, so what's
going on?

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
962fc8e984e7ca1ed65a038aa62ce124 failed.
at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
at
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointException: The job has failed.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:429)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1466)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1379)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:719)
at
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-13 Thread Ori Popowski
Hi,

Eventually flatMapWithState solved the problem. I started by looking into
KeyedProcessFunction which lead me to flatMapWithState. It's working very
well.

.keyBy(…)
.flatMapWithState[Event, Int] { (event, countOpt) =>
  val count = countOpt.getOrElse(0)
  if (count < config.limit) (List(event), Some(count + 1))
  else (List.empty, Some(count))
}
.keyBy(…)

Using .aggregate(…, new MyProcessFunction) with an aggregate function that
collects the events into a list worked really badly and caused serious
performance issues.

Thanks!

On Sun, Jul 12, 2020 at 10:32 AM Ori Popowski  wrote:

> > AFAIK, currently the 2GB limit is still there. As a workaround, maybe you
> can reduce the state size. If this cannot be done using the window
> operator, would the KeyedProcessFunction[1] be OK for you?
>
> I'll see if I can introduce it to the code.
>
> > if you do, the ProcessWindowFunction is getting as argument an Iterable
> with ALL elements collected along the session. This will make the state per
> key potentially huge (like you're experiencing).
>
> Thanks for noticing that. It's indeed true that we do this. The reason is
> the nature of the computation, which cannot be done incrementally
> unfortunately. It's not a classic avg(), max(), last() etc. computation
> which can be reduced in each step.
> I'm thinking of a way to cap the volume of the state per key using an
> aggregate function that limits the number of elements and returns a list of
> the collected events.
>
> class CappingAggregator(limit: Int) extends AggregateFunction[Event,
> Vector[Event], Vector[Event]] {
>   override def createAccumulator(): Vector[Event] = Vector.empty
>
>   override def add(value: Event, acc: Vector[Event]): Vector[Event] =
> if (acc.size < limit) acc :+ value
> else acc
>
>   override def getResult(acc: Vector[Event]): Vector[Event] = Vector(acc:
> _*)
>
>   override def merge(a: Vector[Event], b: Vector[Event]): Vector[Event] =
> (a ++ b).slice(0, limit)
> }
>
> My only problem is with merge(). I'm not sure whether b always holds later
> elements than a, or whether I must sort and only then slice.
>
> On Sat, Jul 11, 2020 at 10:16 PM Rafi Aroch  wrote:
>
>> Hi Ori,
>>
>> In your code, are you using the process() API?
>>
>> .process(new MyProcessWindowFunction());
>>
>> if you do, the ProcessWindowFunction is getting as argument an Iterable
>> with ALL elements collected along the session. This will make the state per
>> key potentially huge (like you're experiencing).
>>
>> As Aljoscha Krettek suggested in the JIRA, if you can use the aggregate()
>> API and store in state only an aggregate that is getting incrementally
>> updated on every incoming event (this could be ONE Class / Map / Tuple /
>> etc) rather than keeping ALL elements.
>>
>> See example here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
>>
>> Thanks,
>> Rafi
>>
>>
>> On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu 
>> wrote:
>>
>>> Hi Ori
>>>
>>> AFAIK, currently the 2GB limit is still there. As a workaround, maybe you
>>> can reduce the state size. If this cannot be done using the window
>>> operator, would the KeyedProcessFunction[1] be OK for you?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> On Wed, Jul 8, 2020 at 8:30 PM Ori Popowski  wrote:
>>>
>>>> I've asked this question in
>>>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been
>>>> inactive for two years so I'm not sure it will be visible.
>>>>
>>>> While creating a savepoint I get a 
>>>> org.apache.flink.util.SerializedThrowable:
>>>> java.lang.NegativeArraySizeException. It's happening because some of
>>>> my windows have a keyed state of more than 2GiB, hitting RocksDB memory
>>>> limit.
>>>>
>>>> How can I prevent this?
>>>>
>>>> As I understand it, I need somehow to limit the accumulated size of the
>>>> window I'm using, which is EventTimeWindow. However, I have no way of
>>>> doing so, because the WindowOperator manages its state on its own.
>>>>
>>>> Below is a full stack trace.
>>>>
>>>> org.apache.flink.util.SerializedThrowable: Could not materialize
>

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-12 Thread Ori Popowski
> AFAIK, currently the 2GB limit is still there. As a workaround, maybe you
can reduce the state size. If this cannot be done using the window
operator, would the KeyedProcessFunction[1] be OK for you?

I'll see if I can introduce it to the code.

> if you do, the ProcessWindowFunction is getting as argument an Iterable
with ALL elements collected along the session. This will make the state per
key potentially huge (like you're experiencing).

Thanks for noticing that. It's indeed true that we do this. The reason is
the nature of the computation, which cannot be done incrementally
unfortunately. It's not a classic avg(), max(), last() etc. computation
which can be reduced in each step.
I'm thinking of a way to cap the volume of the state per key using an
aggregate function that limits the number of elements and returns a list of
the collected events.

class CappingAggregator(limit: Int) extends AggregateFunction[Event,
Vector[Event], Vector[Event]] {
  override def createAccumulator(): Vector[Event] = Vector.empty

  override def add(value: Event, acc: Vector[Event]): Vector[Event] =
if (acc.size < limit) acc :+ value
else acc

  override def getResult(acc: Vector[Event]): Vector[Event] = Vector(acc:
_*)

  override def merge(a: Vector[Event], b: Vector[Event]): Vector[Event] =
(a ++ b).slice(0, limit)
}

My only problem is with merge(). I'm not sure whether b always holds later
elements than a, or whether I must sort and only then slice.

On Sat, Jul 11, 2020 at 10:16 PM Rafi Aroch  wrote:

> Hi Ori,
>
> In your code, are you using the process() API?
>
> .process(new MyProcessWindowFunction());
>
> if you do, the ProcessWindowFunction is getting as argument an Iterable
> with ALL elements collected along the session. This will make the state per
> key potentially huge (like you're experiencing).
>
> As Aljoscha Krettek suggested in the JIRA, if you can use the aggregate()
> API and store in state only an aggregate that is getting incrementally
> updated on every incoming event (this could be ONE Class / Map / Tuple /
> etc) rather than keeping ALL elements.
>
> See example here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction
>
> Thanks,
> Rafi
>
>
> On Sat, Jul 11, 2020 at 10:29 AM Congxian Qiu 
> wrote:
>
>> Hi Ori
>>
>> AFAIK, currently the 2GB limit is still there. As a workaround, maybe you
>> can reduce the state size. If this cannot be done using the window
>> operator, would the KeyedProcessFunction[1] be OK for you?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>
>> Best,
>> Congxian
>>
>>
>> On Wed, Jul 8, 2020 at 8:30 PM Ori Popowski  wrote:
>>
>>> I've asked this question in
>>> https://issues.apache.org/jira/browse/FLINK-9268 but it's been inactive
>>> for two years so I'm not sure it will be visible.
>>>
>>> While creating a savepoint I get a 
>>> org.apache.flink.util.SerializedThrowable:
>>> java.lang.NegativeArraySizeException. It's happening because some of my
>>> windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.
>>>
>>> How can I prevent this?
>>>
>>> As I understand it, I need somehow to limit the accumulated size of the
>>> window I'm using, which is EventTimeWindow. However, I have no way of
>>> doing so, because the WindowOperator manages its state on its own.
>>>
>>> Below is a full stack trace.
>>>
>>> org.apache.flink.util.SerializedThrowable: Could not materialize
>>> checkpoint 139 for operator Window(EventTimeSessionWindows(180),
>>> EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink:
>>> Unnamed (23/189).
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.SerializedThrowable:
>>> java.lang.NegativeArraySizeException
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java

Savepoint fails due to RocksDB 2GiB limit

2020-07-08 Thread Ori Popowski
I've asked this question in https://issues.apache.org/jira/browse/FLINK-9268
but it's been inactive for two years so I'm not sure it will be visible.

While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException. It's happening because some of my
windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.

How can I prevent this?

As I understand it, I need somehow to limit the accumulated size of the
window I'm using, which is EventTimeWindow. However, I have no way of doing
so, because the WindowOperator manages its state on its own.

Below is a full stack trace.

org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint
139 for operator Window(EventTimeSessionWindows(180), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink: Unnamed (23/189).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: null
at org.rocksdb.RocksIterator.value0(Native Method)
at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
at
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
at
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
at
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
... 5 common frames omitted


Re: Heartbeat of TaskManager timed out.

2020-07-07 Thread Ori Popowski
I wouldn't want to jump to conclusions, but from what I see, very large
lists and vectors do not work well with flatten() in Scala 2.11, each for its
own reasons.

In any case, it's 100% not a Flink issue.

On Tue, Jul 7, 2020 at 10:10 AM Xintong Song  wrote:

> Thanks for the updates, Ori.
>
> I'm not familiar with Scala. Just curious, if what you suspect is true, is
> it a bug of Scala?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jul 7, 2020 at 1:41 PM Ori Popowski  wrote:
>
>> Hi,
>>
>> I just wanted to update that the problem is now solved!
>>
>> I suspect that Scala's flatten() method has a memory problem on very
>> large lists (> 2 billion elements). When using Scala Lists, the memory
>> seems to leak but the app keeps running, and when using Scala Vectors, a
>> weird IllegalArgumentException is thrown [1].
>>
>> I implemented my own flatten() method using Arrays and quickly ran into
>> NegativeArraySizeException since the integer representing the array size
>> wrapped around at Integer.MaxValue and became negative. After I started
>> catching this exception all my cluster problems just resolved. Checkpoints,
>> the heartbeat timeout, and also the memory and CPU utilization.
>>
>> I still need to confirm my suspicion towards Scala's flatten() though,
>> since I haven't "lab-tested" it.
>>
>> [1] https://github.com/NetLogo/NetLogo/issues/1830
>>
>> On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski  wrote:
>>
>>> Hi,
>>>
>>> I initially thought this, so this is why my heap is almost 30GiB.
>>> However, I started to analyze the Java Flight Recorder files, and I
>>> suspect there's a memory leak in Scala's flatten() method.
>>> I changed the line that uses flatten(), and instead of flatten() I'm
>>> just creating a ByteArray the size flatten() would have returned, and I
>>> no longer have the heartbeat problem.
>>>
>>> So now my code is
>>> val recordingData =
>>> Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>>>
>>> instead of
>>> val recordingData = recordingBytes.flatten
>>>
>>> I attach a screenshot of Java Mission Control
>>>
>>>
>>>
>>> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song 
>>> wrote:
>>>
>>>> I agree with Roman's suggestion for increasing heap size.
>>>>
>>>> It seems that the heap grows faster than freed. Thus eventually the
>>>> Full GC is triggered, taking more than 50s and causing the timeout.
>>>> However, even the full GC frees only 2GB space out of the 28GB max size.
>>>> That probably suggests that the max heap size is not sufficient.
>>>>
>>>>> 2020-07-01T10:15:12.869+: [Full GC (Allocation Failure)
>>>>>  28944M->26018M(28960M), 51.5256128 secs]
>>>>> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>>>> 113556K->112729K(1150976K)]
>>>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>>>
>>>>
>>>> I would not be so sure about the memory leak. I think it could be a
>>>> normal pattern that memory keeps growing as more data is processed. E.g.,
>>>> from the provided log, I see window operation tasks executed in the task
>>>> manager. Such operation might accumulate data until the window is emitted.
>>>>
>>>> Maybe Ori you can also take a look at the task manager log when the job
>>>> runs with Flink 1.9 without this problem, see how the heap size changed. As
>>>> I mentioned before, it is possible that, with the same configurations Flink
>>>> 1.10 has less heap size compared to Flink 1.9, due to the memory model
>>>> changes.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski  wrote:
>>>>
>>>>> Thank you very much for your analysis.
>>>>>
>>>>> When I said there was no memory leak - I meant that from the specific
>>>>> TaskManager I monitored in real-time using JProfiler.
>>>>> Unfortunately, this problem occurs only in one of the TaskManagers and
>>>>> you cannot anticipate which. So when you pick a TM to profile at random -
>>>>> everything looks fine.
>>>&

Re: Heartbeat of TaskManager timed out.

2020-07-06 Thread Ori Popowski
Hi,

I just wanted to update that the problem is now solved!

I suspect that Scala's flatten() method has a memory problem on very large
lists (> 2 billion elements). When using Scala Lists, the memory seems to
leak but the app keeps running, and when using Scala Vectors, a weird
IllegalArgumentException is thrown [1].

I implemented my own flatten() method using Arrays and quickly ran into
NegativeArraySizeException since the integer representing the array size
wrapped around at Integer.MaxValue and became negative. After I started
catching this exception all my cluster problems just resolved. Checkpoints,
the heartbeat timeout, and also the memory and CPU utilization.

I still need to confirm my suspicion towards Scala's flatten() though,
since I haven't "lab-tested" it.

[1] https://github.com/NetLogo/NetLogo/issues/1830
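
For reference, the manual flatten is roughly the following; a simplified
sketch rather than the exact code, with an explicit overflow check instead of
catching the exception:

// Sketch: flatten chunked byte arrays manually, computing the total length as
// a Long so an overflow past Int.MaxValue is detected instead of silently
// wrapping into a negative array size.
def flattenChunks(chunks: Seq[Array[Byte]]): Array[Byte] = {
  val total = chunks.foldLeft(0L)(_ + _.length)
  if (total > Int.MaxValue)
    throw new IllegalStateException(s"Combined size $total exceeds the array limit")
  val out = new Array[Byte](total.toInt)
  var pos = 0
  chunks.foreach { chunk =>
    System.arraycopy(chunk, 0, out, pos, chunk.length)
    pos += chunk.length
  }
  out
}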

On Sun, Jul 5, 2020 at 2:21 PM Ori Popowski  wrote:

> Hi,
>
> I initially thought this, so this is why my heap is almost 30GiB.
> However, I started to analyze the Java Flight Recorder files, and I
> suspect there's a memory leak in Scala's flatten() method.
> I changed the line that uses flatten(), and instead of flatten() I'm just
> creating a ByteArray the size flatten() would have returned, and I no
> longer have the heartbeat problem.
>
> So now my code is
> val recordingData =
> Array.fill[Byte](recordingBytes.map(_.length).sum)(0)
>
> instead of
> val recordingData = recordingBytes.flatten
>
> I attach a screenshot of Java Mission Control
>
>
>
> On Fri, Jul 3, 2020 at 7:24 AM Xintong Song  wrote:
>
>> I agree with Roman's suggestion for increasing heap size.
>>
>> It seems that the heap grows faster than freed. Thus eventually the Full
>> GC is triggered, taking more than 50s and causing the timeout. However,
>> even the full GC frees only 2GB space out of the 28GB max size. That
>> probably suggests that the max heap size is not sufficient.
>>
>>> 2020-07-01T10:15:12.869+: [Full GC (Allocation Failure)
>>>  28944M->26018M(28960M), 51.5256128 secs]
>>> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
>>> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
>>> 113556K->112729K(1150976K)]
>>>   [Times: user=91.08 sys=0.06, real=51.53 secs]
>>
>>
>> I would not be so sure about the memory leak. I think it could be a
>> normal pattern that memory keeps growing as more data is processed. E.g.,
>> from the provided log, I see window operation tasks executed in the task
>> manager. Such operation might accumulate data until the window is emitted.
>>
>> Maybe Ori you can also take a look at the task manager log when the job
>> runs with Flink 1.9 without this problem, see how the heap size changed. As
>> I mentioned before, it is possible that, with the same configurations Flink
>> 1.10 has less heap size compared to Flink 1.9, due to the memory model
>> changes.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, Jul 2, 2020 at 8:58 PM Ori Popowski  wrote:
>>
>>> Thank you very much for your analysis.
>>>
>>> When I said there was no memory leak - I meant that from the specific
>>> TaskManager I monitored in real-time using JProfiler.
>>> Unfortunately, this problem occurs only in 1 of the TaskManager and you
>>> cannot anticipate which. So when you pick a TM to profile at random -
>>> everything looks fine.
>>>
>>> I'm running the job again with Java FlightRecorder now, and I hope I'll
>>> find the reason for the memory leak.
>>>
>>> Thanks!
>>>
>>> On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Thanks, Ori
>>>>
>>>> From the log, it looks like there IS a memory leak.
>>>>
>>>> At 10:12:53 there was the last "successful" gc when 13Gb was freed in
>>>> 0.4653809 secs:
>>>> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
>>>> Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>>>>
>>>> Then the heap grew from 10G to 28G with GC not being able to free up
>>>> enough space:
>>>> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
>>>> 12591.0M(28960.0M)->11247.0M(28960.0M)]
>>>> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
>>>> 12103.0M(28960.0M)->11655.0M(28960.0M)]
>>>> [Eden: 1264.0M(1264.0M)->0.0B(1

Re: Heartbeat of TaskManager timed out.

2020-07-02 Thread Ori Popowski
Thank you very much for your analysis.

When I said there was no memory leak, I meant in the specific TaskManager I
monitored in real time using JProfiler.
Unfortunately, this problem occurs in only one of the TaskManagers, and you
cannot anticipate which one. So when you pick a TM to profile at random,
everything looks fine.

I'm running the job again with Java FlightRecorder now, and I hope I'll
find the reason for the memory leak.
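
For anyone curious, I'm enabling the recording through the TaskManager JVM
options, roughly like this in flink-conf.yaml (this assumes Oracle JDK 8; the
duration, settings profile and file path are just examples):

env.java.opts.taskmanager: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=30m,filename=/tmp/taskmanager.jfr,settings=profile"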

Thanks!

On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Thanks, Ori
>
> From the log, it looks like there IS a memory leak.
>
> At 10:12:53 there was the last "successful" gc when 13Gb was freed in
> 0.4653809 secs:
> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
> Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>
> Then the heap grew from 10G to 28G with GC not being able to free up
> enough space:
> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
> 12591.0M(28960.0M)->11247.0M(28960.0M)]
> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
> 12103.0M(28960.0M)->11655.0M(28960.0M)]
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
> 12929.0M(28960.0M)->12467.0M(28960.0M)]
> ... ...
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
> 28042.6M(28960.0M)->27220.6M(28960.0M)]
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
> 28494.5M(28960.0M)->28720.6M(28960.0M)]
> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
> 28944.6M(28960.0M)->28944.6M(28960.0M)]
>
> Until 10:15:12 when GC freed almost 4G - but it took 51 seconds and
> heartbeat timed out:
> 2020-07-01T10:15:12.869+: [Full GC (Allocation Failure)
>  28944M->26018M(28960M), 51.5256128 secs]
>   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
> 113556K->112729K(1150976K)]
>   [Times: user=91.08 sys=0.06, real=51.53 secs]
> 2020-07-01T10:16:04.395+: [GC concurrent-mark-abort]
> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of
> JobManager with id bc59ba6a
>
> No substantial amount memory was freed after that.
>
> If this memory usage pattern is expected, I'd suggest to:
> 1. increase heap size
> 2. play with PrintStringDeduplicationStatistics and UseStringDeduplication
> flags - probably string deduplication is making G1 slower than CMS
>
> Regards,
> Roman
>
>
> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski  wrote:
>
>> Hi,
>>
>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>
>>
>> Thanks!
>>
>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song 
>> wrote:
>>
>>> Maybe you can share the log and gc-log of the problematic TaskManager?
>>> See if we can find any clue.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski  wrote:
>>>
>>>> I've found out that sometimes one of my TaskManagers experiences a GC
>>>> pause of 40-50 seconds and I have no idea why.
>>>> I profiled one of the machines using JProfiler and everything looks
>>>> fine. No memory leaks, memory is low.
>>>> However, I cannot anticipate which of the machines will get the 40-50
>>>> seconds pause and I also cannot profile all of them all the time.
>>>>
>>>> Any suggestions?
>>>>
>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song 
>>>> wrote:
>>>>
>>>>> In Flink 1.10, there's a huge change in the memory management compared
>>>>> to previous versions. This could be related to your observations, because
>>>>> with the same configurations, it is possible that there's less JVM heap
>>>>> space (with more off-heap memory). Please take a look at this migration
>>>>> guide [1].
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>>
>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski 
>>>>> wrote:
>>>>>
>>>>>> Thanks for the suggestions!
>>>>>>
>>>>>> > i recen

Re: Heartbeat of TaskManager timed out.

2020-07-01 Thread Ori Popowski
I've found out that sometimes one of my TaskManagers experiences a GC pause
of 40-50 seconds, and I have no idea why.
I profiled one of the machines using JProfiler and everything looks fine: no
memory leaks, and memory usage is low.
However, I cannot anticipate which of the machines will get the 40-50 second
pause, and I also cannot profile all of them all the time.
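
In the meantime I'm turning on GC logging on all TaskManagers, so whichever
one gets hit at least leaves a trace in its own log. Roughly something like
this in flink-conf.yaml (the flags are the usual JDK 8 ones; the log path is
just an example):

env.java.opts.taskmanager: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/var/log/flink/taskmanager-gc.log"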

Any suggestions?

On Mon, Jun 29, 2020 at 4:44 AM Xintong Song  wrote:

> In Flink 1.10, there's a huge change in the memory management compared to
> previous versions. This could be related to your observations, because with
> the same configurations, it is possible that there's less JVM heap space
> (with more off-heap memory). Please take a look at this migration guide [1].
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>
> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski  wrote:
>
>> Thanks for the suggestions!
>>
>> > i recently tried 1.10 and see this error frequently. and i dont have
>> the same issue when running with 1.9.1
>> I did downgrade to Flink 1.9 and there's certainly no change in the
>> occurrences in the heartbeat timeout
>>
>>
>> >
>>
>>- Probably the most straightforward way is to try increasing the
>>timeout to see if that helps. You can leverage the configuration option
>>`heartbeat.timeout`[1]. The default is 50s.
>>- It might be helpful to share your configuration setups (e.g., the
>>TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>share the beginning part of your JM/TM logs, including the JVM parameters
>>and all the loaded configurations.
>>- You may want to look into the GC logs in addition to the metrics.
>>In case of a CMS GC stop-the-world, you may not be able to see the most
>>recent metrics due to the process not responding to the metric querying
>>services.
>>- You may also look into the status of the JM process. If JM is under
>>significant GC pressure, it could also happen that the heartbeat message
>>from TM is not timely handled before the timeout check.
>>- Is there any metrics monitoring the network condition between the
>>JM and timeouted TM? Possibly any jitters?
>>
>>
>> Weirdly enough, I did manage to find a problem with the timed out
>> TaskManagers, which slipped away the last time I checked: The timed out
>> TaskManager is always the one with the max. GC time (young generation). I
>> see it only now that I run with G1GC, but with the previous GC it wasn't
>> the case.
>>
>> Does anyone know what can cause high GC time and how to mitigate this?
>>
>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song 
>> wrote:
>>
>>> Hi Ori,
>>>
>>> Here are some suggestions from my side.
>>>
>>>- Probably the most straightforward way is to try increasing the
>>>timeout to see if that helps. You can leverage the configuration option
>>>`heartbeat.timeout`[1]. The default is 50s.
>>>- It might be helpful to share your configuration setups (e.g., the
>>>TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>>share the beginning part of your JM/TM logs, including the JVM parameters
>>>and all the loaded configurations.
>>>- You may want to look into the GC logs in addition to the metrics.
>>>In case of a CMS GC stop-the-world, you may not be able to see the most
>>>recent metrics due to the process not responding to the metric querying
>>>services.
>>>- You may also look into the status of the JM process. If JM is
>>>under significant GC pressure, it could also happen that the heartbeat
>>>message from TM is not timely handled before the timeout check.
>>>- Is there any metrics monitoring the network condition between the
>>>JM and timeouted TM? Possibly any jitters?
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>
>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski  wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>> partitions and I have parallelism of 189.
>>>>
>>>> Currently running with RocksDB, with checkpointing disabled. My state
>>>> si

Re: Timeout when using RockDB to handle large state in a stream app

2020-06-29 Thread Ori Popowski
Hi there,

I'm currently experiencing the exact same issue.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html

I've found out that GC is causing the problem, but I still haven't managed
to solve this.



On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I am trying to run a stream application with large state in a
> standalone flink cluster [3]. I configured the RocksDB state backend
> and I increased the memory of the Job Manager and Task Manager.
> However, I am still getting the timeout message
> "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> 1.10.1 and here are the configurations that I changed on the
> flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> filesystem. I am not sure if I need to use HDFS here since I am
> testing only in one machine.
>
> jobmanager.heap.size: 12g
> taskmanager.memory.process.size: 8g
> state.backend: rocksdb
> state.checkpoints.dir: file:///tmp/flink/state
>
> In the stream application I am using RocksDB as well (full code [3]):
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state",
> true));
>
> I have some operators that hold a large state when the load a static
> table on their state. I use them in two aggregate operations [1] and
> [2].
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
>
> Here is my stack trace error:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> at
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> at
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> at
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> at
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> at
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:

Re: Heartbeat of TaskManager timed out.

2020-06-28 Thread Ori Popowski
Thanks for the suggestions!

> i recently tried 1.10 and see this error frequently. and i dont have the
same issue when running with 1.9.1
I did downgrade to Flink 1.9, and there's certainly no change in the
occurrence of the heartbeat timeouts.


>

   - Probably the most straightforward way is to try increasing the timeout
   to see if that helps. You can leverage the configuration option
   `heartbeat.timeout`[1]. The default is 50s.
   - It might be helpful to share your configuration setups (e.g., the TM
   resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
   share the beginning part of your JM/TM logs, including the JVM parameters
   and all the loaded configurations.
   - You may want to look into the GC logs in addition to the metrics. In
   case of a CMS GC stop-the-world, you may not be able to see the most recent
   metrics due to the process not responding to the metric querying services.
   - You may also look into the status of the JM process. If JM is under
   significant GC pressure, it could also happen that the heartbeat message
   from TM is not timely handled before the timeout check.
   - Is there any metrics monitoring the network condition between the JM
   and timeouted TM? Possibly any jitters?


Weirdly enough, I did manage to find a pattern among the timed-out
TaskManagers, which slipped past me the last time I checked: the timed-out
TaskManager is always the one with the maximum GC time (young generation). I
only see it now that I run with G1GC; with the previous collector it wasn't
the case.

Does anyone know what can cause such high GC times and how to mitigate them?
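
As a stopgap I'll also try the first suggestion above and raise the heartbeat
timeout in flink-conf.yaml, along these lines (the value is in milliseconds;
120000 here is only an example, not a recommendation):

heartbeat.timeout: 120000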

On Sun, Jun 28, 2020 at 5:04 AM Xintong Song  wrote:

> Hi Ori,
>
> Here are some suggestions from my side.
>
>- Probably the most straightforward way is to try increasing the
>timeout to see if that helps. You can leverage the configuration option
>`heartbeat.timeout`[1]. The default is 50s.
>- It might be helpful to share your configuration setups (e.g., the TM
>resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>share the beginning part of your JM/TM logs, including the JVM parameters
>and all the loaded configurations.
>- You may want to look into the GC logs in addition to the metrics. In
>case of a CMS GC stop-the-world, you may not be able to see the most recent
>metrics due to the process not responding to the metric querying services.
>- You may also look into the status of the JM process. If JM is under
>significant GC pressure, it could also happen that the heartbeat message
>from TM is not timely handled before the timeout check.
>- Is there any metrics monitoring the network condition between the JM
>and timeouted TM? Possibly any jitters?
>
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>
> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski  wrote:
>
>> Hello,
>>
>> I'm running Flink 1.10 on EMR and reading from Kafka with 189 partitions
>> and I have parallelism of 189.
>>
>> Currently running with RocksDB, with checkpointing disabled. My state
>> size is appx. 500gb.
>>
>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors with no
>> apparent reason.
>>
>> I check the container that gets the timeout for GC pauses, heap memory,
>> direct memory, mapped memory, offheap memory, CPU load, network load, total
>> out-records, total in-records, backpressure, and everything I can think of.
>> But all those metrics show that there's nothing unusual, and it has around
>> average values for all those metrics. There are a lot of other containers
>> which score higher.
>>
>> All the metrics are very low because every TaskManager runs on a
>> r5.2xlarge machine alone.
>>
>> I'm trying to debug this for days and I cannot find any explanation for
>> it.
>>
>> Can someone explain why it's happening?
>>
>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>> container_1593074931633_0011_01_000127 timed out.
>> at org.apache.flink.runtime.jobmaster.
>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster
>> .java:1147)
>> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(
>> HeartbeatMonitorImpl.java:109)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors
>> .java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
>> AkkaRpcActor.java:397)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpc

Heartbeat of TaskManager timed out.

2020-06-25 Thread Ori Popowski
Hello,

I'm running Flink 1.10 on EMR and reading from Kafka with 189 partitions
and I have parallelism of 189.

Currently running with RocksDB, with checkpointing disabled. My state size
is approx. 500 GB.

I'm getting sporadic "Heartbeat of TaskManager timed out" errors with no
apparent reason.

I check the container that gets the timeout for GC pauses, heap memory,
direct memory, mapped memory, off-heap memory, CPU load, network load, total
out-records, total in-records, backpressure, and everything else I can think
of. But all those metrics show nothing unusual: the container has roughly
average values for all of them, and there are plenty of other containers
which score higher.

All the metrics are very low because every TaskManager runs on a r5.2xlarge
machine alone.

I'm trying to debug this for days and I cannot find any explanation for it.

Can someone explain why it's happening?

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks


Re: State leak

2020-06-24 Thread Ori Popowski
Hi, thanks for answering.

Sounds reasonable
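
For example, the way I read the client-side option in the quoted reply below
(just a sketch, assuming the client controls the sessionId and knows the
event timestamp) is to rotate the id on a fixed interval, so that no single
logical session can grow without bound:

// Append a time bucket to the base session id.
// Events whose timestamps fall into different buckets get different ids,
// which forces a new session window at most every maxSessionMillis.
def cappedSessionId(baseId: String, eventTimeMillis: Long,
                    maxSessionMillis: Long = 6 * 60 * 60 * 1000L): String =
  s"$baseId-${eventTimeMillis / maxSessionMillis}"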

On Wed, Jun 24, 2020 at 11:50 AM Rafi Aroch  wrote:

> Hi Ori,
>
> Once a session ends, its state should get purged. You should take care
> that a session does end.
> For example, if you wait for a 'session-end' event, limit it with some
> time threshold. If it's defined with an inactivity gap and your client sends
> infinite events, you could limit the session length to enforce a new
> session (this may be simpler to do on the client side).
>
> Hope this helps,
> Rafi
>
>
> On Tue, Jun 23, 2020 at 5:11 PM Ori Popowski  wrote:
>
>> Hi,
>>
>> When working with an ever growing key-space (let's say session ID), and a
>> SessionWindow with a ProcessFunction - should we worry about the state
>> growing indefinitely? Or does the window make sure to clean state after
>> triggers?
>>
>> Thanks
>>
>


State leak

2020-06-23 Thread Ori Popowski
Hi,

When working with an ever-growing key space (let's say session ID) and a
SessionWindow with a ProcessFunction, should we worry about the state
growing indefinitely? Or does the window make sure to clean up state after
it triggers?

Thanks


Re: [EXTERNAL] Re: Renaming the metrics

2020-06-23 Thread Ori Popowski
Thanks for the suggestion.

After digging a bit, we've found it most convenient to just add labels to
all our Prometheus queries, like this:

flink_taskmanager_job_task_operator_currentOutputWatermark{job_name=""}

The job_name label will be exposed if you run your job with a job name like
this:

senv.execute("")
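
For example, if the job were started with the (made-up) name "sessionizer",
the execute call and the matching Prometheus query would look like this:

senv.execute("sessionizer")

flink_taskmanager_job_task_operator_currentOutputWatermark{job_name="sessionizer"}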

On Mon, Jun 22, 2020 at 8:01 PM Slotterback, Chris <
chris_slotterb...@comcast.com> wrote:

> Hi Ori,
>
>
>
> Another more temporary brute-force option, while not officially flink,
> could be building a modified version of the metrics plugin into flink where
> you manually manipulate the prefixes yourself. It’s actually pretty easy to
> build the jar, and to test it you drop the jar into the plugin path. I’ve
> done something similar where I actually filter our a lot of the prefixes
> that I don’t want, because too many metric points were being generated from
> some custom metrics. The config for the filter is loaded from the flink
> conf, you could possibly implement something similar where you pass the job
> name in each clusters config:
>
>
>
>
> https://github.com/cslotterback/flink/commit/fd8e1f77a83a3ae1253da53596d22471bb6fe902
>
> and
>
>
> https://github.com/cslotterback/flink/commit/ce3797ea46f3321885c4352ecc36b9385b7ca0ce
>
>
>
> This isn’t what I’d call ideal, but it gets the job done. I would love a
> generic flink-approved method of configuring Prometheus metrics.
>
>
>
> Chris
>
>
>
>
>
> *From: *Ori Popowski 
> *Date: *Monday, June 22, 2020 at 12:22 PM
> *Cc: *user 
> *Subject: *[EXTERNAL] Re: Renaming the metrics
>
>
>
> Thanks for answering.
>
>
>
> Unrelated to Flink, but if anyone knows a way to rename the metrics inside
> Prometheus I'd appreciate if you can share.
>
>
>
> About the push gateway - I think I'll stick with the pull options because
> it looks like a better fit to the use case
>
>
>
> On Mon, Jun 22, 2020 at 4:47 PM Chesnay Schepler 
> wrote:
>
> There's currently no way to change this.
>
>
>
> A related enhancement was proposed on FLINK-17495 that would at least
> allow you to attach a custom label, but the initial implementation wasn't
> general enough.
>
>
>
> On 22/06/2020 15:08, Arvid Heise wrote:
>
> Hi Ori,
>
>
>
> I see that the PrometheusPushGatewayReporter [1] has an option for a job
> name, maybe you can use that.
>
>
>
> I'm also including Chesnay who probably has more ideas.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter
>
>
>
> On Mon, Jun 22, 2020 at 9:01 AM Ori Popowski  wrote:
>
> I have two Flink clusters sending metrics via Prometheus and they share
> all the metric names (i.e.
> flink_taskmanager_job_task_operator_currentOutputWatermark).
>
>
>
> I want to change the flink_ prefix to something else to distinguish
> between the clusters (maybe the job-name).
>
>
>
> How can I do it?
>
>
>
> Thanks.
>
>
>
> --
>
> *Arvid Heise *| Senior Java Developer
>
>
> <https://www.ververica.com/>
>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward
> <https://flink-forward.org/>
> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>
>
>
>


Re: Renaming the metrics

2020-06-22 Thread Ori Popowski
Thanks for answering.

Unrelated to Flink, but if anyone knows a way to rename the metrics inside
Prometheus I'd appreciate if you can share.

About the push gateway - I think I'll stick with the pull options because
it looks like a better fit to the use case

On Mon, Jun 22, 2020 at 4:47 PM Chesnay Schepler  wrote:

> There's currently no way to change this.
>
> A related enhancement was proposed on FLINK-17495 that would at least
> allow you to attach a custom label, but the initial implementation wasn't
> general enough.
>
> On 22/06/2020 15:08, Arvid Heise wrote:
>
> Hi Ori,
>
> I see that the PrometheusPushGatewayReporter [1] has an option for a job
> name, maybe you can use that.
>
> I'm also including Chesnay who probably has more ideas.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter
>
> On Mon, Jun 22, 2020 at 9:01 AM Ori Popowski  wrote:
>
>> I have two Flink clusters sending metrics via Prometheus and they share
>> all the metric names (i.e.
>> flink_taskmanager_job_task_operator_currentOutputWatermark).
>>
>> I want to change the flink_ prefix to something else to distinguish
>> between the clusters (maybe the job-name).
>>
>> How can I do it?
>>
>> Thanks.
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing
> Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
>
>
>
>


Renaming the metrics

2020-06-22 Thread Ori Popowski
I have two Flink clusters sending metrics via Prometheus and they share all
the metric names (i.e.
flink_taskmanager_job_task_operator_currentOutputWatermark).

I want to change the flink_ prefix to something else to distinguish between
the clusters (maybe the job-name).

How can I do it?

Thanks.


Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Ori Popowski
Hi @aljoscha

The watermark metrics look fine. (attached screenshot)
[image: image.png]

This is the extractor:
class TimestampExtractor[A, B <: AbstractEvent]
    extends BoundedOutOfOrdernessTimestampExtractor[(A, B)](Time.minutes(5)) {
  // sequence is in microseconds; convert to millis and cap at the current wall-clock time
  override def extractTimestamp(element: (A, B)): Long =
    Instant.now.toEpochMilli.min(element._2.sequence / 1000)
}

I'll try to output the watermark and report my findings
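
This is roughly what I have in mind, following your suggestion (a sketch;
Event here stands for my actual element type, and the output format is just
for debugging):

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class WatermarkDebugFunction
    extends ProcessWindowFunction[Event, String, String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       elements: Iterable[Event],
                       out: Collector[String]): Unit = {
    // Emit the window bounds, the element count, and the watermark at firing time
    out.collect(
      s"key=$key window=[${context.window.getStart}, ${context.window.getEnd}] " +
        s"count=${elements.size} watermark=${context.currentWatermark}")
  }
}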

On Tue, Jun 16, 2020 at 3:21 PM Aljoscha Krettek 
wrote:

> Did you look at the watermark metrics? Do you know what the current
> watermark is when the windows are firing. You could also get the current
> watemark when using a ProcessWindowFunction and also emit that in the
> records that you're printing, for debugging.
>
> What is that TimestampAssigner you're using for your timestamp
> assigner/watermark extractor?
>
> Best,
> Aljoscha
>
> On 16.06.20 14:10, Ori Popowski wrote:
> > Okay, so I created a simple stream (similar to the original stream),
> where
> > I just write the timestamps of each evaluated window to S3.
> > The session gap is 30 minutes, and this is one of the sessions:
> > (first-event, last-event, num-events)
> >
> > 11:23-11:23 11 events
> > 11:25-11:26 51 events
> > 11:28-11:29 74 events
> > 11:31-11:31 13 events
> >
> > Again, this is one session. How can we explain this? Why does Flink
> create
> > 4 distinct windows within 8 minutes? I'm really lost here, I'd appreciate
> > some help.
> >
> > On Tue, Jun 16, 2020 at 2:17 PM Ori Popowski  wrote:
> >
> >> Hi, thanks for answering.
> >>
> >>> I guess you consume from Kafka from the earliest offset, so you consume
> >> historical data and Flink is catching-up.
> >> Yes, it's what's happening. But Kafka is partitioned on sessionId, so
> skew
> >> between partitions cannot explain it.
> >> I think the only way it can happen is when when suddenly there's one
> event
> >> with very late timestamp
> >>
> >>> Just to verify, if you do keyBy sessionId, do you check the gaps of
> >> events from the same session?
> >> Good point. sessionId is unique in this case, and even if it's not -
> every
> >> single session suffers from this problem of early triggering so it's
> very
> >> unlikely that all millions sessions within that hour had duplicates.
> >>
> >> I'm suspecting that the fact I have two ProcessWindowFunctions one after
> >> the other somehow causes this.
> >> I deployed a version with one window function which just prints the
> >> timestamps to S3 (to find out if I have event-time jumps) and suddenly
> it
> >> doesn't trigger early (I'm running for 10 minutes and not a single event
> >> has arrived to the sink)
> >>
> >> On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch 
> wrote:
> >>
> >>> Hi Ori,
> >>>
> >>> I guess you consume from Kafka from the earliest offset, so you consume
> >>> historical data and Flink is catching-up.
> >>>
> >>> Regarding: *My event-time timestamps also do not have big gaps*
> >>>
> >>> Just to verify, if you do keyBy sessionId, do you check the gaps of
> >>> events from the same session?
> >>>
> >>> Rafi
> >>>
> >>>
> >>> On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski 
> wrote:
> >>>
> >>>> So why is it happening? I have no clue at the moment.
> >>>> My event-time timestamps also do not have big gaps between them that
> >>>> would explain the window triggering.
> >>>>
> >>>>
> >>>> On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
> >>>> wrote:
> >>>>
> >>>>> If you are using event time in Flink, it is disconnected from the
> real
> >>>>> world wall clock time.
> >>>>> You can process historical data in a streaming program as if it was
> >>>>> real-time data (potentially reading through (event time) years of
> data in a
> >>>>> few (wall clock) minutes)
> >>>>>
> >>>>> On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com>
> wrote:
> >>>>>
> >>>>>> Hi
> >>>>>>
> >>>>>> I think it maybe you use the event time, and the timestamp between
> >>>>>> your event data is bigger than 30minutes, maybe you can check the
> source
> >>>>>> data timestamp.
> >>>>>>
> >>>>>> Best,
> >>>>>> Yichao Yang
> >>>>>>
> >>>>>> --
> >>>>>> 发自我的iPhone
> >>>>>>
> >>>>>>
> >>>>>> -- Original --
> >>>>>> *From:* Ori Popowski 
> >>>>>> *Date:* Mon,Jun 15,2020 10:50 PM
> >>>>>> *To:* user 
> >>>>>> *Subject:* Re: EventTimeSessionWindow firing too soon
> >>>>>>
> >>>>>>
> >
>
>


Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Ori Popowski
Okay, so I created a simple stream (similar to the original stream), where
I just write the timestamps of each evaluated window to S3.
The session gap is 30 minutes, and this is one of the sessions:
(first-event, last-event, num-events)

11:23-11:23 11 events
11:25-11:26 51 events
11:28-11:29 74 events
11:31-11:31 13 events

Again, this is all one session. How can we explain this? Why does Flink
create 4 distinct windows within 8 minutes? I'm really lost here, and I'd
appreciate some help.

On Tue, Jun 16, 2020 at 2:17 PM Ori Popowski  wrote:

> Hi, thanks for answering.
>
> > I guess you consume from Kafka from the earliest offset, so you consume
> historical data and Flink is catching-up.
> Yes, it's what's happening. But Kafka is partitioned on sessionId, so skew
> between partitions cannot explain it.
> I think the only way it can happen is when when suddenly there's one event
> with very late timestamp
>
> > Just to verify, if you do keyBy sessionId, do you check the gaps of
> events from the same session?
> Good point. sessionId is unique in this case, and even if it's not - every
> single session suffers from this problem of early triggering so it's very
> unlikely that all millions sessions within that hour had duplicates.
>
> I'm suspecting that the fact I have two ProcessWindowFunctions one after
> the other somehow causes this.
> I deployed a version with one window function which just prints the
> timestamps to S3 (to find out if I have event-time jumps) and suddenly it
> doesn't trigger early (I'm running for 10 minutes and not a single event
> has arrived to the sink)
>
> On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch  wrote:
>
>> Hi Ori,
>>
>> I guess you consume from Kafka from the earliest offset, so you consume
>> historical data and Flink is catching-up.
>>
>> Regarding: *My event-time timestamps also do not have big gaps*
>>
>> Just to verify, if you do keyBy sessionId, do you check the gaps of
>> events from the same session?
>>
>> Rafi
>>
>>
>> On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski  wrote:
>>
>>> So why is it happening? I have no clue at the moment.
>>> My event-time timestamps also do not have big gaps between them that
>>> would explain the window triggering.
>>>
>>>
>>> On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
>>> wrote:
>>>
>>>> If you are using event time in Flink, it is disconnected from the real
>>>> world wall clock time.
>>>> You can process historical data in a streaming program as if it was
>>>> real-time data (potentially reading through (event time) years of data in a
>>>> few (wall clock) minutes)
>>>>
>>>> On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I think it maybe you use the event time, and the timestamp between
>>>>> your event data is bigger than 30minutes, maybe you can check the source
>>>>> data timestamp.
>>>>>
>>>>> Best,
>>>>> Yichao Yang
>>>>>
>>>>> --
>>>>> 发自我的iPhone
>>>>>
>>>>>
>>>>> -- Original --
>>>>> *From:* Ori Popowski 
>>>>> *Date:* Mon,Jun 15,2020 10:50 PM
>>>>> *To:* user 
>>>>> *Subject:* Re: EventTimeSessionWindow firing too soon
>>>>>
>>>>>


Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Ori Popowski
 Hi, thanks for answering.

> I guess you consume from Kafka from the earliest offset, so you consume
historical data and Flink is catching-up.
Yes, that's what's happening. But Kafka is partitioned on sessionId, so skew
between partitions cannot explain it.
I think the only way it can happen is when suddenly there's one event with a
very late timestamp.

> Just to verify, if you do keyBy sessionId, do you check the gaps of
events from the same session?
Good point. sessionId is unique in this case, and even if it's not, every
single session suffers from this problem of early triggering, so it's very
unlikely that all of the millions of sessions within that hour had duplicates.

I suspect that having two ProcessWindowFunctions one after the other somehow
causes this.
I deployed a version with one window function which just prints the
timestamps to S3 (to find out if I have event-time jumps), and suddenly it
doesn't trigger early (it has been running for 10 minutes and not a single
event has arrived at the sink).

On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch  wrote:

> Hi Ori,
>
> I guess you consume from Kafka from the earliest offset, so you consume
> historical data and Flink is catching-up.
>
> Regarding: *My event-time timestamps also do not have big gaps*
>
> Just to verify, if you do keyBy sessionId, do you check the gaps of
> events from the same session?
>
> Rafi
>
>
> On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski  wrote:
>
>> So why is it happening? I have no clue at the moment.
>> My event-time timestamps also do not have big gaps between them that
>> would explain the window triggering.
>>
>>
>> On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
>> wrote:
>>
>>> If you are using event time in Flink, it is disconnected from the real
>>> world wall clock time.
>>> You can process historical data in a streaming program as if it was
>>> real-time data (potentially reading through (event time) years of data in a
>>> few (wall clock) minutes)
>>>
>>> On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I think it maybe you use the event time, and the timestamp between your
>>>> event data is bigger than 30minutes, maybe you can check the source data
>>>> timestamp.
>>>>
>>>> Best,
>>>> Yichao Yang
>>>>
>>>> --
>>>> 发自我的iPhone
>>>>
>>>>
>>>> -- Original --
>>>> *From:* Ori Popowski 
>>>> *Date:* Mon,Jun 15,2020 10:50 PM
>>>> *To:* user 
>>>> *Subject:* Re: EventTimeSessionWindow firing too soon
>>>>
>>>>


Re: EventTimeSessionWindow firing too soon

2020-06-15 Thread Ori Popowski
So why is it happening? I have no clue at the moment.
My event-time timestamps also do not have big gaps between them that would
explain the window triggering.


On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger  wrote:

> If you are using event time in Flink, it is disconnected from the real
> world wall clock time.
> You can process historical data in a streaming program as if it was
> real-time data (potentially reading through (event time) years of data in a
> few (wall clock) minutes)
>
> On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com> wrote:
>
>> Hi
>>
>> I think it maybe you use the event time, and the timestamp between your
>> event data is bigger than 30minutes, maybe you can check the source data
>> timestamp.
>>
>> Best,
>> Yichao Yang
>>
>> ------
>> 发自我的iPhone
>>
>>
>> -- Original --
>> *From:* Ori Popowski 
>> *Date:* Mon,Jun 15,2020 10:50 PM
>> *To:* user 
>> *Subject:* Re: EventTimeSessionWindow firing too soon
>>
>>


EventTimeSessionWindow firing too soon

2020-06-15 Thread Ori Popowski
I'm using Flink 1.10 on YARN, and I have an EventTimeSessionWindow with a
gap of 30 minutes.

But as soon as I start the job, events are written to the sink (I can see
them in S3) even though 30 minutes have not passed.

This is my job:

val stream = senv
  .addSource(new FlinkKafkaConsumer("…", compressedEventDeserializer, properties))
  .filter(_.sessionId.nonEmpty)
  .flatMap(_ match { case (_, events) => events })
  .assignTimestampsAndWatermarks(
    new TimestampExtractor[Event](Time.minutes(10)) {
      override def extractTimestamp(element: Event): Long =
        element.sequence / 1000 // sequence is in microseconds
    })
  .keyBy(_.sessionId)
  .window(EventTimeSessionWindows.withGap(Time.of(30, MINUTES)))
  .process(myProcessWindowFunction)

AsyncDataStream.unorderedWait(stream, myAsyncS3Writer, 30, SECONDS, 100)

Any idea why it's happening?


Re: Support for Flink in EMR 6.0

2020-05-04 Thread Ori Popowski
Thanks.

I also contacted AWS support and they should keep me updated about it.

Hoping to see this soon :)

On Mon, May 4, 2020 at 4:41 PM Robert Metzger  wrote:

> Hey Ori,
>
> thanks a lot for reaching out to the user@ mailing list. This is more a
> question for the EMR Team at Amazon than the Flink community. But it seems
> that the lack of Hadoop 3 support in Flink [1] seems to be the reason why
> EMR 6.0 doesn't have Flink support.
> Flink 1.11 will most likely have support for Hadoop 3, and I hope that
> Amazon will add Flink back into EMR.
>
> However, if you have a EMR 6.0.0 with Hadoop 3 deployed, you should be
> able deploy vanilla Flink 1.10.0 there as well!
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-11086?focusedCommentId=17088936&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17088936
>
> On Mon, May 4, 2020 at 2:55 PM Ori Popowski  wrote:
>
>> Hi,
>>
>> EMR 6.0.0 has been released [1], and this release ignores Apache Flink
>> (as well as other applications).
>>
>> Are there any plans to add support for Apache Flink for EMR 6.0.0 in the
>> future?
>>
>> Thanks.
>>
>> [1]
>> https://aws.amazon.com/about-aws/whats-new/2020/04/amazon-emr-announces-emr-release-6-with-new-major-versions-hadoop-hive-hbase-amazon-linux-2-docker/
>>
>


Support for Flink in EMR 6.0

2020-05-04 Thread Ori Popowski
Hi,

EMR 6.0.0 has been released [1], and this release ignores Apache Flink (as
well as other applications).

Are there any plans to add support for Apache Flink for EMR 6.0.0 in the
future?

Thanks.

[1]
https://aws.amazon.com/about-aws/whats-new/2020/04/amazon-emr-announces-emr-release-6-with-new-major-versions-hadoop-hive-hbase-amazon-linux-2-docker/