Re: hourly counter

2020-09-22 Thread Timo Walther

Hi Lian,

you are right that timers are not available in a ProcessWindowFunction 
but the state store can be accessed. So given that your window width is 
1 min, you could maintain an additional state value for counting the 
minutes and update your counter once this value reaches 60.
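Stripped of the Flink APIs, the rolling logic described here is small; in a ProcessWindowFunction the two fields below would instead live in the keyed state store (e.g. two ValueStates obtained via Context#globalState()). A minimal sketch, with hypothetical class and method names:

```java
// Core logic of the suggestion: count one-minute window firings and
// reset the hourly counter once 60 minutes' worth of windows have fired.
public class HourlyRollingCounter {
    private long hourlyCount = 0;
    private int minutesSeen = 0;

    /** Called once per one-minute window firing with that window's count. */
    public void onMinuteWindow(long windowCount) {
        hourlyCount += windowCount;
        minutesSeen++;
        if (minutesSeen >= 60) { // a full hour of windows has passed
            hourlyCount = 0;     // reset so the counter never overflows
            minutesSeen = 0;
        }
    }

    public long current() {
        return hourlyCount;
    }
}
```

In the actual window function, hourlyCount and minutesSeen would be read from and written back to state on each window firing.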


Otherwise, if it is a simple windowing operation, I would recommend 
using a process function and implementing the windowing logic yourself.


Btw, if you are using a Long counter, I would say that chances are low 
that it will overflow. Also, have you considered using Flink's metric 
system? It might make custom metric clients unnecessary.


I hope this helps.

Regards,
Timo


On 22.09.20 02:02, Lian Jiang wrote:

Hi,

I have a window function with a window width of 1 min. I want to have an 
hourly counter which is reset every hour so it never overflows. There 
are multiple ways, but none of them is straightforward:


StatsDClient metricClient = new NonBlockingStatsDClientBuilder().build();

int count = 0;

void incr() {
    metricClient.count("mycounter", 1, "mytag");
    count++;
}

void reset() {
    // send a negative delta so the statsd counter goes back to zero
    metricClient.count("mycounter", -count, "mytag");
    count = 0;
}

As you can see, the code needs to maintain a "count" variable in order to 
reset mycounter.
Also, since timers are not available in a window function, extra code is 
needed to reset mycounter every hour.
Is there an easier way to implement an hourly counter? Or is it not a concern 
that a counter will overflow?

Thanks

Lian







Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-22 Thread Dawid Wysakowicz
Hi Lian,

Thank you for sending the full code for the pojo. It clarified a lot!

I learnt that Avro introduced yet another mechanism for retrieving
conversions for logical types in Avro 1.9.x. I was not aware they create
a static SpecificData field with registered logical conversions if a
logical type is part of a union. That's why I did not understand the
parts of the you sent me where you are registering the logical types in
the MODEL$ field. The getConversion method is part of the
SpecificRecordBase class and is being populated by Avro compiler when a
logical type is a top level type. This bit should work just fine.

Unfortunately we do not support this "feature" of using the static
SpecificData field. So far we create the SpecificData manually in
AvroSerializer and Avro(De)SerializationSchema, which is why the
conversions are not being picked up. I created a JIRA issue [1] and a
PR [2] to support it in Flink 1.12.

The only workaround I can see in earlier versions of Flink is to change
the AvroSerializer manually. You would need to do a similar thing as I
do in the linked PR.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-19339

[2] https://github.com/apache/flink/pull/13450

On 21/09/2020 19:28, Lian Jiang wrote:
> Thanks guys. Given Flink 1.12 is not ready (e.g. not available in
> Maven repo), I need to stick to 1.11.
>
> Dawid,
>
> For the code throwing "java.lang.Long cannot be cast to
> java.time.Instant",
>
> The avro schema has:
> union {null, timestamp_ms } eventTime = null;
>
> The avro pojo does have the logical type conversion:
>   private static SpecificData MODEL$ = new SpecificData();
> static {
> MODEL$.addLogicalTypeConversion(new 
> org.apache.avro.data.TimeConversions.TimestampMillisConversion());
>   }
>
> I don't see SpecificRecord#getConversions() you mentioned in avro repo.
> The pojo code throws:
> public void put(int field$, java.lang.Object value$) {
>   switch (field$) {
>   case 3: eventTime = (java.time.Instant)value$; break; // throw here
>   }
>
> I will send the full avdl and pojo offline to you for a close look.
>
>
> Regards
> Lian
>
>
> On Mon, Sep 21, 2020 at 12:31 AM Aljoscha Krettek wrote:
>
> Hi All,
>
> Avro was finally bumped in
> https://issues.apache.org/jira/browse/FLINK-18192.
>
> The implementers didn't see
> https://issues.apache.org/jira/browse/FLINK-12532, but it is also
> updated now.
>
> Best,
> Aljoscha
>
> On 21.09.20 08:04, Arvid Heise wrote:
> > Hi Lian,
> >
> > we had a similar discussion on [1].
> >
> > TL;DR you are using Avro 1.9.x while Flink still bundles Avro
> 1.8 [2] until
> > Hive bumps it [3]. In the thread, I gave some options to avoid
> running into
> > the issue.
> > The easiest fix is to use Avro 1.8.2 all the way, but you may
> run into [4]
> > if your logical type is nullable (which is not necessary in most
> cases).
> >
> > Still, I think it's time for us to revise the decision to wait
> for Hive to
> > bump and rather upgrade independently. Avro was for a long time
> stuck on
> > 1.8 but the project gained traction again in the past two years.
> On the
> > other hand, Hive seems to be rather slow to respond to that, and we
> > shouldn't have a slow-moving component block us from supporting a
> fast-moving
> > component if it's so apparent that users want it.
> > @Aljoscha Krettek could you please pick that topic up
> > and ping the respective maintainers?
> >
> > [1]
> >
> 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> > [2] https://issues.apache.org/jira/browse/FLINK-12532
> > [3] https://issues.apache.org/jira/browse/HIVE-21737
> > [4] https://issues.apache.org/jira/browse/AVRO-1891
> >
> > On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang wrote:
> >
> >> Thanks Dawid for proposing
> ConfluentRegistryDeserializationSchema. I am
> >> trying ConfluentRegistryAvroDeserializationSchema (if this is
> what you
> >> mean) but got "java.lang.Long cannot be cast to
> java.time.Instant". This
> >> may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
> >> Is there any progress
> >> for this JIRA? Thanks. Regards!
> >>
> >>
> >> Stacktrace:
> >> java.lang.ClassCastException: java.lang.Long cannot be cast to
> >> java.time.Instant
> >> at
> com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
> >> at
> org.apache.avro.generic.GenericData.setField(GenericData.java:795)
> >> at
> >>
> 
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
> >> at
> >>
> 
> org.apache.avro.generic.Generi

[ANNOUNCE] Weekly Community Update 2020/38

2020-09-22 Thread Konstantin Knauf
Dear community,

happy to share a brief community update for the past week. A lot of FLIP
votes are currently ongoing on the dev@ mailing list. I've covered these
FLIPs previously, so I am skipping them this time. Besides that, a couple of
release-related updates and again multiple new Committers.

Flink Development
==

* [releases] Apache Flink 1.11.2 was released. [1]

* [releases] The first release candidate for Stateful Functions 2.2.0 was
published and already cancelled :) A new release candidate will probably be
published today. [2]

* [releases] Robert has shared another update on blocker and build
instabilities for the upcoming release of Apache Flink 1.12. There are five
weeks left till feature freeze. [3]

* [releases] Chesnay started a discussion thread on releases flink-shaded
12.0 containing upgrades to some of Apache Flink's core dependencies. [4]

* [cep, sql] Kosma has started a discussion on supporting timeouts in
MATCH_RECOGNIZE, which would allow a pattern to fire/match in the absence
of an event. [5]

[1] https://flink.apache.org/news/2020/09/17/release-1.11.2.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-2-2-0-release-candidate-1-tp45032.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-1-12-Stale-blockers-and-build-instabilities-tp43477.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-flink-shaded-12-0-td44968.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Timed-out-patterns-handling-using-MATCH-RECOGNIZE-tp45001.html

Notable Bugs
==

Nothing notable came to my attention.

Events, Blog Posts, Misc
===

* Godfrey He, Igal Shilman and Yun Tang are now Apache Flink Committers.
Congratulations! [6,7,8]

* The first keynote of Flink Forward Global has been announced: "Real-Time
Metrics at Fortnite Scale" by Ricky Saltzer of Epic Games. [9]

[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Godfrey-He-tp44830.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Igal-Shilman-tp44754p44865.html
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Yun-Tang-tp44777p44909.html
[9] https://twitter.com/FlinkForward/status/1306219099475902464

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Is there a way to avoid submit hive-udf's resources when we submit a job?

2020-09-22 Thread Timo Walther

Hi Husky,

I guess https://issues.apache.org/jira/browse/FLINK-14055 is what is 
needed to make this feature possible.


@Rui: Do you know more about this issue and its current limitations?

Regards,
Timo


On 18.09.20 09:11, Husky Zeng wrote:

When we submit a job which uses Hive UDFs, the job depends on the UDFs'
jars and configuration files.

We already store the UDFs' jars and configuration files in the Hive
metastore, so we expect that Flink could get those files' HDFS paths via the
hive-connector and fetch the files from HDFS when the job is running.

In this code, it seems we already get those UDF resources' paths in
FunctionInfo, but don't use them.

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java#L80


We currently submit the UDFs' jars and configuration files to YARN together
with the job via the client, and are trying to find a way to avoid submitting
the UDFs' resources at job submission time. Is that possible?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Flink DynamoDB stream connector losing records

2020-09-22 Thread Jiawei Wu
Hi Ying and Danny,

Sorry for the late reply, I just got back from vacation.

Yes, I'm running Flink in Kinesis Data Analytics with Flink 1.8, and
checkpointing is enabled. This fully managed solution limits my access to
Flink logs; so far I didn't get any logs related to throttling or failover.
The reason why I suspect throttling is the root cause is that some AWS
Lambdas that connect to the same DynamoDB stream see a higher throttle rate
right after Flink starts consuming the DynamoDB stream, in which case I
believe the throttling will also happen on the Flink side. I'm actively
working with AWS support to try to find some logs on this.

At the same time, when you say 'in theory should not lose exactly-once
semantics', does that mean Flink will retry when throttled? I notice there
is a parameter "flink.shard.getrecords.maxretries" whose default value
is 3. Will Flink skip this record when all retry attempts fail?

Thanks,
Jiawei



On Tue, Sep 15, 2020 at 4:38 PM Cranmer, Danny  wrote:

> Hi Jiawei,
>
>
>
> I agree that the offset management mechanism uses the same code as Kinesis
> Stream Consumer and in theory should not lose exactly-once semantics. As
> Ying is alluding to, if your application is restarted and you have
> snapshotting disabled in AWS there is a chance that records can be lost
> between runs. However, if you have snapshotting enabled then the
> application should continue consuming records from the last processed
> sequence number.
>
>
>
> I am happy to take a deeper look if you can provide more
> information/logs/code.
>
>
>
> Thanks,
>
>
>
> *From: *Ying Xu 
> *Date: *Monday, 14 September 2020 at 19:48
> *To: *Andrey Zagrebin 
> *Cc: *Jiawei Wu , user 
> *Subject: *RE: [EXTERNAL] Flink DynamoDB stream connector losing records
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Hi Jiawei:
>
>
>
> Sorry for the delayed reply.  When you mention certain records getting
> skipped, is it from the same run or across different runs.  Any more
> specific details on how/when records are lost?
>
>
>
> FlinkDynamoDBStreamsConsumer is built on top of FlinkKinesisConsumer,
> with a similar offset management mechanism.  In theory it shouldn't lose
> exactly-once semantics in the case of getting throttled.  We haven't run it
> in any AWS kinesis analytics environment though.
>
>
>
> Thanks.
>
>
>
>
>
> On Thu, Sep 10, 2020 at 7:51 AM Andrey Zagrebin 
> wrote:
>
> Generally speaking this should not be a problem for exactly-once but I am
> not familiar with the DynamoDB and its Flink connector.
>
> Did you observe any failover in Flink logs?
>
>
>
> On Thu, Sep 10, 2020 at 4:34 PM Jiawei Wu 
> wrote:
>
> And I suspect I have throttled by DynamoDB stream, I contacted AWS support
> but got no response except for increasing WCU and RCU.
>
>
>
> Is it possible that Flink will lose exactly-once semantics when throttled?
>
>
>
> On Thu, Sep 10, 2020 at 10:31 PM Jiawei Wu 
> wrote:
>
> Hi Andrey,
>
>
>
> Thanks for your suggestion, but I'm using Kinesis analytics application
> which supports only Flink 1.8
>
>
>
> Regards,
>
> Jiawei
>
>
>
> On Thu, Sep 10, 2020 at 10:13 PM Andrey Zagrebin 
> wrote:
>
> Hi Jiawei,
>
>
>
> Could you try Flink latest release 1.11?
> 1.8 will probably not get bugfix releases.
>
> I will cc Ying Xu who might have a better idea about the DynamoDB source.
>
>
>
> Best,
>
> Andrey
>
>
>
> On Thu, Sep 10, 2020 at 3:10 PM Jiawei Wu 
> wrote:
>
> Hi,
>
>
>
> I'm using AWS kinesis analytics application with Flink 1.8. I am using
> the FlinkDynamoDBStreamsConsumer to consume DynamoDB stream records. But
> recently I found my internal state is wrong.
>
>
>
> After I printed some logs I found some DynamoDB stream record are skipped
> and not consumed by Flink. May I know if someone encountered the same issue
> before? Or is it a known issue in Flink 1.8?
>
>
>
> Thanks,
>
> Jiawei
>
>


Re: Does Flink support such a feature currently?

2020-09-22 Thread Marta Paes Moreira
Hi, Roc.

*Note:* in the future, please send this type of question to the user
mailing list instead (user@flink.apache.org)!

If I understand your question correctly, this is possible using the LIKE
clause and a registered catalog. There is currently no implementation for
the MySQL JDBC catalog, but this is in the roadmap [1,2].

Once you register a catalog, you could do:

CREATE TABLE mapping_table
WITH (
  ...
)
LIKE full_path_to_source_table;

Again, as of Flink 1.11 this only works for Postgres, not yet MySQL. I'm
copying in Bowen as he might be able to give more information on the
roadmap.

Marta

[1] https://issues.apache.org/jira/browse/FLINK-15352
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-93%3A+JDBC+catalog+and+Postgres+catalog

On Tue, Sep 22, 2020 at 9:44 AM Roc Marshal  wrote:

> Hi, everyone!
>
>    When using Flink SQL DDL to create a MySQL mapping table, does
> Flink support automatically deriving the target table schema if we put
> no column names in `create table table_name_mapping2mysql () with (...)`?
> If this feature is not supported, is it worth considering as an improvement?
>
> Thank you.
> Best, Roc.


Re: Is there a way to avoid submit hive-udf's resources when we submit a job?

2020-09-22 Thread Husky Zeng
Hi Timo,

Thanks for your attention. As I say in the comment below, this feature
would certainly solve our problem, but its workload seems much larger than
what my scenario requires. Our project urgently needs to reuse the Hive UDFs
stored in the Hive metastore, so we are more inclined to develop a fast
solution. I would like to hear some advice from the community.

https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927

Best Regards,
Husky Zeng



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Is there a way to avoid submit hive-udf's resources when we submit a job?

2020-09-22 Thread Rui Li
Hi Timo,

I believe the blocker for this feature is that we don't support dynamically
adding user jars/resources at the moment. We're able to read the path to
the function jar from Hive metastore, but we cannot load the jar after the
user session is started.

On Tue, Sep 22, 2020 at 3:43 PM Timo Walther  wrote:

> Hi Husky,
>
> I guess https://issues.apache.org/jira/browse/FLINK-14055 is what is
> needed to make this feature possible.
>
> @Rui: Do you know more about this issue and its current limitations?
>
> Regards,
> Timo
>
>
> On 18.09.20 09:11, Husky Zeng wrote:
> > When we submit a job which uses Hive UDFs, the job depends on the
> > UDFs' jars and configuration files.
> >
> > We already store the UDFs' jars and configuration files in the Hive
> > metastore, so we expect that Flink could get those files' HDFS paths via
> > the hive-connector and fetch the files from HDFS when the job is running.
> >
> > In this code, it seems we already get those UDF resources' paths in
> > FunctionInfo, but don't use them.
> >
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java#L80
> >
> > We currently submit the UDFs' jars and configuration files to YARN
> > together with the job via the client, and are trying to find a way to
> > avoid submitting the UDFs' resources at job submission time. Is that
> > possible?
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>

-- 
Cheers,
Rui Li


Re: Zookeeper connection loss causing checkpoint corruption

2020-09-22 Thread Arpith P
I created a ticket with all my findings.
https://issues.apache.org/jira/browse/FLINK-19359.

Thanks,
Arpith
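The tracking strategy described in the quoted message below (treat a checkpoint directory without a "_metadata" file as incomplete, and restore from the newest one that has it) can be sketched with plain filesystem calls. The class name and local-path assumption are hypothetical; on HDFS or S3 the same filtering would go through a filesystem client instead of java.nio:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestCompleteCheckpoint {
    /**
     * Returns the most recently modified chk-* directory under the given
     * checkpoint root that contains a "_metadata" file; directories without
     * one are treated as incomplete/corrupt and skipped.
     */
    public static Optional<Path> find(Path checkpointRoot) throws IOException {
        try (Stream<Path> dirs = Files.list(checkpointRoot)) {
            return dirs
                .filter(Files::isDirectory)
                .filter(d -> d.getFileName().toString().startsWith("chk-"))
                .filter(d -> Files.exists(d.resolve("_metadata")))
                .max(Comparator.comparing((Path d) -> {
                    try {
                        return Files.getLastModifiedTime(d);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }));
        }
    }
}
```

Instead of modification time, one could also store the chosen path plus a timestamp in a DB, as the quoted message does, and query for the latest entry at restore time.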

On Tue, Sep 22, 2020 at 12:16 PM Timo Walther  wrote:

> Hi Arpith,
>
> is there a JIRA ticket for this issue already? If not, it would be great
> if you can report it. This sounds like a critical priority issue to me.
>
> Thanks,
> Timo
>
> On 22.09.20 06:25, Arpith P wrote:
> > Hi Peter,
> >
> > I have recently had a similar issue where I could not load from the
> > checkpoints path. I found that whenever a corrupt checkpoint happens, the
> > "_metadata" file will not be persisted, and I have a program which tracks
> > checkpoint locations based on this strategy and updates a DB with the
> > location and timestamp. To restore the latest checkpoint I query the DB
> > ordered by latest timestamp. Let me know if this is helpful;
> > I can share code for this if needed.
> >
> > Arpith
> >
> > On Mon, Sep 21, 2020 at 6:37 PM Peter Westermann wrote:
> >
> > I recently ran into an issue with our Flink cluster: A zookeeper
> > service deploy caused a temporary connection loss and triggered a
> > new jobmanager leader election. Leadership election was successful
> > and our Flink job restarted from the last checkpoint. 
> >
> > This checkpoint appears to have been taken while we lost connection
> > to Zookeeper and ended up in a corrupted state, so the Flink job kept
> > failing. Here’s the exception stack trace for that:
> >
> > 2020-09-18 01:10:57
> >
> > java.lang.Exception: Exception while creating
> > StreamOperatorStateContext.
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> >
> >   at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> >
> >   at
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> >
> >   at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> >
> >   at java.lang.Thread.run(Thread.java:748)
> >
> > Caused by: org.apache.flink.util.FlinkException: Could not restore
> > keyed state backend for
> > KeyedCoProcessOperator_a2b0d706714fc3856e0e38da3c54b7de_(27/40) from
> > any of the 1 provided restore options.
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >
> >   ... 9 more
> >
> > Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> > Caught unexpected exception.
> >
> >   at
> >
>  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:335)
> >
> >   at
> >
>  
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >
> >   at
> >
>  
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >
> >   ... 11 more
> >
> > Caused by: java.io.IOException: Error while opening RocksDB
> > instance.
> >
> >   at
> >
>  
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:89)
> >
> >   at
> >
>  
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreO

Re: App gets stuck in Created State

2020-09-22 Thread Arpith P
All the job manager logs have been deleted from the cluster. I'll have to
work with the infra team to get them back; once I have them, I'll post them here.

Arpith

On Mon, Sep 21, 2020 at 5:50 PM Zhu Zhu  wrote:

> Hi Arpith,
>
> All tasks being in the CREATED state indicates that no task has been
> scheduled yet. It is strange that a job gets stuck in this state.
> Is it possible that you share the job manager log so we can check what is
> happening there?
>
> Thanks,
> Zhu
>
> Arpith P  于2020年9月21日周一 下午3:52写道:
>
>> Hi,
>>
>> We have a Flink 1.8.0 cluster deployed in Hadoop distributed mode. I often
>> see that, even though Hadoop has enough resources, Flink sits in the
>> CREATED state. We have 4 operators using 15 parallelism, 1 operator using
>> 40, & 2 operators using 10. At the time of submission I'm passing
>> taskmanager memory as 4 GB, job manager memory as 2 GB, and 2 slots. This
>> request should only take 20 containers and 40 vcores. But I see Flink is
>> overallocating: 65 containers and 129 vcores. I've attached snapshots for
>> reference.
>>
>> Right now I'm passing:  -yD
>> yarn.heartbeat.container-request-interval=1000 -yD
>> taskmanager.network.memory.fraction=0.045 -yD
>> taskmanager.memory.preallocate=true.
>>
>> How do I control resource allocation?
>>
>>


Re: How to stop multiple Flink jobs of the same name from being created?

2020-09-22 Thread Yang Wang
Hi Dan,

First, I want to get more information about your submission so that we
can make the question clear.

Are you using TableEnvironment to execute multiple "INSERT INTO" statements
and finding that each one is executed in a separate Flink cluster? That is
really strange, and I want to know how you are deploying your Flink cluster
on Kubernetes, via standalone [1] or native integration [2]. If it is the
former, I am afraid you need `kubectl` to start/stop your Flink application
manually. If it is the latter, I think the Flink cluster will be destroyed
automatically when the Flink job fails. Also, all the SQL jobs will be
executed in a shared Flink application.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
[2].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html


Best,
Yang

Dan Hill  于2020年9月21日周一 上午8:15写道:

> I've read the following upgrade application page.
> This seems to focus on doing this in a wrapper layer (e.g. Kubernetes).
> Just checking to see if this is the common practice or do people do this
> from their client jars.
>
>
>
> On Sun, Sep 20, 2020 at 5:13 PM Dan Hill  wrote:
>
>> I'm prototyping with Flink SQL.  I'm iterating on a client job with
>> multiple INSERT INTOs.  Whenever I have an error, my Kubernetes job
>> retries.  This creates multiple stream jobs with the same names.
>>
>> Is it up to clients to delete the existing jobs?  I see Flink CLI
>> functions for this.  Do most people usually do this from inside their
>> client jar or their wrapper code (e.g. Kubernetes job)?
>>
>> - Dan
>>
>


Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

2020-09-22 Thread Shubham Kumar
Hi Xintong,

Thanks for your insights, they are really helpful.

I understand now that it is most likely a native memory issue rather
than a heap memory issue, and that Flink's Non-Heap metrics should not be
trusted.

Our job structure is so simple that I couldn't find any use of mmap memory
or any other straightforward native memory leak. That leads me to believe
it could be a RocksDB issue, although you do make a valid point that the
extra 2 GB in the YARN container should account for RocksDB's extra usage.
I also saw this JIRA ticket about a RocksDB memory leak on Kubernetes and
was wondering if the same could happen in YARN containers and be related to
my issue [1]. Let me know what you think.

Also, I tried running the same job using the FileSystemBackend (as a
separate job) and it went fine, with no container kills and native memory
not rising over time, which hints further at RocksDB being the culprit. My
state size in the checkpoint is around 1 GB (I can probably even switch to
the FileSystemBackend for this job, but I still want to figure out the
RocksDB case). I am using incremental checkpoints in my main job, which has
the RocksDB state backend, if that's relevant.

I read about native memory tracking and will probably go ahead and use
Native Memory Tracking (NMT) or jemalloc to confirm the RocksDB issue and
update here.

[1]: https://issues.apache.org/jira/browse/FLINK-18712

Thanks
Shubham

On Mon, Sep 21, 2020 at 8:23 AM Xintong Song  wrote:

> Hi Shubham,
>
> Java heap memory cannot cause a container memory exceeding. Heap memory is
> strictly limited by the JVM `-Xmx` parameter. If the program does need more
> memory than the limit, it will run into a heap space OOM, rather than
> implicitly using more memory than the limit.
>
> Several reasons that might lead to container memory exceeding.
> - RocksDB, whose memory controlling is based on estimation rather than
> hard limit. This is one of the most common reasons for such memory
> exceedings. However, usually the extra memory usage introduced by RocksDB,
> if there's any, should not be too large. Given that your container size is
> 12GB and Flink only plans to use 10GB, I'm not sure whether RocksDB is the
> cause in your case. I've CC'ed @Yun Tang, who is the expert of Flink's
> RocksDB state backend.
> - Does your job use mmap memory? MMap memory, if used, is controlled by
> the operating system, not Flink. Depending on your Yarn cgroup
> configurations, some clusters would also count that as part of the
> container memory consumption.
> - Native memory leaks in user code dependencies and libraries could also
> lead to container memory exceeding.
>
> Another suggestion is, do not trust Flink's "Non-Heap" metrics. It is
> practically helpless and misleading. The "Non-Heap" accounts for SOME of
> the non-heap memory usage, but NOT ALL of them. The community is working on
> a new set of metrics and Web UI for the task manager memory tuning.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sun, Sep 20, 2020 at 12:10 AM Shubham Kumar 
> wrote:
>
>> Hey everyone,
>>
>> We had deployed a streaming job using Flink 1.10.1 one month back and now
>> we are encountering a Yarn container killed due to memory issues very
>> frequently. I am trying to figure out the root cause of this issue in order
>> to fix it.
>>
>> We have a streaming job whose basic structure looks like this:
>> - Read 6 kafka streams and combine stats from them (union) to form a
>> single stream
>> - stream.keyBy(MyKey)
>>  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>  .reduce(MyReduceFunction)
>>  .addSink(new FlinkKafkaProducer011<>...);
>>
>> We are using RocksDB as state backend. In flink-conf.yaml, we used
>> taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
>> one slot per task manager.
>>
>> So, a taskmanager process gets started with the following memory
>> components as indicated in logs:
>>
>> TaskExecutor container... will be started on ... with
>>> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (
>>> 134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
>>> taskHeapSize=4.125gb (4429184954 bytes), taskOffHeapSize=0 bytes,
>>> networkMemSize=896.000mb (939524110 bytes), managedMemorySize=3.500gb (
>>> 3758096440 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
>>> jvmOverheadSize=1024.000mb (1073741824 bytes)}.
>>>
>>
>>>
>>
>>  which are as per defaults.
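As a sanity check, the component sizes in the quoted TaskExecutorProcessSpec log line sum exactly to the configured taskmanager.memory.process.size of 10 GB (10 GiB), so any container kill beyond that comes from memory Flink does not account for:

```java
// Byte values copied from the TaskExecutorProcessSpec log line above.
public class ProcessSpecSum {
    static final long FRAMEWORK_HEAP     = 134_217_728L;   // 128.000mb
    static final long FRAMEWORK_OFF_HEAP = 134_217_728L;   // 128.000mb
    static final long TASK_HEAP          = 4_429_184_954L; // 4.125gb
    static final long TASK_OFF_HEAP      = 0L;
    static final long NETWORK            = 939_524_110L;   // 896.000mb
    static final long MANAGED            = 3_758_096_440L; // 3.500gb
    static final long METASPACE          = 268_435_456L;   // 256.000mb
    static final long JVM_OVERHEAD       = 1_073_741_824L; // 1024.000mb

    static long total() {
        return FRAMEWORK_HEAP + FRAMEWORK_OFF_HEAP + TASK_HEAP + TASK_OFF_HEAP
             + NETWORK + MANAGED + METASPACE + JVM_OVERHEAD;
    }
}
```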
>>
>> Now, after 25 days we started encountering the following yarn container
>> kill error:
>>
>>> Association with remote system [akka.tcp://flink@...] has failed,
>>> address is now gated for [50] ms. Reason: [Association failed with
>>> [akka.tcp://flink@...]] Caused by: [java.net.ConnectException:
>>> Connection refused: .../...:37679]
>>> 2020-09-09 00:53:24 INFO Closing TaskExecutor connection
>>> container_e193_1592804717489_149347_01_11 beca

Adaptive load balancing

2020-09-22 Thread Navneeth Krishnan
Hi All,

We are currently using Flink in production and use keyBy for performing a
CPU-intensive computation. There is a cache lookup for a set of keys, and
since keyBy cannot guarantee the data is sent to a single node, we are
basically replicating the cache on all nodes. This is causing memory
problems for us, and we would like to explore some options to mitigate the
current limitations.

Is there a way to group a set of keys and send to a set of nodes so that we
don't have to replicate the cache data on all nodes?
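One way to approximate this is DataStream#partitionCustom, which takes a user-supplied Partitioner; the routing itself is then an ordinary deterministic key-to-group map, so each subtask's cache only needs its own groups' keys. A minimal sketch of such a router (class name and grouping scheme are hypothetical, and note that a partitionCustom stream cannot use keyed state downstream):

```java
import java.util.HashMap;
import java.util.Map;

/** Routes explicitly grouped keys to a fixed partition; falls back to hashing. */
public class KeyGroupRouter {
    private final Map<String, Integer> keyToGroup = new HashMap<>();

    public void assign(String key, int group) {
        keyToGroup.put(key, group);
    }

    /** Usable as the body of a Flink Partitioner&lt;String&gt;#partition. */
    public int partition(String key, int numPartitions) {
        Integer group = keyToGroup.get(key);
        if (group != null) {
            return group % numPartitions;  // all keys of a group land on one subtask
        }
        // default: deterministic hash routing (floorMod avoids negative results)
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Adaptive load balancing would additionally need feedback (e.g. per-subtask backlog metrics) to rebalance the key-to-group map, which this sketch does not attempt.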

Has someone tried implementing hashing with adaptive load balancing, so that
if a node is busy processing, the data can be routed effectively to
other nodes which are free?

Any suggestions are greatly appreciated.

Thanks


RichFunctions in Flink's Table / SQL API

2020-09-22 Thread Piyush Narang
Hi folks,

We were looking to cache some data using Flink’s MapState in one of our UDFs 
that are called by Flink SQL queries. I was trying to see if there’s a way to 
set up these state objects via the basic FunctionContext [1] we’re provided in 
the Table / SQL UserDefinedFunction class [2] but from what I can see it’s not 
possible. We only seem to have access to the metric group, the distributed 
cache, and the job params. Is there a way for us in Table / SQL 
UDFs to access Flink’s state and store data? Or is this something that isn’t 
supported / recommended? (If it helps we’re on Flink 1.9 and using the old SQL 
planner).

Our broader use-case is to enrich some data coming in via a Kafka stream by 
reading additional data in DynamoDB. We’d like to cache this across restarts to 
cut down on some of the DynamoDb traffic. (Ideally we’d like to move to 
temporal tables, but I think that requires a migration to Blink first?)
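One common workaround in the meantime (not Flink state — it is not checkpointed and will not survive restarts, so it would not fully cover the cross-restart requirement) is to keep a bounded in-memory cache as a transient field initialized in the UDF's open() method. A minimal sketch with illustrative names and sizes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of a plain in-memory LRU cache that a Table/SQL UDF could hold
 * as a transient field (created in open()). This is NOT Flink MapState:
 * it is not checkpointed and is lost on restart -- it only bounds the
 * per-task lookup traffic while the job is running.
 */
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruCache(int maxEntries) {
        super(16, 0.75f, true); // access-order iteration enables LRU eviction
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least-recently-used entry once the cap is exceeded.
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        LruCache<String, String> cache = new LruCache<>(2);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");      // touch "a" so "b" becomes the eldest entry
        cache.put("c", "3"); // evicts "b"
        System.out.println(cache.keySet()); // prints [a, c]
    }
}
```

A UDF would create such a cache in open(FunctionContext) and consult it before falling back to DynamoDB; note it is per-task, so each parallel instance holds its own copy.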

Thanks,

[1] - 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/FunctionContext.html
[2] - 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html

-- Piyush



Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-22 Thread Claude M
Thanks for your responses.
1.  There were no job restarts prior to the metaspace OOM.
2.  I tried increasing the CPU request and still encountered the problem.
Any configuration change I make to the job manager, whether it's in the
flink-conf.yaml or in the pod's CPU/memory request, results
in this problem.


On Tue, Sep 22, 2020 at 12:04 AM Xintong Song  wrote:

> Thanks for the input, Brain.
>
> This looks like what we are looking for. The issue is fixed in 1.10.3,
> which also matches this problem occurred in 1.10.2.
>
> Maybe Claude can further confirm it.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian  wrote:
>
>> Hi Xintong and Claude,
>>
>>
>>
>> In our internal tests, we also encounter these two issues and we spent
>> much time debugging them. There are two points I need to confirm if we
>> share the same problem.
>>
>>1. Your job is using default restart strategy, which is per-second
>>restart.
>>2. Your CPU resource on jobmanager might be small
>>
>>
>>
>> Here are some findings I want to share.
>>
>> ## Metaspace OOM
>>
>> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when jobs
>> restart, some threads from the source function keep hanging, so the class
>> loader cannot be closed. Each restart then loads new classes, expanding the
>> metaspace until finally the OOM happens.
>>
>>
>>
>> ## Leader retrieving
>>
>> Constant restarts can be heavy on the jobmanager; if the JM CPU resources
>> are not enough, the thread for leader retrieving may get stuck.
>>
>>
>>
>> Best Regards,
>>
>> Brian
>>
>>
>>
>> *From:* Xintong Song 
>> *Sent:* Tuesday, September 22, 2020 10:16
>> *To:* Claude M; user
>> *Subject:* Re: metaspace out-of-memory & error while retrieving the
>> leader gateway
>>
>>
>>
>> ## Metaspace OOM
>>
>> As the error message already suggested, the metaspace OOM you encountered
>> is likely caused by a class loading leak. I think you are on the right
>> direction trying to look into the heap dump and find out where the leak
>> comes from. IIUC, after removing the ZK folder, you are now able to run
>> Flink with the heap dump options.
>>
>>
>>
>> The problem does not occur in previous versions because Flink has set a
>> metaspace limit since the 1.10 release. The class loading leak
>> might have already been there, but was never discovered. This could lead to
>> unpredictable stability and performance issues. That's why Flink updated
>> its memory model and explicitly set the metaspace limit in the 1.10 release.
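(For reference: under the 1.10 memory model this cap is configurable in flink-conf.yaml. The key below is from the Flink 1.10 memory configuration; the size shown is purely illustrative, not a recommendation — raising it only buys time if classes are genuinely leaking.)

```yaml
# flink-conf.yaml -- JVM metaspace is capped since Flink 1.10
taskmanager.memory.jvm-metaspace.size: 512m
```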
>>
>>
>>
>> ## Leader retrieving
>>
>> The command looks good to me. If this problem happens only once, it could
>> be irrelevant to adding the options. If that does not block you from
>> getting the heap dump, we can look into it later.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>>
>>
>> On Mon, Sep 21, 2020 at 9:37 PM Claude M  wrote:
>>
>> Hi Xintong,
>>
>>
>>
>> Thanks for your reply.  Here is the command output w/ the java.opts:
>>
>>
>>
>> /usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC
>> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log
>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>> -classpath
>> /opt/flink/lib/flink-metrics-datadog-statsd-2.11-0.1.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/flink/lib/flink-table-blink_2.11-1.10.2.jar:/opt/flink/lib/flink-table_2.11-1.10.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.10.2.jar::/etc/hadoop/conf:
>> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
>> --configDir /opt/flink/conf --executionMode cluster
>>
>>
>>
>> To answer your questions:
>>
>>    - Correct, in order for the pod to start up, I have to remove the
>>    flink app folder from zookeeper.  I only have to delete it once after
>>    applying the java.opts arguments.  It doesn't make sense though that I
>>    should have to do this just from adding a parameter.
>>- I'm using the standalone deployment.
>>- I'm using job cluster mode.
>>
>> A higher priority issue I'm trying to solve is the metaspace out of
>> memory that is occurring in task managers.  This was not happening before I
>> upgraded to Flink 1.10.2.  Even after increasing the memory, I'm still
>> encountering the problem.  That is when I added the java.opts argument to
>> see if I can get more information about the problem, and when I ran
>> across the second issue with the job manager pod not starting up.
>>
>>
>>
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>> On Sun, Sep 20, 2020 at 10:23 PM Xintong Song 
>> wrote:
>>
>> Hi Claude,
>>
>>
>>
>> IIUC, in your case the leader retrieving problem is triggered by adding
>> the `java.opts`? Then could you try to find and post the complete command
>> for launching the JVM process? You can try log into the pod and execute `ps
>> -ef | grep `.

Re: Better way to share large data across task managers

2020-09-22 Thread Kostas Kloudas
Hi Dongwon,

If you know the data in advance, you can always use the Yarn options
in [1] (e.g. the "yarn.ship-directories" option) to ship the directories
with the data you want only once to each Yarn container (i.e. TM) and then
write a udf which reads them in the open() method. This allows the
data to be shipped only once per TM, but each of the tasks will of
course keep its own copy in memory. By default the visibility of
the files that you ship is set to APPLICATION [2], if I am not
mistaken, so if more than one TM goes to the same node, you will
have even fewer copies shipped.

Does this help with your usecase?

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
[2] 
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
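A sketch of the open()-time loading described above, in plain Java (class name, directory layout, and the key=value file format are illustrative assumptions; in a real job this logic would sit in a RichFunction's open() and read the shipped directory by its name):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch: load reference data from a local (shipped)
 * directory once when the task opens, then serve lookups from memory.
 */
public class ShippedDataLoader {
    private Map<String, String> lookup;

    /** Would be called from RichFunction#open() in a real job. */
    public void open(Path shippedDir) throws IOException {
        lookup = new HashMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(shippedDir)) {
            for (Path file : files) {
                for (String line : Files.readAllLines(file)) {
                    String[] kv = line.split("=", 2); // assumed key=value format
                    if (kv.length == 2) {
                        lookup.put(kv[0], kv[1]);
                    }
                }
            }
        }
    }

    public String enrich(String key) {
        return lookup.getOrDefault(key, "unknown");
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("shipped");
        Files.write(dir.resolve("data.txt"),
                java.util.Arrays.asList("fr=France", "de=Germany"));
        ShippedDataLoader loader = new ShippedDataLoader();
        loader.open(dir);
        System.out.println(loader.enrich("fr")); // prints "France"
    }
}
```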

On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim  wrote:
>
> Hi,
>
> I'm using Flink broadcast state similar to what Fabian explained in [1]. One 
> difference might be the size of the broadcasted data; the size is around 
> 150MB.
>
> I've launched 32 TMs by setting
> - taskmanager.numberOfTaskSlots : 6
> - parallelism of the non-broadcast side : 192
>
> Here's some questions:
> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is that right?
> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
> each TM can read the broadcasted data? I'm considering implementing a static 
> class for the non-broadcast side to directly load data only once on each 
> TaskManager instead of the broadcast state (FYI, I'm using per-job clusters 
> on YARN, so each TM is only for a single job). However, I'd like to use Flink 
> native facilities if possible.
>
> The type of broadcasted data is Map with around 600K entries, so 
> every time the data is broadcasted a lot of GC is inevitable on each TM due 
> to the (de)serialization cost.
>
> Any advice would be much appreciated.
>
> Best,
>
> Dongwon
>
> [1] https://flink.apache.org/2019/06/26/broadcast-state.html
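The static-class idea Dongwon mentions could be sketched as a JVM-wide lazy holder (names are illustrative and loadFromSomewhere() stands in for the real 600K-entry load; note this relies on the per-job-cluster setup, since the map is shared per classloader/TM, and it bypasses Flink's state and checkpointing entirely):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch: load reference data once per JVM (i.e. once per
 * TaskManager) instead of broadcasting it to every parallel task. All
 * tasks in the same TM share one read-only map instance.
 */
public final class SharedLookup {
    private static volatile Map<String, String> data;

    public static Map<String, String> get() {
        Map<String, String> local = data;
        if (local == null) {
            // Double-checked locking so the load happens exactly once per JVM.
            synchronized (SharedLookup.class) {
                local = data;
                if (local == null) {
                    data = local = Collections.unmodifiableMap(loadFromSomewhere());
                }
            }
        }
        return local;
    }

    private static Map<String, String> loadFromSomewhere() {
        // Placeholder for the real data source (file, DB, object store, ...).
        Map<String, String> m = new HashMap<>();
        m.put("k1", "v1");
        return m;
    }

    public static void main(String[] args) {
        // Both calls return the same instance, loaded once.
        System.out.println(SharedLookup.get() == SharedLookup.get()); // prints true
        System.out.println(SharedLookup.get().get("k1"));             // prints v1
    }
}
```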


Stateful Functions + ML model prediction

2020-09-22 Thread John Morrow
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The 
records are sourced from a message queue, enriched as they flow through the 
pipeline based on business rules and finally written to a database. We're using 
the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a 
certain word then set field Y to a certain value. For the implementation I 
began by looking at 
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for 
inspiration. I ended up implementing a business rule as a Java class with a 
match-predicate & an action. The records enter the pipeline on a data stream 
which is joined with the rules in a broadcast stream and a ProcessFunction 
checks each record to see if it matches any rule predicates. If the record 
doesn't match any business rule predicates it continues on in the pipeline. If 
the record does match one or more business rule predicates it is sent to a side 
output with the list of business rules that it matched. The side output data 
stream goes through a RichAsyncFunction which loops through the matched rules 
and applies each one's action to the record. At the end, that enriched 
side-output record stream is unioned back with the non-enriched record stream. 
This all worked fine.

I have some new business rules which are more complicated and require sending 
the record's text field to different pre-trained NLP models for prediction, 
e.g. if a model predicts the text language is X then update field Y to that 
value, if another model predicts the sentiment is positive then set some other 
field to another value. I'm planning on using seldon-core to serve these 
pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in 
Flink. I could add in a new ProcessFunction in my pipeline before my existing 
enrichment-rule-predicate ProcessFunction and have it send the text to each of 
the prediction models and add the results for each one to the record so it's 
available for the enrichment step. The downside of this is that in the future 
I'm anticipating having more and more models, and not necessarily wanting to 
send each record to every model for prediction. e.g. I might have a business 
rule which says if the author of the text is X then get the sentiment (via the 
sentiment model) and update field Z, so it would be a waste of time doing that 
for all records.

I had a look at stateful functions. There's an example in the statefun.io 
overview which shows having a stateful function for doing a fraud model 
prediction based on if an account has had X number of frauds detected in the 
last 30 days, so the key for the state is an account number. In my case, these 
model predictions don't really have any state - they just take input and return 
a prediction, they're more like a stateless lambda function. Also, I was 
wondering if I implemented these as stateful functions would I be able to make 
them available to other Flink jobs within the cluster, as opposed to having 
them as individual RichAsyncFunctions defined within a single Flink job and 
only available to that. The last thing which made stateful functions sound good 
was that at the moment all my business rules happen to be orthogonal, but I can 
imagine in the future where I might want one rule to be based on another one, 
and whereas regular dataflows have to be an acyclic graph stateful functions 
could support that.

So, in summary:

  * Does this sound like a good use case for stateful functions?
  * Are stateful functions available to all Flink jobs within a cluster?


Thanks,
John.




Re: How to stop multiple Flink jobs of the same name from being created?

2020-09-22 Thread Dan Hill
Hi Yang!

The multiple "INSERT INTO" jobs all go to the same Flink cluster.  I'm
using this Helm chart (which looks like the standalone option).  I deploy
the job using a simple k8s Job.  Sounds like I should do this myself.  Thanks!

Thanks!
- Dan



On Tue, Sep 22, 2020 at 5:37 AM Yang Wang  wrote:

> Hi Dan,
>
> First, I want to get more information about your submission so that we
> could make the question clear.
>
> Are you using TableEnvironment to execute multiple "INSERT INTO" statements
> and finding that each one is
> executed in a separate Flink cluster? It is really strange, and I want
> to know how you are deploying your
> Flink cluster on Kubernetes, via standalone[1] or native integration[2].
> If it is the former, I am afraid you need
> `kubectl` to start/stop your Flink application manually. If it is the
> latter, I think the Flink cluster will be destroyed
> automatically when the Flink job failed. Also all the SQL jobs will be
> executed in a shared Flink application.
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>
>
> Best,
> Yang
>
On Mon, Sep 21, 2020 at 8:15 AM, Dan Hill wrote:
>
>> I've read the upgrade application page.
>> This seems to focus on doing this in a wrapper layer (e.g. Kubernetes).
>> Just checking to see if this is the common practice or do people do this
>> from their client jars.
>>
>>
>>
>> On Sun, Sep 20, 2020 at 5:13 PM Dan Hill  wrote:
>>
>>> I'm prototyping with Flink SQL.  I'm iterating on a client job with
>>> multiple INSERT INTOs.  Whenever I have an error, my Kubernetes job
>>> retries.  This creates multiple stream jobs with the same names.
>>>
>>> Is it up to clients to delete the existing jobs?  I see Flink CLI
>>> functions for this.  Do most people usually do this from inside their
>>> client jar or their wrapper code (e.g. Kubernetes job).
>>>
>>> - Dan
>>>
>>


Re: Flink Table SQL and writing nested Avro files

2020-09-22 Thread Dan Hill
Nice!  I'll try that.  Thanks, Dawid!

On Mon, Sep 21, 2020 at 2:37 AM Dawid Wysakowicz 
wrote:

> Hi Dan,
>
> I think the best what I can suggest is this:
>
> SELECT
>
> ROW(left.field0, left.field1, left.field2, ...),
>
> ROW(right.field0, right.field1, right.field2, ...)
>
> FROM ...
>
> You will need to list all the fields manually, as SQL does not allow for
> asterisks in regular function calls.
>
> If you are willing to give the Table API a try you might workaround some
> of the manual work with the Column Function[1]
>
> Table join = t1.join(t2).where($("id1").isEqual($("id2")));
> join
>     .select(
>         row(withColumns(range(1, t1.getSchema().getFieldCount()))),
>         row(withColumns(range(
>             t1.getSchema().getFieldCount() + 1,
>             t1.getSchema().getFieldCount() + t2.getSchema().getFieldCount()))))
>     .executeInsert("flat_avro")
>     .await();
>
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#column-functions
> On 18/09/2020 09:47, Dan Hill wrote:
>
> Hi!
>
> I want to join two tables and write the results to Avro where the left and
> right rows are nested in the avro output.  Is it possible to do this with
> the SQL interface?
>
> Thanks!
> - Dan
>
> CREATE TABLE `flat_avro` (
>   `left` ROW,
>   `right` ROW
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 's3p://blah/blah',
>   'format' = 'avro');
>
> INSERT INTO `flat_avro`
> SELECT left.*, right.*
> FROM `left` LEFT JOIN `right`
> ON `left`.`id` = `right`.`id`;
>
>


Flink Statefun Byte Ingress

2020-09-22 Thread Timothy Bess
Hi,

So most of the examples of "module.yaml" files I've seen focus on protobuf
ingress, but is there a way to just get bytes from Kafka? I want to
integrate this with the rest of my codebase which uses JSON, but don't want
to migrate to protobuf just yet. I'm not totally sure how it would work,
since function arguments seem to be encoded as an Any type, which is a
protobuf type string plus some bytes. I guess the string would need to be some
made-up constant value and I'd just grab the bytes? Honestly, just using
bytes the way it's done with the state value might be a bit more flexible to
work with.

Thanks,

Tim


Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

2020-09-22 Thread Xintong Song
Hi Shubham,

Concerning FLINK-18712, thanks for the pointer. I was not aware of this
issue before. Running on Kubernetes or Yarn should not affect this issue. I
cannot tell whether this issue is the cause of your problem. The simplest
way to confirm this is probably to just try the solution and see if that fixes
your problem.

Given that it could take weeks to reproduce your problem, I would suggest
to keep track of the native memory usage with jemalloc and jeprof. This
should provide direct information about which component is using extra
memory.

Thank you~

Xintong Song



On Tue, Sep 22, 2020 at 10:42 PM Shubham Kumar 
wrote:

> Hi Xintong,
>
> Thanks for your insights, they are really helpful.
>
> I understand now that it most certainly is a native memory issue rather
> than a heap memory issue and about not trusting Flink's Non-Heap metrics.
>
> I do believe that our job structure is simple enough that I couldn't find
> any use of mmap memory or any other straightforward native memory leak.
> That leads me to believe it could be a RocksDB issue, although
> you do make a valid point that there is an extra 2GB in the yarn
> container which should account for RocksDB's extra usage. I also saw this
> JIRA ticket for a RocksDB memory leak issue on Kubernetes and was
> wondering if the same could happen on yarn containers and be related to my
> issue [1]. Let me know what you guys think about this.
>
> Also, I tried running the same job using FileSystemBackend (as a separate
> job) and it went fine with no container kills and native memory not rising
> over time, which hints further towards RocksDB being the culprit. My state
> size in the checkpoint is around 1GB (can probably even think of switching
> to FileSystemBackend for this job but still want to figure out the case for
> RocksDB). I am using incremental checkpoints in my main job which has
> RocksDB state backend, if that's relevant.
>
> I read about native memory tracking and will probably go ahead and use Native
> Memory Tracking (NMT) or jemalloc to confirm the RocksDB issue and
> update here.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-18712
>
> Thanks
> Shubham
>
> On Mon, Sep 21, 2020 at 8:23 AM Xintong Song 
> wrote:
>
>> Hi Shubham,
>>
>> Java heap memory cannot cause a container memory exceeding. Heap memory
>> is strictly limited by the JVM `-Xmx` parameter. If the program does need
>> more memory than the limit, it will run into a heap space OOM, rather than
>> implicitly using more memory than the limit.
>>
>> Several reasons that might lead to container memory exceeding.
>> - RocksDB, whose memory controlling is based on estimation rather than
>> hard limit. This is one of the most common reasons for such memory
>> exceedings. However, usually the extra memory usage introduced by RocksDB,
>> if there's any, should not be too large. Given that your container size is
>> 12GB and Flink only plans to use 10GB, I'm not sure whether RocksDB is the
>> cause in your case. I've CC'ed @Yun Tang, who is the expert of Flink's
>> RocksDB state backend.
>> - Does your job use mmap memory? MMap memory, if used, is controlled by
>> the operating system, not Flink. Depending on your Yarn cgroup
>> configurations, some clusters would also count that as part of the
>> container memory consumption.
>> - Native memory leaks in user code dependencies and libraries could also
>> lead to container memory exceeding.
>>
>> Another suggestion is, do not trust Flink's "Non-Heap" metrics. It is
>> practically helpless and misleading. The "Non-Heap" accounts for SOME of
>> the non-heap memory usage, but NOT ALL of them. The community is working on
>> a new set of metrics and Web UI for the task manager memory tuning.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Sun, Sep 20, 2020 at 12:10 AM Shubham Kumar <
>> shubhamkumar1...@gmail.com> wrote:
>>
>>> Hey everyone,
>>>
>>> We had deployed a streaming job using Flink 1.10.1 one month back and
>>> now we are encountering a Yarn container killed due to memory issues very
>>> frequently. I am trying to figure out the root cause of this issue in order
>>> to fix it.
>>>
>>> We have a streaming job whose basic structure looks like this:
>>> - Read 6 kafka streams and combine stats from them (union) to form a
>>> single stream
>>> - stream.keyBy(MyKey)
>>>  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>>  .reduce(MyReduceFunction)
>>>  .addSink(new FlinkKafkaProducer011<>...);
>>>
>>> We are using RocksDB as state backend. In flink-conf.yaml, we used
>>> taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
>>> one slot per task manager.
>>>
>>> So, a taskmanager process gets started with the following memory
>>> components as indicated in logs:
>>>
>>> TaskExecutor container... will be started on ... with
 TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (
 134217728 bytes), frameworkOffHeapSize=128.000mb (

Back pressure with multiple joins

2020-09-22 Thread Dan Hill
Hi!

My goal is to better understand how my code impacts streaming throughput.

I have a streaming job where I join multiple tables (A, B, C, D) using
interval joins.

Case 1) If I have 3 joins in the same query, I don't hit back pressure.

SELECT ...
FROM A
LEFT JOIN B
ON...
LEFT JOIN C
ON...
LEFT JOIN D
ON...


Case 2) If I create temporary views for two of the joins (for reuse with
another query), I hit a lot of back pressure.  This is selecting
slightly more fields than the first.

CREATE TEMPORARY VIEW `AB`

SELECT ...
FROM A
LEFT JOIN B
...

CREATE TEMPORARY VIEW `ABC`
SELECT ...
FROM AB
LEFT JOIN C
...



Can Temporary Views increase back pressure?

If A, B, C and D are roughly the same size (fake data), does the join order
matter?  E.g. I assume reducing the size of the columns in each join stage
would help.

Thanks!
- Dan


Re: Back pressure with multiple joins

2020-09-22 Thread Dan Hill
When I use DataStream and implement the join myself, I can get 50x the
throughput.  I assume I'm doing something wrong with Flink's Table API and
SQL interface.

On Tue, Sep 22, 2020 at 11:21 PM Dan Hill  wrote:

> Hi!
>
> My goal is to better understand how my code impacts streaming throughput.
>
> I have a streaming job where I join multiple tables (A, B, C, D) using
> interval joins.
>
> Case 1) If I have 3 joins in the same query, I don't hit back pressure.
>
> SELECT ...
> FROM A
> LEFT JOIN B
> ON...
> LEFT JOIN C
> ON...
> LEFT JOIN D
> ON...
>
>
> Case 2) If I create temporary views for two of the joins (for reuse with
> another query), I hit a lot of back pressure.  This is selecting
> slightly more fields than the first.
>
> CREATE TEMPORARY VIEW `AB`
>
> SELECT ...
> FROM A
> LEFT JOIN B
> ...
>
> CREATE TEMPORARY VIEW `ABC`
> SELECT ...
> FROM AB
> LEFT JOIN C
> ...
>
>
>
> Can Temporary Views increase back pressure?
>
> If A, B, C and D are roughly the same size (fake data), does the join
> order matter?  E.g. I assume reducing the size of the columns in each join
> stage would help.
>
> Thanks!
> - Dan
>
>
>