Flink Kafka connector consuming from a single Kafka partition

2020-02-18 Thread hemant singh
Hello Flink Users, I have a use case where I am processing metrics from different types of sources (one source will have multiple devices), and the order of messages is important both for aggregations and for building alerts. To maintain customer data segregation I plan to have a single topic for each customer

Re: Parallelize Kafka Deserialization of a single partition?

2020-02-18 Thread Till Rohrmann
Then my statement must be wrong. Let me double check this. Yesterday when checking the usage of the objectReuse field, I could only see it in the batch operators. I'll get back to you. Cheers, Till On Wed, Feb 19, 2020, 07:05 Jin Yi wrote: > Hi Till, > I just read your comment: > Currently,

Re: Parallelize Kafka Deserialization of a single partition?

2020-02-18 Thread Jin Yi
Hi Till, I just read your comment: Currently, enabling object reuse via ExecutionConfig.enableObjectReuse() only affects the DataSet API. DataStream programs will always do defensive copies. There is a FLIP to improve this behaviour [1]. I have an application that is written in Apache Beam, but
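
For reference, the flag under discussion is set like this (a minimal sketch, assuming the snippet runs inside a job's main() with a StreamExecutionEnvironment named env):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Per the comment quoted above, in this version the flag only affects the
    // DataSet API; DataStream programs still make defensive copies.
    env.getConfig().enableObjectReuse();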

Re: Persisting inactive state outside Flink

2020-02-18 Thread Akshay Aggarwal
Thanks Till. Going with your suggestion, I'll run some benchmarks to figure out how the lookups behave with an increasing number of keys, and how checkpoints behave with increasing state size. I'll take a decision based on the results, and maybe reach out to you if I need more information. Thanks a lot,

Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
There might be a possible workaround for this, for now: Basically, the trick is to explicitly tell the State Processor API to use a specified type information to access the keyed state. You can do that with the `ExistingSavepoint#readKeyedState(String uid, KeyedStateReaderFunction function,
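
A minimal sketch of that call shape (the savepoint path, operator uid, and the MyKey / MyOutput / MyKeyReaderFunction names are hypothetical placeholders; the point is passing the key type information explicitly):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;

    ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint savepoint =
        Savepoint.load(bEnv, "file:///path/to/savepoint", new MemoryStateBackend());

    DataSet<MyOutput> state = savepoint.readKeyedState(
        "my-operator-uid",                     // uid of the stateful operator
        new MyKeyReaderFunction(),             // a KeyedStateReaderFunction<MyKey, MyOutput>
        TypeInformation.of(MyKey.class),       // explicit key type information
        TypeInformation.of(MyOutput.class));   // explicit output type information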

Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
Hi, Just to clarify - I quickly went through the README of the project, and saw this: "This error is seen after trying to read from a savepoint that was created using the same case class as a key." So, if I understood correctly, you were attempting to use the State Processor API to access a

[Question] How to have additional logging in Apache Beam KafkaWriter

2020-02-18 Thread Jin Yi
Hi there, I am using Apache Beam (v2.16) in my application, and the runner is Flink (1.8). I use the KafkaIO connector to consume from source topics and publish to sink topics. Here is the class that Apache Beam provides for publishing messages.

Re: CSV StreamingFileSink

2020-02-18 Thread Austin Cawley-Edwards
Following up on this -- does anyone know if it's possible to stream individual files to a directory using the StreamingFileSink? For instance, if I want all records that come in during a certain day to be partitioned into daily directories: 2020-02-18/ large-file-1.txt large-file-2.txt
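
One way to get that layout is a bucket assigner keyed on the day (a sketch; the output path is hypothetical, and note that buckets contain rolled part files rather than arbitrarily named files like large-file-1.txt):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        // one directory per day, e.g. 2020-02-18/
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
        .build();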

State Processor API Keyed State

2020-02-18 Thread Mark Niehe
Hey all, I've run into an issue with the State Processor API. To highlight it, I've created a reference repository that demonstrates the issue (repository: https://github.com/segmentio/flink-state-management). The current implementation of the pipeline has left us

CSV StreamingFileSink

2020-02-18 Thread Austin Cawley-Edwards
Hey all, Has anyone had success using the StreamingFileSink [1] to write CSV files? And if so, what about compressed (gzipped, ideally) files, and which libraries did you use? Best, Austin [1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
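
For plain (uncompressed) CSV, a row-format encoder is enough. A minimal sketch, assuming a record type with a hypothetical toCsvLine() method (gzip would need a custom Encoder or a bulk format):

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.serialization.Encoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    StreamingFileSink<MyRecord> csvSink = StreamingFileSink
        .forRowFormat(new Path("file:///tmp/csv-out"), (Encoder<MyRecord>) (record, out) -> {
            // one CSV line per record
            out.write(record.toCsvLine().getBytes(StandardCharsets.UTF_8));
            out.write('\n');
        })
        .build();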

Link to Flink on K8S Webinar

2020-02-18 Thread Aizhamal Nurmamat kyzy
Hi folks, Recently Aniket Mokashi and Dagang Wei hosted a webinar on how to use the flink k8s operator they have developed. The operator also supports working with Beam. If you think that this may be helpful to you, you may access the recording and slides via this link:

Updating ValueState not working in hosted Kinesis

2020-02-18 Thread Chris Stevens
Hi there, I'm trying to update state in one of my applications hosted in Kinesis Data Analytics. I declare private transient ValueState sensorState; and update it via sensorState.update(sensor); which fails with: An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB at
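
For context, the usual shape of that state declaration looks like the sketch below (Sensor is the value type from the post; the surrounding function is hypothetical). The RocksDB error typically surfaces when the value cannot be serialized, so the Sensor type's serializability is worth checking:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class SensorFunction extends KeyedProcessFunction<String, Sensor, Sensor> {
        private transient ValueState<Sensor> sensorState;

        @Override
        public void open(Configuration parameters) {
            sensorState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sensor", Sensor.class));
        }

        @Override
        public void processElement(Sensor sensor, Context ctx, Collector<Sensor> out) throws Exception {
            sensorState.update(sensor);  // this is the call that fails at the RocksDB layer
            out.collect(sensor);
        }
    }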

Re: Persisting inactive state outside Flink

2020-02-18 Thread Till Rohrmann
Hmm, with this size you will need an aggregated disk capacity of 11 TB (for the 1.2 Bn devices). If most of the entries are permanently dormant, then this is not ideal. On the other hand, they would occupy the same space on your Hbase cluster. Concerning your questions about RocksDB: 1. When
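
(A rough back-of-envelope check of that figure: 1.2 billion keys at ~10 KB of state each is about 12 TB, i.e. roughly 11 TiB.)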

Re: Can Connected Components run on a streaming dataset using iterate delta?

2020-02-18 Thread Yun Gao
Hi Kant, As far as I know, the current example connected components implementation based on the DataSet API cannot be extended to streaming data or incremental batches directly. From the algorithm's perspective, if the graph only adds edges and never

Re: Persisting inactive state outside Flink

2020-02-18 Thread Akshay Aggarwal
Thanks Till, I really appreciate your response. We are in fact considering RocksDB as our state backend. The scale we are looking at is 1.2 Bn new devices every year, with a growth of ~30% YoY; the state per device is not expected to grow beyond a few tens of KBs, though. The peak ingestion rates are

Identifying Flink Operators of the Latency Metric

2020-02-18 Thread Morgan Geldenhuys
Hi All, I have setup monitoring for Flink (1.9.2) via Prometheus and am interested in viewing the end-to-end latency at the sink operators for the 95 percentile. I have enabled latency markers at the operator level and can see the results, one of the entries looks as follows:
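
For reference, latency markers are switched on per job via the tracking interval (a minimal sketch; the 1000 ms interval is an arbitrary example, and metrics.latency.granularity in flink-conf.yaml controls whether markers are reported per operator, per subtask, etc.):

    // inside the job's main(), given a StreamExecutionEnvironment named env
    env.getConfig().setLatencyTrackingInterval(1000);  // emit a latency marker every second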

Re: Not able to consume Kafka messages in Flink 1.10.0

2020-02-18 Thread Stephan Ewen
This looks like a Kafka version mismatch. Please check that you have the right Flink connector and no other Kafka dependencies in the classpath. On Tue, Feb 18, 2020 at 10:46 AM Avinash Tripathy < avinash.tripa...@stellapps.com> wrote: > Hi, > > I am getting this error message. > >

Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

2020-02-18 Thread Robert Metzger
Hey Milind, can you additionally also set metrics.internal.query-service.port to the range? Best, Robert On Fri, Feb 7, 2020 at 8:35 PM Milind Vaidya wrote: > I tried setting that option but did not work. > > 2020-02-07 19:28:45,999 INFO >
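
The setting in question lives in flink-conf.yaml; a sketch with a hypothetical range (it must match whatever ports are open between the machines):

    # flink-conf.yaml
    metrics.internal.query-service.port: 50100-50200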

Re: Using retained checkpoints as savepoints

2020-02-18 Thread Stephan Ewen
Maybe one small addition: - for the heap state backend, there is no difference at all between the format and behavior of retained checkpoints (after the job is canceled) and savepoints. Same format and features. - For RocksDB incremental checkpoints, we do in fact support re-scaling, and I
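
For reference, retained (externalized) checkpoints are enabled like this (a minimal sketch; the checkpoint interval is an arbitrary example, and env is the job's StreamExecutionEnvironment):

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

    env.enableCheckpointing(60_000);  // checkpoint every minute
    // keep the last checkpoint around after cancellation so it can be restored from
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);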

Re: job history server

2020-02-18 Thread Richard Moorhead
2020-02-18 09:44:45,227 ERROR org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher - Failure while fetching/processing job archive for job eaf0639027aca1624adaa100bdf1332e. java.nio.file.FileSystemException:

Re: [ANNOUNCE] Apache Flink Python API(PyFlink) 1.9.2 released

2020-02-18 Thread Till Rohrmann
Thanks for updating the 1.9.2 release wrt Flink's Python API Jincheng! Cheers, Till On Thu, Feb 13, 2020 at 12:25 PM Hequn Cheng wrote: > Thanks a lot for the release, Jincheng! > Also thanks to everyone that make this release possible! > > Best, > Hequn > > On Thu, Feb 13, 2020 at 2:18 PM

Re: Process stream multiple times with different KeyBy

2020-02-18 Thread Till Rohrmann
Hi Sébastien, there is always the possibility to reuse a stream. Given a DataStream input, you can do the following: KeyedStream a = input.keyBy(x -> f(x)); KeyedStream b = input.keyBy(x -> g(x)); This gives you two differently partitioned streams a and b. If you want to evaluate every event
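
Spelled out as a sketch (Event, its getters, and the two process functions are hypothetical stand-ins for f and g):

    // given a DataStream<Event> named input
    KeyedStream<Event, String> a = input.keyBy(x -> x.getDeviceId());    // keyBy f
    KeyedStream<Event, String> b = input.keyBy(x -> x.getCustomerId());  // keyBy g

    // each keyed view of the same stream is then processed independently
    a.process(new DeviceAlertFunction());        // hypothetical KeyedProcessFunctions,
    b.process(new CustomerAggregateFunction());  // one per partitioning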

Re: Parallelize Kafka Deserialization of a single partition?

2020-02-18 Thread Till Rohrmann
Hi Theo, the KafkaDeserializationSchema does not allow returning asynchronous results. Hence, Flink will always wait until KafkaDeserializationSchema.deserialize returns the parsed value. Consequently, the only way I can think of to offload the complex parsing logic would be to do it in a
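
A sketch of that "parse downstream" idea (env, kafkaProperties, MyEvent, and parseExpensively are hypothetical): the source hands raw bytes through, and a later map at higher parallelism does the expensive parsing in parallel:

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // the source does a no-op "deserialization" and just forwards the bytes
    DataStream<byte[]> raw = env.addSource(new FlinkKafkaConsumer<>(
        "my-topic",
        new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] message) {
                return message;
            }
        },
        kafkaProperties));

    // the expensive parsing runs downstream with more parallel subtasks
    DataStream<MyEvent> parsed = raw
        .map(bytes -> parseExpensively(bytes))
        .setParallelism(16);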

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-18 Thread David Magalhães
Thanks for the feedback, Arvid. Currently it isn't an issue, but I will look back into it in the future. On Tue, Feb 18, 2020 at 1:51 PM Arvid Heise wrote: > Hi David, > > sorry for replying late. I was caught up on other incidents. > > I double-checked all the information that you provided and

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-18 Thread Arvid Heise
Hi David, sorry for replying late. I was caught up on other incidents. I double-checked all the information that you provided and conclude that you completely bypass our filesystems and plugins. What you are using is AvroParquetWriter, which brings in the Hadoop dependencies, including raw

Re: Emit message at start and end of event time session window

2020-02-18 Thread Till Rohrmann
Hi Manas, you can implement something like this with a bit of trigger magic. What you need to do is to define your own trigger implementation which keeps state to remember whether it has triggered the "started window" message or not. In the stateful window function you would need to do something
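
A rough sketch of such a trigger (simplified: it fires once when the first element arrives, so the window function can emit the "window started" message, and again at the window's end; the merging behaviour needed for session windows is omitted):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class StartAndEndTrigger extends Trigger<Object, TimeWindow> {
        private final ValueStateDescriptor<Boolean> startedDesc =
            new ValueStateDescriptor<>("started", Boolean.class);

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            ValueState<Boolean> started = ctx.getPartitionedState(startedDesc);
            if (started.value() == null) {
                started.update(true);
                return TriggerResult.FIRE;  // early fire -> "window started" message
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            // final fire at the end of the window
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(startedDesc).clear();
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    }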

Side Outputs from RichAsyncFunction

2020-02-18 Thread KristoffSC
Hi all, Is there a way to emit a side output from a RichAsyncFunction operator, as is possible with ProcessFunctions via ctx.output(outputTag, value)? At first glance I don't see a way to do it. In my use case RichAsyncFunction is used to call REST services and I would like to handle REST errors
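
RichAsyncFunction has no side-output hook; a common workaround (a sketch with hypothetical ResultOrError / RestResult / RestError types, where withWrappers is the DataStream<ResultOrError> coming out of AsyncDataStream) is to wrap success and failure in one type and split afterwards:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    final OutputTag<RestError> errorTag = new OutputTag<RestError>("rest-errors") {};

    SingleOutputStreamOperator<RestResult> results = withWrappers
        .process(new ProcessFunction<ResultOrError, RestResult>() {
            @Override
            public void processElement(ResultOrError value, Context ctx, Collector<RestResult> out) {
                if (value.isError()) {
                    ctx.output(errorTag, value.getError());  // error path -> side output
                } else {
                    out.collect(value.getResult());          // happy path -> main output
                }
            }
        });

    DataStream<RestError> errors = results.getSideOutput(errorTag);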

Re: about registering completion function for worker shutdown

2020-02-18 Thread Dominique De Vito
Hi Robert, Thanks for your hint / reply / help. So far I have not tested your way (maybe next), but tried another one: * use mapPartitions -- at the beginning, get a KafkaProducer -- the KafkaProducerFactory class I use is lazy and caches the first instance created, so there is reuse. *
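
A sketch of that lazy, caching factory pattern (class and method names hypothetical): the first call per JVM creates the producer, later calls reuse it, so mapPartitions invocations share one instance:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public final class KafkaProducerFactory {
        private static volatile KafkaProducer<byte[], byte[]> cached;

        private KafkaProducerFactory() {}

        public static KafkaProducer<byte[], byte[]> getOrCreate(Properties props) {
            if (cached == null) {
                synchronized (KafkaProducerFactory.class) {
                    if (cached == null) {
                        cached = new KafkaProducer<>(props);  // created once per JVM
                    }
                }
            }
            return cached;
        }
    }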

Re: Using retained checkpoints as savepoints

2020-02-18 Thread Aljoscha Krettek
Hi, the reason why we are quite conservative when it comes to stating properties of checkpoints is that we don't want to prevent ourselves from implementing possibly optimized checkpoint formats that would not support these features. You're right that currently checkpoints support most of

Re: Persisting inactive state outside Flink

2020-02-18 Thread Till Rohrmann
Hi Akshay, there is no easy out-of-the-box implementation for what you are asking. Before drafting a potential solution I wanted to ask whether using the RocksDB state backend could already solve your problem. With this state backend Flink is able to spill state data to disk. Would this work for
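
For reference, switching a job onto that backend looks like this (a minimal sketch; the checkpoint URI is hypothetical, and the boolean enables incremental checkpoints):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

    // state then lives on local disk and can grow well beyond the heap
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));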

Not able to consume Kafka messages in Flink 1.10.0

2020-02-18 Thread Avinash Tripathy
Hi, I am getting this error message. [image: flink-kafka-consumer-error.png] Flink version: 1.10.0 Kafka version: kafka_2.12-2.1.0 Thanks, Avinash

How do I serialize a class using default Kryo and protobuf in Scala

2020-02-18 Thread ApoorvK
I have some case classes which have primitives as well as nested class objects; hence if I add any more variables to a class, the savepoint does not restore. I read that if I add a Kryo serializer to those classes using Google protobuf, I will be able to serialize them from state. Can anyone please share any example
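
One commonly used route (a sketch; MyProtoMessage stands for a generated protobuf class, the com.twitter:chill-protobuf dependency provides the serializer, and the Scala ExecutionConfig call is the same):

    import com.twitter.chill.protobuf.ProtobufSerializer;

    // route Kryo through protobuf for this type, so adding fields stays
    // backwards compatible at the protobuf level
    env.getConfig().registerTypeWithKryoSerializer(MyProtoMessage.class, ProtobufSerializer.class);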

Re: Flink's Either type information

2020-02-18 Thread Yun Gao
Hi Jacopo, Could you also show how the KeyedBroadcastProcessFunction is created when constructing the DataStream API program? For example, are you using something like new KeyedBroadcastProcessFunction() { // Function implementation }

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

2020-02-18 Thread Niels Basjes
Hi Gordon, Thanks. This works for me. I find it strange that it works when I do this (I made the differences bold) List result = new ArrayList<>(5); DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); *resultDataStream.print();* environment.execute(); however this
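
For reference, the collect pattern from the thread (a sketch; resultDataStream is assumed to be a DataStream<String>; note that in this Flink version DataStreamUtils.collect starts job execution itself, which is likely behind the surprising interaction with environment.execute()):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;

    List<String> result = new ArrayList<>();
    DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);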