I am trying to add some custom metrics to my window (because the window is
causing a lot of backpressure). However, I can't seem to use a
RichAggregateFunction instead of an AggregateFunction. I am trying to
see how long things are held in our EventTimeSessionWindows.withGap window.
Is there ano
We are working with a job and having some problems with backpressure.
The backpressure seems to be caused by a simple window operation, and it
causes our checkpoints to fail.
What would be the recommended approach for debugging the backpressure?
We recently migrated to Flink 1.10, but are experiencing some issues with
memory.
Our cluster is:
1) Running inside of Kubernetes
2) Running in HA mode
3) Checkpointing/Savepointing to an HDFS cluster inside of Kubernetes
4) Using the RocksDB state backend for checkpointing
5) Running on m5d.4xlarge EC2 instances w
You may want to look at using instances with local SSD drives. You don’t really
need to keep the state data between instance stops and starts, because Flink
will have to restore from a checkpoint or savepoint anyway, so using ephemeral
storage isn’t a problem.
Sent from my iPhone
> On Oct 17, 2019, at 11:31
> wrote:
> Do you know step by step process to reproduce this problem?
>
> -Ravi
>
>
> On Wed, 16 Oct 2019 at 17:40, Steven Nelson wrote:
>
>> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>>
>> About half my shards start over at trim horizon.
>> from the beginning. Please refer to the implementation here: [1]
>>
>> [1]
>> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
Hello, we currently use Flink 1.9.0 with Kinesis to process data.
We have extended data retention on the Kinesis stream, which gives us 7
days of data.
We have found that when a savepoint/checkpoint is restored, the Kinesis
consumer appears to restart from the start of the stream.
Th
John,
I think you are referring to the web upload directory. There is a setting for
that folder, ‘web.upload.dir’. If you set it to a folder writable by both
masters it will work as desired. I used an NFS mount (AWS EFS).
-Steve
> On Oct 10, 2019, at 10:11 PM, Zhu Zhu
https://flink.apache.org/downloads.html#apache-flink-190
> On Oct 8, 2019, at 3:47 PM, Vishal Santoshi wrote:
>
> where do I get the corresponding jar for 1.9 ?
>
> flink-shaded-hadoop2-uber-2.7.5-1.8.0.jar
>
> Thanks..
> would handle a missing Option.
>
>> On Oct 8, 2019, at 11:38 AM, Steven Nelson wrote:
>>
>> Are you sure? I just restarted the job with the new version, but not from a
>> savepoint, and took a new savepoint and it seemed to work from there. It just
>> s
wrote:
>
> The Option class is not serializable; if you put something serializable into
> that case class you wouldn’t have problems.
>
>> On Oct 8, 2019, at 8:17 AM, Steven Nelson wrote:
>>
>> Hello! We are working with a Scala-based pipeline.
>>
>>
Hello! We are working with a Scala-based pipeline.
We changed
case class Record(orgId: Int)
To
case class Record(orgId: Int, operationId:Option[String] = None)
And now our savepoints fail with this exception:
org.apache.flink.util.StateMigrationException: The new state serializer
cannot be inc
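The failure above is consistent with Flink's Scala case-class serializer not supporting added fields. A minimal illustration of the underlying problem, with no Flink dependency (the byte layouts below are simplified stand-ins for the real serializer, not Flink code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// State written with the old layout (orgId only) cannot be read back by a
// reader that expects the new layout (orgId + optional operationId) -- the
// analogue of the case-class serializer rejecting the changed schema.
public class LayoutSketch {
    static byte[] writeOld(int orgId) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(orgId);              // old Record: orgId only
        }
        return bos.toByteArray();
    }

    static String readNew(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int orgId = in.readInt();
            boolean hasOp = in.readBoolean(); // new field: not present in old bytes
            return "ok";
        } catch (IOException e) {
            return "incompatible: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readNew(writeOld(42))); // prints "incompatible: EOFException"
    }
}
```

This is why such a change typically requires a state migration (for example, re-keying through a new job or using a schema-evolution-friendly format such as Avro) rather than restoring the old savepoint directly.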
I am working with an application that hasn't gone to production yet. We run
Flink as a cluster within a K8s environment. It has the following attributes:
1) 2 Job Managers configured for HA, backed by Zookeeper and HDFS
2) 4 Task Managers
3) Configured to use RocksDB. The actual RocksDB files are
Hello!
I am having some difficulty with multiple job managers in an HA setup using
Flink 1.9.0.
I have 2 job managers and have configured HA with the following settings:
high-availability: zookeeper
high-availability.cluster-id: /imet-enhance
high-availability.storageDir: hdfs:///flink/ha/
hig
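The config excerpt above is cut off; for context, a typical Flink 1.9 Zookeeper HA block looks roughly like this (the quorum addresses are placeholders):

```yaml
# flink-conf.yaml -- Zookeeper-backed HA (example values)
high-availability: zookeeper
high-availability.cluster-id: /imet-enhance
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
```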
I am trying to update a cluster running in HA mode from 1.7.2 to 1.9.0. I
am attempting to just update the docker images to the new ones and restart
the cluster. Is this something that is supported? Or do I need to destroy
the HA setup and build the cluster from scratch?
Here is the error I get.
2
Hello!
I think I know the answer to this, but I thought I would go ahead and ask.
We have a process that emits messages to our stream. These messages can
include duplicates based on a certain key (we'll call it TheKey). Our
Flink job reads the messages, keys by TheKey and enters a window function
The encryptor will be serialized and sent with the rest of your Job Graph when
the job is submitted. If it’s not serializable you get an error.
> On Aug 15, 2019, at 11:00 AM, Vishwas Siravara wrote:
>
> Hi guys,
> I have a map job where I want to encrypt certain keys. I
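The serializability requirement described above can be sketched without any Flink dependency: Flink Java-serializes user functions and their fields when shipping the job graph, so a quick round-trip check catches the problem before submission. The `Encryptor` class here is a hypothetical stand-in for the real one:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical encryptor; implements Serializable so it can ship with the job graph. */
class Encryptor implements Serializable {
    private final String keyId;
    Encryptor(String keyId) { this.keyId = keyId; }
}

public class SerializationCheck {
    // Round-trip an object through Java serialization, the same mechanism
    // used to ship user functions (and their fields) to the task managers.
    // A non-serializable field would throw NotSerializableException here.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new Encryptor("key-1"));
        System.out.println("serialized ok, " + bytes.length + " bytes");
    }
}
```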
Hello! We are working on a Flink application and came across this
error. The "Record" class is a class generated from an Avro Schema.
It's actually used by a second "Operation" class which doesn't seem to
have this problem. Has anyone seen this before?
org.apache.flink.streaming.runtime.tasks.Str
We ended up using side outputs for now and basically implementing our own
map/flatMap that internally uses a ProcessFunction.
> On Jul 3, 2019, at 6:02 AM, Halfon, Roey wrote:
>
> Hi,
> Do you have any progress with that?
>
Hello!
We are internally having a debate on how best to handle exceptions within our
operators. Some advocate wrapping maps/flatMaps inside a ProcessFunction
and sending the error to a side output. Other options are returning a custom
Either that gets filtered and mapped into different si
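The Either-style option can be sketched without Flink. In a real job the two branches below would be side outputs or filters on a DataStream; `Result` and `parse` are illustrative names, not Flink API:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of the "custom Either" error-handling style: each step returns
// either a value or an error, and downstream splits the two paths.
public class EitherSketch {
    static final class Result<T> {
        final T value;          // set on success
        final Exception error;  // set on failure
        private Result(T value, Exception error) { this.value = value; this.error = error; }
        static <T> Result<T> ok(T v) { return new Result<>(v, null); }
        static <T> Result<T> err(Exception e) { return new Result<>(null, e); }
        boolean isOk() { return error == null; }
    }

    // A map step that can fail, modeled as returning Result instead of throwing.
    static Result<Integer> parse(String s) {
        try { return Result.ok(Integer.parseInt(s)); }
        catch (NumberFormatException e) { return Result.err(e); }
    }

    public static void main(String[] args) {
        List<Result<Integer>> results =
            Stream.of("1", "oops", "3").map(EitherSketch::parse).collect(Collectors.toList());
        // In Flink, these two branches would be the main output and a side output.
        List<Integer> good = results.stream().filter(Result::isOk)
            .map(r -> r.value).collect(Collectors.toList());
        long failures = results.stream().filter(r -> !r.isOk()).count();
        System.out.println(good + ", failures=" + failures); // prints [1, 3], failures=1
    }
}
```

The trade-off is that the Either style threads error handling through every operator's type signature, while the ProcessFunction/side-output approach keeps the happy path's types clean.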
Hello!
We keep having difficulties with the Kinesis connector. We have to publish
our own version, and we understand why. What I am curious about is the plan
to make this better in the future. Is there an issue/FLIP that I can
reference when talking internally about this?
-Steve
Hello!
We are running a simple job on a Flink 1.7.2 cluster that reads from one
kinesis stream, de-duplicates some values and writes to another stream. We
made some changes to use IngestionTime and added a custom AutoWatermarker
to emit watermarks in case nothing comes in on the stream after a per
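The message is cut off above, but the idea behind a watermark generator that keeps advancing when the stream goes idle can be sketched without Flink types (the class name and idle-allowance parameter are illustrative):

```java
// Sketch: the watermark is the later of (last event timestamp) and
// (processing time minus an idle allowance), so windows can still fire
// when no events arrive for a while.
public class IdleWatermark {
    private final long idleAllowanceMs;
    private long lastEventTs = Long.MIN_VALUE;

    public IdleWatermark(long idleAllowanceMs) { this.idleAllowanceMs = idleAllowanceMs; }

    // Called for each element with its ingestion/event timestamp.
    public void onEvent(long eventTs) { lastEventTs = Math.max(lastEventTs, eventTs); }

    // Called periodically; 'now' is current processing time.
    public long currentWatermark(long now) {
        return Math.max(lastEventTs, now - idleAllowanceMs);
    }
}
```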
> t terminate the job before a certain time
> (processing time) has passed. You could do this by adding a sleep to your
> source function after you've output all records and just before leaving the
> source loop.
>
> Cheers,
> Till
>
>> On Tue, May 7, 2019 at 11:4
Hello!
I am trying to write a test that runs in the TestEnvironment. I create a
process that uses ProcessingTime, has a source constructed from a
FromElementsFunction and runs data through a Keyed Stream into
a ProcessingTimeSessionWindows.withGap().
The problem is that it appears that the env.exe
Hello!
Is there a way (via the REST API) to see the parameters used to start a job?
-Steve
Hello!
I am working on automating our deployments to our Flink cluster. I had a
couple of questions about the Flink CLI.
1) I thought there was an "update" command that would internally manage the
cancel with savepoint, upload new jar, restart from savepoint process.
2) Is there a way to get the Fl
Hello!
I am working on logging for our Flink/Kubernetes infrastructure to our
external corporate ElasticSearch cluster. I have a few ideas to explore and
wondered if anyone had any feedback/experience to share.
Ideas I am exploring right now:
1) Add a K8s configmap that contains an updated log4j
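Idea 1 might look roughly like the following (the name and the logger layout are hypothetical; a real setup would also need a shipper to forward the output to the corporate ElasticSearch cluster):

```yaml
# Hypothetical ConfigMap mounted over Flink's conf/log4j.properties
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-log4j
data:
  log4j.properties: |
    log4j.rootLogger=INFO, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p %c - %m%n
```

With logs on stdout, a Filebeat/Fluentd sidecar or DaemonSet could then forward them to ElasticSearch without changing the Flink images.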
> sed in the constructor (and will be serialized
> out to all the parallel instances).
>
>
>
> On Mon, Apr 15, 2019 at 9:16 AM Steven Nelson wrote:
>
>> I am working on a process to do some compaction of files in S3. I read a
>> bucket full of files key them, pul
I am working on a process to do some compaction of files in S3. I read a
bucket full of files, key them, pull them all into a window, then remove
older versions of the file. The files are not organized inside the bucket;
they are simply named by GUID. I can iterate them using a custom Source that
jus
Hello!
Does anyone know if the Flink Kinesis Consumer supports stopping rather
than cancelling? I don't see that it implements StoppableFunction, but I
might be wrong.
-Steve
and JobManager pods
> should both be controlled by a Deployment, so that they are brought up again
> after a fault.
>
> Hope this helps, and please let me know if run into any issues.
>
> Best,
>
> Konstantin
>
>
>> On Wed, Feb 6, 2019 at 7:47 PM Steven
I am working on a POC High Availability installation of Flink on top of
Kubernetes with HDFS as a data storage location. I am not finding much
documentation on doing this, or I am finding the documentation in parts and
may not be putting it together correctly. I think it falls between being an
HDFS
I have been working with Flink under Kubernetes recently and I have run
into some problems with metrics. I think I have it figured out though. It
appears that it's trying to use hostname resolution for the jobmanagers.
This causes this error:
Association with remote system
[akka.tcp://flink@flink-
Hello! I am trying to set up Flink in HA mode on Kubernetes. I have an
existing Zookeeper cluster that is coming from my HDFS setup. I set the
nodes up in a StatefulSet so I can get exactly 3 masters that have
consistent names. The first node launches okay and I can see it when I view
the port. H
op.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
>
> > On 21 Dec 2018, at 21:48, Steven Nelson wrote:
> >
> > First off, I am new to using HDFS to store things, so expect stupid
> questions.
> >
> > I am working on hard
First off, I am new to using HDFS to store things, so expect stupid
questions.
I am working on hardening our Flink cluster for production usage. This
includes setting up an HA Flink cluster, saving checkpoints and savepoints
to a central location, etc. I have a functioning HDFS setup inside an HA
Ku
What image are you using?
> On Dec 19, 2018, at 9:44 AM, Avi Levi wrote:
>
> Hi Chesnay,
> What do you mean? I am creating a fat jar with all dependencies (using sbt
> assembly). which jar I should place in the /lib directory ?
>
>> On Wed, Dec 19, 2018 at 4:44 PM Chesnay
I believe there is a known issue for this. The problem is that the
containerized versions of Flink write logs to STDOUT instead of to files inside
the node. If you use docker logs on the container you can see what you’re
looking for. I use the Kube dashboard to view the logs centrally.
Hello!
I am working on setting up a new Flink cluster that stores its checkpoints
in an HDFS cluster deployed to the same Kubernetes cluster.
I am running into problems with the dependencies required to use the
hdfs:// storage location.
The exception I am getting is
Caused by: org.apache.flink