Flink vs Kafka streams

2019-11-07 Thread Navneeth Krishnan
Hello All,

I have a streaming job running in production which processes over 2
billion events per day and does some heavy processing on each event. We
have been facing some challenges in managing Flink in production, like
scaling in and out, restarting the job with a savepoint, etc. Flink provides a
lot of features which made it seem the obvious choice at the time, but now,
with all the operational overhead, we are wondering whether we should still
use Flink for our stream processing requirements or switch to Kafka Streams.

We currently deploy Flink on ECR. Bringing up a new cluster for another
streaming job is too expensive, but on the flip side running it on the same
cluster becomes difficult since there is no way to say "this job has to run
on a dedicated server" versus "this one can run on a shared instance". Also,
taking a savepoint, cancelling and submitting a new job results in some
downtime. The most critical part is that there is no state shared among all
tasks, i.e. a global state. We sort of achieve this today using an external
Redis cache, but that incurs cost as well.

If we move to Kafka Streams, it makes our deployment life much
easier: each new streaming job will be a microservice that can scale
independently, and with global state it's much easier to share state without
an external cache. But the disadvantage is that we have to rely on the
partitions for parallelism. Although this might initially sound easier,
when we need to scale much higher this will become a bottleneck.

Do you guys have any suggestions on this? We need to decide which way to
move forward, and any suggestions would be of great help.

Thanks


Re: Monitor rocksDB memory usage

2019-11-07 Thread Yun Tang
Hi Lu

I think RocksDB native metrics [1] could help.
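
For example, a few of the memory-related metrics can be switched on in
flink-conf.yaml (a hedged sketch; the exact set of available keys depends on
your Flink version, see [1]):

state.backend.rocksdb.metrics.estimate-table-readers-mem: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-num-keys: true

Once enabled, the values are exposed per column family through Flink's normal
metric reporters; note that the docs warn querying native RocksDB properties
can have a small performance overhead.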

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics

Best
Yun Tang

From: Lu Niu 
Date: Friday, November 8, 2019 at 8:18 AM
To: user 
Subject: Monitor rocksDB memory usage

Hi,

I read that rocksDB memory is managed off heap. Is there a way to monitor the 
memory usage there then?

Best
Lu


Re: Retrieving Flink job ID/YARN Id programmatically

2019-11-07 Thread vino yang
Hi Lei Nie,

You can use
`StreamExecutionEnvironment#getStreamGraph#getJobGraph#getJobID` to get the
job id.
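
For illustration, a hedged sketch of that call chain (place it after the
pipeline has been defined, since the JobGraph, and hence the JobID, is built
from the transformations registered so far):

import org.apache.flink.api.common.JobID;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// inside your driver's main():
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... define sources, transformations and sinks ...
JobID jobId = env.getStreamGraph().getJobGraph().getJobID();
System.out.println("Flink job id: " + jobId);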

Best,
Vino

On Fri, Nov 8, 2019 at 8:38 AM, Lei Nie wrote:

> Hello,
> I am currently executing streaming jobs via StreamExecutionEnvironment. Is
> it possible to retrieve the Flink job ID/YARN ID within the context of a
> job? I'd like to be able to automatically register the job such that
> monitoring jobs can run (the REST API requires the job id, for example).
>
> Thanks
>


Re: flink's hard dependency on zookeeper for HA

2019-11-07 Thread vino yang
Hi Vishwas,

In the standalone cluster HA mode, Flink heavily depends on ZooKeeper, not
only for leader election, but also for:

   - Checkpoint metadata info;
   - JobGraph store;
   - ...

So you should make sure your ZooKeeper cluster works normally. For more
details, please see [1][2].
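
For reference, a minimal sketch of the relevant flink-conf.yaml entries for
standalone HA (hostnames and paths below are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha/
high-availability.cluster-id: /my-flink-cluster

With this in place, the JobManager stores its recovery metadata under
high-availability.storageDir and only a pointer to it in ZooKeeper.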

Best,
Vino

[1]:
https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html

On Thu, Nov 7, 2019 at 12:07 AM, Vishwas Siravara wrote:

> Hi all,
> I am using Flink 1.7.2 as a standalone cluster in high availability mode
> with ZooKeeper. I have noticed that all Flink processes go down once
> ZooKeeper goes down. Is this expected behavior, given that the leader
> election has already happened and the job has been running for several hours?
>
>
> Best,
> Vishwas
>


Retrieving Flink job ID/YARN Id programmatically

2019-11-07 Thread Lei Nie
Hello,
I am currently executing streaming jobs via StreamExecutionEnvironment. Is
it possible to retrieve the Flink job ID/YARN ID within the context of a
job? I'd like to be able to automatically register the job such that
monitoring jobs can run (the REST API requires the job id, for example).

Thanks


Monitor rocksDB memory usage

2019-11-07 Thread Lu Niu
Hi,

I read that rocksDB memory is managed off heap. Is there a way to monitor
the memory usage there then?

Best
Lu


Unable to retrieve Kafka consumer group offsets

2019-11-07 Thread Harrison Xu
I am using Flink 1.9.0 and KafkaConsumer010 (Kafka 0.10.1.1). Attempting to
retrieve the offset lag of Flink kafka consumers results in the below
error. I saw a separate thread about this in the mailing list in 2017 - is
this not fixed? Are there workarounds?


> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
> --bootstrap-server localhost:9092 --group test --describe
> Error while executing consumer group command Group test with protocol type
> '' is not a valid consumer group


Re: How can I get the backpressure signals inside my function or operator?

2019-11-07 Thread Yuval Itzchakov
Hi,

We've been dealing with a similar problem of downstream consumers causing
backpressure. One idea that a colleague of mine suggested is measuring the
time it takes for a call on the output Collector (i.e. Collector#collect) to
return. Since this method is used to push records downstream, it will also
actively block when the buffer is full and there are no more floating
buffers to allocate, hence causing the backpressure.

Thus, if you know the average time this call takes when there's no
backpressure, you can make an educated guess about the time it takes when
there is pressure (you'll need to measure these times in your
source/operator), and actively throttle the rate of records being pushed
downstream.
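
A hedged sketch of that idea (class and metric names are illustrative, not
from any Flink API): wrap the emit in a timer and expose the last observed
latency as a gauge; when collect() starts blocking on exhausted output
buffers, the reported latency rises.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Collector;

public class TimedEmitFlatMap<T> extends RichFlatMapFunction<T, T> {

    // last observed time spent inside collect(), in nanoseconds
    private volatile long lastEmitNanos;

    @Override
    public void open(Configuration parameters) {
        // expose the measurement through Flink's metric system
        getRuntimeContext().getMetricGroup()
            .gauge("lastEmitLatencyNanos", (Gauge<Long>) () -> lastEmitNanos);
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        long start = System.nanoTime();
        out.collect(value); // blocks when downstream output buffers are exhausted
        lastEmitNanos = System.nanoTime() - start;
    }
}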

Yuval.

On Thu, 7 Nov 2019, 9:17 Felipe Gutierrez, 
wrote:

> cool! I got to use it.
> Now I have to get the jobID and vertice ID inside the operator.
>
> I forgot to mention. I am using Flink 1.9.1
>
> Thanks!
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Thu, Nov 7, 2019 at 4:59 AM Zhijiang 
> wrote:
>
>> You can refer to this document [1] for the rest API details.
>> Actually the backpressure URI refers to "
>> /jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether
>> it is easy to get the jobid and vertexid.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Felipe Gutierrez 
>> Send Time:2019 Nov. 7 (Thu.) 00:06
>> To:Chesnay Schepler 
>> Cc:Zhijiang ; user 
>> Subject:Re: How can I get the backpressure signals inside my function or
>> operator?
>>
>> If I can trigger the sample via rest API it is good for a POC. Then I can
>> read from any in-memory storage using a separated thread within the
>> operator. But what is the rest api that gives to me the ratio value from
>> backpressure?
>>
>> Thanks
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>>
>> On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler 
>> wrote:
>>
>> I don't think there is a truly sane way to do this.
>>
>> I could envision a separate application triggering samples via the REST
>> API, writing the results into kafka which your operator can read. This is
>> probably the most reasonable solution I can come up with.
>>
>> Any attempt at accessing the TaskExecutor or metrics from within the
>> operator is inadvisable; you'd be encroaching into truly hacky territory.
>>
>> You could also do your own backpressure sampling within your operator
>> (separate thread within the operator executing the same sampling logic),
>> but I don't know how easy it would be to re-use Flink code.
>>
>> On 06/11/2019 13:40, Felipe Gutierrez wrote:
>> Does anyone know in which metric I can rely on to know if a given
>> operator is activating the backpressure?
>> Or how can I call the same java object that the Flink UI calls to give me
>> the ratio of backpressure?
>>
>> Thanks,
>> Felipe
>>
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>>
>> On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>> Hi Zhijiang,
>>
>> thanks for your reply. Yes, you understood correctly.
>> The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength"
>> on the operator might be because of the way Flink runtime architecture was
>> designed. But I was wondering what kind of signal I can get. I guess I
>> could get some backpressure signal, because backpressure works by slowing
>> down the upstream operators.
>>
>> For example, I can see the ratio per sub-task on the web interface [1].
>> That is, at the level of the physical operators. Is there any message
>> flowing backward that I can get? Is there anything that would let me avoid
>> relying on external storage?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
>> --
>> -- Felipe Gutierrez
>> -- skype: felipe.o.gutierrez
>> -- https://felipeogutierrez.blogspot.com
>>
>>
>> On Tue, Nov 5, 2019 at 12:23 PM Zhijiang 
>> wrote:
>> Hi Felipe,
>>
>> That is an interesting idea to control the upstream's output based on
>> downstream's input.
>>
>> If I understood correctly, the preAggregate operator would trigger flushing
>> its output while the reduce operator is idle/hungry. In contrast, the
>> preAggregate would continue aggregating data in the case of backpressure.
>>
>> I think this requirement is valid, but unfortunately I guess you cannot
>> get the backpressure signal at the operator level. AFAIK only the upper
>> task level can get the input/output state to decide whether to

Re: Till Rohrmann - Can you please share your code for FF - SF - Flink as a lib

2019-11-07 Thread Till Rohrmann
Quick update, the link should be working now.

Cheers,
Till

On Thu, Nov 7, 2019 at 1:30 PM Till Rohrmann  wrote:

> Hi,
>
> the code I've based my presentation on can be found here [1]. Be aware,
> though, that for demo purposes I used a patched Flink version which scaled
> down the running job. This is currently not yet properly supported by Flink.
>
> [1] https://github.com/GJL/ffber2018-flink-as-a-library
>
> Cheers,
> Till
>
> On Thu, Nov 7, 2019 at 6:50 AM arpit8622  wrote:
>
>> https://www.youtube.com/watch?v=WeHuTRwicSw
>> 
>>
>> Basically I wanted to check the job/task manager k8s YAML you used for
>> the above demo.
>>
>> It will be very helpful for the community.
>>
>> If it's already committed somewhere, can you please direct me to the link?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Flink Filters have state?

2019-11-07 Thread Timothy Victor
I have a FilterFunction implementation which accepts an argument in its
constructor and stores it as an instance member. For example:

class ThresholdFilter<T> implements FilterFunction<T> {

  private final MyThreshold threshold;

  private int numElementsSeen;

  public ThresholdFilter(MyThreshold threshold) {
    this.threshold = threshold;
  }

  // filter(T value) checks the element against the threshold and increments
  // numElementsSeen (method body elided in the original mail)
}

The filter uses the threshold in deciding whether or not to filter the
incoming element.

All this works but I have some gaps in my understanding.

1. How is this filter stored and recovered in the case of a failure? Is it
just serialized as a POJO and stored in the configured state backend?

2. When recovered, will it maintain the state of all members? (E.g. note
that I have a numElementsSeen member in the filter which keeps incrementing
for each element received; see the sketch below.)

3. Is this sort of thing even advisable for a filter? I'm guessing filters
are meant to be reusable across operator instances, in which case the state
could be wrong after recovery?
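
To make the question concrete, here is a hedged sketch (illustrative only;
MyThreshold#accepts is a hypothetical stand-in for the real check) of what
explicitly checkpointing numElementsSeen via CheckpointedFunction would look
like, since plain instance fields are not snapshotted automatically:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

class CheckpointedThresholdFilter<T> implements FilterFunction<T>, CheckpointedFunction {

  private final MyThreshold threshold;   // re-created from the constructor argument
  private long numElementsSeen;          // explicitly checkpointed below
  private transient ListState<Long> countState;

  CheckpointedThresholdFilter(MyThreshold threshold) {
    this.threshold = threshold;
  }

  @Override
  public boolean filter(T value) {
    numElementsSeen++;
    return threshold.accepts(value);     // accepts() is a stand-in for the real predicate
  }

  @Override
  public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    countState.clear();
    countState.add(numElementsSeen);     // store the counter in managed operator state
  }

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {
    countState = ctx.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("numElementsSeen", Long.class));
    for (Long restored : countState.get()) {  // non-empty only when restoring
      numElementsSeen = restored;
    }
  }
}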

Thanks in advance

Tim


Re: Till Rohrmann - Can you please share your code for FF - SF - Flink as a lib

2019-11-07 Thread Till Rohrmann
Hi,

the code I've based my presentation on can be found here [1]. Be aware,
though, that for demo purposes I used a patched Flink version which scaled
down the running job. This is currently not yet properly supported by Flink.

[1] https://github.com/GJL/ffber2018-flink-as-a-library

Cheers,
Till

On Thu, Nov 7, 2019 at 6:50 AM arpit8622  wrote:

> https://www.youtube.com/watch?v=WeHuTRwicSw
> 
>
> Basically I wanted to check the job/task manager k8s YAML you used for
> the above demo.
>
> It will be very helpful for the community.
>
> If it's already committed somewhere, can you please direct me to the link?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How long is the flink sql task state default ttl?

2019-11-07 Thread Dian Fu
It's disabled by default. 
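
If you do want to enable it, a hedged sketch against the Flink 1.9 Table API
(the retention times below are just examples; Time is
org.apache.flink.api.common.time.Time):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StreamQueryConfig queryConfig = tableEnv.queryConfig();
// keep state for a key at least 12h after its last access, drop it after 24h at most
queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
// pass queryConfig where the query is emitted, e.g.
// tableEnv.toRetractStream(resultTable, Row.class, queryConfig);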

BTW: you only need to send it to the user ML; it's not necessary to also send
it to the dev ML.

> On Nov 7, 2019, at 3:36 PM, LakeShen wrote:
> 
> Hi community, as I know I can use idle state retention time to clear the
> Flink SQL task state. My question is: how long is the default TTL of the
> Flink SQL task state? Thanks



Re: RocksDB and local file system

2019-11-07 Thread Congxian Qiu
Hi

The path will store the checkpoints, and Flink will fetch the checkpoint
files to restore the state if any failure occurs. If you specify the
local file system, Flink may not be able to find the checkpoint files when
restoring, and thus can't restore from the last checkpoint.
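
For a local test setup, a minimal sketch would look like this (the path below
is a placeholder; for production prefer a distributed file system such as HDFS
or S3, for the reason above):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// inside your job's main():
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60s; value is just an example
// "true" enables incremental checkpoints; the constructor may throw IOException
env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints", true));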

Best,
Congxian


On Thu, Nov 7, 2019 at 10:57 AM, vino yang wrote:

> Hi Jaqie,
>
> For testing, you can use the local file system scheme (e.g. "file:///").
> Technically speaking, it's OK to specify the path string you provided.
>
> However, in a production environment, we do not recommend using the
> local file system, because it does not provide high availability.
>
> Best,
> Vino
>
> On Wed, Nov 6, 2019 at 11:29 PM, Jaqie Chan wrote:
>
>> Hello
>>
>> I am using the Flink RocksDB state backend. The documentation seems to imply
>> I can use a regular file system such as file:///data/flink/checkpoints,
>> but the code javadoc only mentions the hdfs or s3 options.
>>
>> I am wondering if it's possible to use the local file system with the Flink
>> RocksDB backend.
>>
>>
>> Thanks
>>
>> 嘉琪
>>
>


Re: RocksDB state on HDFS seems not being cleaned up

2019-11-07 Thread Yun Tang
Yes, just sum all the file sizes within the checkpoint meta to get the full
checkpoint size (this omits some byte-stream state handles, but is nearly accurate).
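
If parsing _metadata is inconvenient, a rough hedged sketch of the complementary
measurement: total on-disk usage of the whole checkpoint directory (including
shared/ and any orphan files) via the Hadoop FileSystem API. Comparing it with
the per-checkpoint sums can hint at orphan files; the path is the one from this
thread and should be adjusted.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class CheckpointDirSize {
  public static void main(String[] args) throws Exception {
    // job checkpoint root taken from this thread; adjust as needed
    Path root = new Path("hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b");
    FileSystem fs = root.getFileSystem(new Configuration());
    long totalBytes = 0L;
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(root, true); // recursive
    while (files.hasNext()) {
      totalBytes += files.next().getLen();
    }
    System.out.println("Bytes under checkpoint dir (incl. shared/): " + totalBytes);
  }
}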

BTW, I think the user mailing list is the better place for this email thread;
I have already sent this mail to the user mailing list.

Best
Yun Tang

From: shuwen zhou 
Date: Thursday, November 7, 2019 at 12:02 PM
To: Yun Tang 
Cc: dev , Till Rohrmann 
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Yun,
Thank you for your detailed explanation, it gives me a lot to research. I think:
1. I should try reducing "state.checkpoints.num-retained", maybe to 3, which
could decrease the amount of data in the shared folder.
2. Does Flink 1.9.0 have the possibility of orphan files? It seems the answer is
yes, maybe. I could use the State Processor API you mentioned to figure it out
and get back to you.
3. I had a look at the file
/flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata; there are a lot of file
names like
hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03,
and summing those files' sizes up gives the total size of each checkpoint, am I correct?
4. My checkpoint interval is 16 minutes.

On Wed, 6 Nov 2019 at 15:57, Yun Tang <myas...@live.com> wrote:
Hi Shuwen

Since you have just 10 “chk-” folders as expected, and when subsuming
checkpoints the “chk-” folder is removed only after the shared state has been
successfully removed [1], I think you might not have too many orphan state
files left. To verify this, you could use the State Processor API [2] to load
your checkpoints and compare them against all the files under the “shared”
folder to see whether too many orphan files exist. If that is the case, we
might think about the custom compaction filter feature of FRocksDB.

Secondly, your judgment of “20GB each checkpoint” might not be accurate when
RocksDB incremental checkpointing is enabled; the UI shows only the
incremental size [3]. I suggest you sum your files’ sizes within your
checkpoint meta to know the accurate size of each checkpoint.

Last but not least, RocksDB’s compaction filter feature deletes expired data
only during compaction [4]; I’m afraid you might need to look at your RocksDB
LOG files to see the frequency of compactions on the task managers. And I
think the increasing size might be related to the interval of your
checkpoints; what is the interval at which you execute checkpoints?


[1] 
https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[3] 
https://issues.apache.org/jira/browse/FLINK-13390
[4] 
https://github.com/facebook/rocksdb/blob/834feaff05a4bf7ae49c736305d5eb180aed4011/include/rocksdb/compaction_filter.h#L61

Best
Yun Tang

From: shuwen zhou <jaco...@gmail.com>
Date: Wednesday, November 6, 2019 at 12:02 PM
To: dev <d...@flink.apache.org>, Yun Tang <myas...@live.com>, Till Rohrmann <trohrm...@apache.org>
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Yun and Till,
Thank you for your response.
For @Yun
1. No, I just renamed the checkpoint directory, since the original name
contains company data. Sorry for the confusion.
2. Yes, I set

state.checkpoints.num-retained: 10
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

In flink.conf

I was expecting,