RichAggregationFunction

2020-06-23 Thread Steven Nelson
I am trying to add some custom metrics to my window (because the window is causing a lot of backpressure). However I can't seem to use a RichAggregationFunction instead of an AggregationFunction. I am trying to see how long things get held in our EventTimeSessionWindows.withGap window. Is there

Flink 1.10 memory and backpressure

2020-06-10 Thread Steven Nelson
We are working with a process and having some problems with backpressure. The backpressure seems to be caused by a simple Window operation, which causes our checkpoints to fail. What would be the recommendations for debugging the backpressure?

Memory issue in Flink 1.10

2020-05-27 Thread Steven Nelson
We recently migrated to Flink 1.10, but are experiencing some issues with memory. Our cluster is: 1) Running inside of Kubernetes 2) Running in HA mode 3) Checkpointing/Savepointing to an HDFS cluster inside of Kubernetes 4) Using RocksDB for checkpointing 5) Running on m5d.4xlarge EC2 instances

Re: Flink cluster on k8s with rocksdb state backend

2019-10-17 Thread Steven Nelson
You may want to look at using instances with local SSD drives. You don’t really need to keep the state data between instance stops and starts, because Flink will have to restore from a checkpoint or savepoint, so using ephemeral storage isn’t a problem. Sent from my iPhone > On Oct 17, 2019, at 11:31

Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-16 Thread Steven Nelson
you know step by step process to reproduce this problem? > > -Ravi > > > On Wed 16 Oct, 2019, 17:40 Steven Nelson, > wrote: > >> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2. >> >> About half my shards start over at trim horizon. Why would some shard

Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-16 Thread Steven Nelson
the beginning. Please refer to the implementation here: [1] >> >> [1] >> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307 >

Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Steven Nelson
Hello, we currently use Flink 1.9.0 with Kinesis to process data. We have extended data retention on the Kinesis stream, which gives us 7 days of data. We have found that when a savepoint/checkpoint is restored, it appears to restart the Kinesis Consumer from the start of the stream.

Re: Where are uploaded Job jars stored?

2019-10-10 Thread Steven Nelson
John, I think you are referring to the web upload directory. There is a setting for that folder, ‘web.upload.dir’. If you set that to a folder writable by both masters, it will work as desired. I used an NFS mount (AWS EFS). -Steve Sent from my iPhone > On Oct 10, 2019, at 10:11 PM, Zhu Zhu

Re: flink 1.9

2019-10-08 Thread Steven Nelson
https://flink.apache.org/downloads.html#apache-flink-190 Sent from my iPhone > On Oct 8, 2019, at 3:47 PM, Vishal Santoshi wrote: > > where do I get the corresponding jar for 1.9 ? > > flink-shaded-hadoop2-uber-2.7.5-1.8.0.jar > > Thanks..

Re: Problem with savepoint deserialization

2019-10-08 Thread Steven Nelson
> would handle a missing Option. > >> On Oct 8, 2019, at 11:38 AM, Steven Nelson wrote: >> >> Are you sure? I just restarted the job with new new version, but not from a >> savepoint and took a new savepoint and it seemed to work from there. It just >> s

Re: Problem with savepoint deserialization

2019-10-08 Thread Steven Nelson
wrote: > > The Option class is not serializable, if you put something serializable into > that case class you wouldn’t have problems. > >> On Oct 8, 2019, at 8:17 AM, Steven Nelson wrote: >> >> Hello! We are working with a Scala based pipeline. >> >>

Problem with savepoint deserialization

2019-10-08 Thread Steven Nelson
Hello! We are working with a Scala-based pipeline. We changed case class Record(orgId: Int) to case class Record(orgId: Int, operationId: Option[String] = None), and now our savepoints fail with this exception: org.apache.flink.util.StateMigrationException: The new state serializer cannot be

Debugging slow/failing checkpoints

2019-09-26 Thread Steven Nelson
I am working with an application that hasn't gone to production yet. We run Flink as a cluster within a K8s environment. It has the following attributes: 1) 2 Job Managers configured using HA, backed by Zookeeper and HDFS 2) 4 Task Managers 3) Configured to use RocksDB. The actual RocksDB files are

Multiple Job Managers in Flink HA Setup

2019-09-20 Thread Steven Nelson
Hello! I am having some difficulty with multiple job managers in an HA setup using Flink 1.9.0. I have 2 job managers and have set up HA with the following config: high-availability: zookeeper high-availability.cluster-id: /imet-enhance high-availability.storageDir: hdfs:///flink/ha/

Window Function that releases when downstream work is completed

2019-08-16 Thread Steven Nelson
Hello! I think I know the answer to this, but I thought I would go ahead and ask. We have a process that emits messages to our stream. These messages can include duplicates based on a certain key (we'll call it TheKey). Our Flink job reads the messages, keys by TheKey and enters a window

Re: Understanding job flow

2019-08-15 Thread Steven Nelson
The encryptor will be serialized and sent with the rest of your Job Graph when the job is submitted. If it’s not serializable you get an error. Sent from my iPhone > On Aug 15, 2019, at 11:00 AM, Vishwas Siravara wrote: > > Hi guys, > I have a map job where I want to encrypt certain keys . I

Issue starting Flink job with Avro class

2019-07-11 Thread Steven Nelson
Hello! We are working on a Flink application and came across this error. The "Record" class is a class generated from an Avro Schema. It's actually used by a second "Operation" class which doesn't seem to have this problem. Has anyone seen this before?

Re: Flink error handling

2019-07-03 Thread Steven Nelson
We ended up using side outputs for now and basically implementing our own map/flatMap that internally uses a ProcessFunction. Sent from my iPhone > On Jul 3, 2019, at 6:02 AM, Halfon, Roey wrote: > > Hi, > Do you have any progress with that? > > -Original Message--

Flink error handling

2019-06-18 Thread Steven Nelson
Hello! We are internally having a debate on how best to handle exceptions within our operators. Some advocate for wrapping maps/flatMaps inside a ProcessFunction and sending the error to a side output. Other options are returning a custom Either that gets filtered and mapped into different
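The general idea debated above, capturing a per-record exception and routing the failure away from the normal output, can be sketched without any Flink dependency. This is only an illustration in plain Java: `ErrorRouting`, `Routed`, `Failure` and `mapSafely` are hypothetical names, and the `errors` list merely stands in for a side output. It is not the posters' implementation and it does not use Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch, no Flink classes. All names here are hypothetical.
final class ErrorRouting {

    // A failed input together with the exception it caused.
    static final class Failure<I> {
        final I input;
        final Exception cause;

        Failure(I input, Exception cause) {
            this.input = input;
            this.cause = cause;
        }
    }

    // Holds the records that mapped cleanly and the ones that threw.
    static final class Routed<I, O> {
        final List<O> main = new ArrayList<>();            // normal output
        final List<Failure<I>> errors = new ArrayList<>(); // stand-in for a side output
    }

    // Applies fn to every input; an exception is recorded next to its input
    // instead of aborting the whole run.
    static <I, O> Routed<I, O> mapSafely(List<I> inputs, Function<I, O> fn) {
        Routed<I, O> out = new Routed<>();
        for (I in : inputs) {
            try {
                out.main.add(fn.apply(in));
            } catch (Exception e) {
                out.errors.add(new Failure<>(in, e));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Routed<String, Integer> r =
                mapSafely(Arrays.asList("1", "two", "3"), Integer::parseInt);
        System.out.println("main=" + r.main + " errors=" + r.errors.size());
    }
}
```

Keeping the failed input next to its exception is a design choice that makes it possible to log or replay bad records later; whether that is worth the extra state depends on the pipeline.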

Connectors (specifically Kinesis Connector)

2019-05-21 Thread Steven Nelson
Hello! We keep having difficulties with the Kinesis connector. We have to publish our own version, and we understand why. What I am curious about is the plan to make this better in the future. Is there an issue/FLIP that I can reference when talking internally about this? -Steve

Issue with job crashing due to KinesisProducer

2019-05-17 Thread Steven Nelson
Hello! We are running a simple job on a Flink 1.7.2 cluster that reads from one Kinesis stream, de-duplicates some values and writes to another stream. We made some changes to use IngestionTime and added a custom AutoWatermarker to emit watermarks in case nothing comes in on the stream after a

Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment

2019-05-08 Thread Steven Nelson
minate the job before a certain time > (processing time) has passed. You could do this by adding a sleep to your > source function after you've output all records and just before leaving the > source loop. > > Cheers, > Till > >> On Tue, May 7, 2019 at 11:49 PM St
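The suggestion quoted above (sleep after emitting all records, just before leaving the source loop) might look roughly like this. It is a plain-Java sketch rather than Flink's source API: `LingeringSource`, `emitThenLinger`, the `Consumer` used as a collector, and `lingerMillis` are all assumptions made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch; names are hypothetical and no Flink classes are used.
final class LingeringSource {

    // Emits every element to the collector, then sleeps for lingerMillis
    // before returning, so that downstream processing-time work gets a
    // chance to run before the "source" finishes.
    static <T> void emitThenLinger(List<T> elements, Consumer<T> collector, long lingerMillis) {
        for (T element : elements) {
            collector.accept(element);
        }
        try {
            Thread.sleep(lingerMillis); // keep the source alive a little longer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        emitThenLinger(Arrays.asList("a", "b", "c"), System.out::println, 200L);
    }
}
```

In a test, the linger time would presumably need to exceed the session gap being tested, which makes such tests slow and timing-sensitive.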

Testing with ProcessingTime and FromElementsFunction with TestEnvironment

2019-05-07 Thread Steven Nelson
Hello! I am trying to write a test that runs in the TestEnvironment. I create a process that uses ProcessingTime, has a source constructed from a FromElementsFunction and runs data through a Keyed Stream into a ProcessingTimeSessionWindows.withGap(). The problem is that it appears that the

Job Startup Arguments

2019-04-24 Thread Steven Nelson
Hello! Is there a way (via the REST API) to see the parameters used to start a job? -Steve

Flink CLI

2019-04-24 Thread Steven Nelson
Hello! I am working on automating our deployments to our Flink cluster. I had a couple of questions about the Flink CLI. 1) I thought there was an "update" command that would internally manage the cancel with savepoint, upload new jar, restart from savepoint process. 2) Is there a way to get the

Log Management

2019-04-16 Thread Steven Nelson
Hello! I am working on logging for our Flink/Kubernetes infrastructure to our external corporate ElasticSearch cluster. I have a few ideas to explore and wondered if anyone had any feedback/experience to share. Ideas I am exploring right now: 1) Add a K8s configmap that contains an updated log4j

Re: S3 Bucket Source

2019-04-15 Thread Steven Nelson
passed in the constructor (and will be serialized > out to all the parallel instances). > > > > On Mon, Apr 15, 2019 at 9:16 AM Steven Nelson > wrote: > >> I am working on a process to do some compaction of files in S3. I read a >> bucket full of files key them, pul

S3 Bucket Source

2019-04-15 Thread Steven Nelson
I am working on a process to do some compaction of files in S3. I read a bucket full of files, key them, pull them all into a window, then remove older versions of the file. The files are not organized inside the bucket; they are simply named by GUID. I can iterate them using a custom Source that

Flink Kinesis Consumer

2019-02-28 Thread Steven Nelson
Hello! Does anyone know if the Flink Kinesis Consumer supports stopping rather than cancelling? I don't see that it implements StoppableFunction, but I might be wrong. -Steve

Re: HA HDFS

2019-02-12 Thread Steven Nelson
obManager pods > should both be controlled by a Deployment, so that they are brought up again > after a fault. > > Hope this helps, and please let me know if run into any issues. > > Best, > > Konstantin > > >> On Wed, Feb 6, 2019 at 7:47 PM Steven Nelson &

HA HDFS

2019-02-06 Thread Steven Nelson
I am working on a POC High Availability installation of Flink on top of Kubernetes with HDFS as a data storage location. I am not finding much documentation on doing this, or I am finding the documentation in parts and maybe not putting it together correctly. I think it falls between being an HDFS

Problem with metrics inside Kubernetes

2019-01-02 Thread Steven Nelson
I have been working with Flink under Kubernetes recently and I have run into some problems with metrics. I think I have it figured out though. It appears that it's trying to use hostname resolution for the jobmanagers. This causes this error: Association with remote system

Flink HA setup on Kubernetes

2018-12-31 Thread Steven Nelson
Hello! I am trying to set up Flink in HA mode on Kubernetes. I have an existing Zookeeper cluster that is coming from my HDFS setup. I set the nodes up in a StatefulSet so I can get exactly 3 masters that have consistent names. The first node launches okay and I can see it when I view the port.

Re: HA with HDFS question

2018-12-31 Thread Steven Nelson
op.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html > > > On 21 Dec 2018, at 21:48, Steven Nelson > wrote: > > > > First off, I am new to using HDFS to store things, so expect stupid > questions. > > > > I am working on hard

HA with HDFS question

2018-12-21 Thread Steven Nelson
First off, I am new to using HDFS to store things, so expect stupid questions. I am working on hardening our Flink cluster for production usage. This includes setting up an HA Flink cluster, saving checkpoints and savepoints to a central location, etc. I have a functioning HDFS setup inside an HA

Re: getting an error when configuring state backend to hdfs

2018-12-19 Thread Steven Nelson
What image are you using? Sent from my iPhone > On Dec 19, 2018, at 9:44 AM, Avi Levi wrote: > > Hi Chesnay, > What do you mean? I am creating a fat jar with all dependencies (using sbt > assembly). which jar I should place in the /lib directory ? > >> On Wed, Dec 19, 2018 at 4:44 PM Chesnay

Re: Can't list logs or stdout through web console on Flink 1.7 Kubernetes

2018-12-19 Thread Steven Nelson
There is a known issue for this, I believe. The problem is that the containerized versions of Flink output logs to STDOUT instead of files inside the node. If you use docker logs on the container, you can see what you’re looking for. I use the Kube dashboard to view the logs centrally. Sent

Setting up FsStateBackend with hdfs storage location

2018-12-13 Thread Steven Nelson
Hello! I am working on setting up a new Flink cluster that stores its checkpoints in an HDFS cluster deployed to the same Kubernetes cluster. I am running into problems with the dependencies required to use the hdfs:// storage location. The exception I am getting is Caused by: