Securing Stateful Functions

2021-10-15 Thread mark
Hello,
My team needs to apply (currently self-signed) HTTPS certificates to our
Stateful Function endpoints. Is this currently possible? I can't find any
examples in the playground, or information in the online documentation.

We are using Remote Stateful Functions and a custom, Java-based
`org.apache.flink.statefun.sdk.io.Router`.
We would appreciate help with setting up the Router's client connection
with the necessary client-side certificates.
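For context, this is roughly the kind of wiring we are after (a minimal JSSE sketch,
not StateFun-specific; the keystore/truststore paths and passwords are placeholders,
and the resulting SSLContext would still need to be handed to whatever HTTP client
the remote-function transport actually uses):

import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

public class ClientTls {
  public static SSLContext build() throws Exception {
    // Truststore containing the (self-signed) server certificate.
    KeyStore trustStore = KeyStore.getInstance("PKCS12");
    trustStore.load(new FileInputStream("/path/to/truststore.p12"), "changeit".toCharArray());
    TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(trustStore);

    // Keystore containing the client certificate and private key.
    KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(new FileInputStream("/path/to/client-keystore.p12"), "changeit".toCharArray());
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(keyStore, "changeit".toCharArray());

    SSLContext sslContext = SSLContext.getInstance("TLS");
    sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    return sslContext;
  }
}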

Very many thanks in advance!

Regards,
Mark


Help with completing Flink SQL please!

2022-01-20 Thread mark
Hello,
I'm writing to ask for help with generating completion hints for Flink SQL.
I'm trying to use the Calcite SqlAdvisor with the Flink parser. My problem
is that I can get completion working for table names, but not column names.

  "select a.mgr from ^stuff a"

gives me good results:  CATALOG.SALES.EMP, CATALOG.SALES, etc, but

  "select a.^ from sales.emp a"

gives me only "*". See
https://github.com/mnuttall/sql-testing/blob/main/src/main/java/test/AdvisorBuilder.java
for
how I'm constructing my SqlAdvisor and
https://github.com/mnuttall/sql-testing/blob/main/src/test/java/test/TestCompletion.java
for
some simple test code.

Should it be possible to get completion for column names in my second
example? Can anyone please point me to a working example in test code, or
spot what I've done wrong in my test code? Very many thanks in advance for
any help anyone can offer.
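For reference, this is roughly how I am calling the advisor (sketch; it assumes the
SqlAdvisor built in AdvisorBuilder above, and the caret-to-cursor handling is my own
convention, not a Calcite utility):

import java.util.List;
import org.apache.calcite.sql.advise.SqlAdvisor;
import org.apache.calcite.sql.validate.SqlMoniker;

// sqlAdvisor is constructed as in AdvisorBuilder (see link above)
String sqlWithCaret = "select a.^ from sales.emp a";
int cursor = sqlWithCaret.indexOf('^');
String sql = sqlWithCaret.replace("^", "");

String[] replaced = new String[1];   // receives the token being completed
List<SqlMoniker> hints = sqlAdvisor.getCompletionHints(sql, cursor, replaced);
for (SqlMoniker hint : hints) {
  System.out.println(hint.getType() + " -> " + hint.getFullyQualifiedNames());
}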

Regards,

Mark


IntervalJoin invisibly becomes a regular Join - why?

2023-03-15 Thread mark
Hello,
I'm seeing some strange behaviour in Flink SQL where adding a new SELECT
statement causes a previously created Interval Join to be changed into a
regular Join. I'm concerned because the Flink docs make clear that regular
Joins are not safe because their memory usage can grow indefinitely.

I have put a worked example in https://github.com/mnuttall/flink-debug. I
have an interval join,

CREATE TEMPORARY VIEW suspiciousOrders AS
SELECT s.orderId, s.customer, s.product, s.quantity AS order_quantity,
l.cancel_quantity, l.order_ts AS large_ts, s.ts as small_ts, l.cancel_ts
FROM smallOrders s JOIN largeCancellations l
ON s.product = l.product AND s.customer = l.customer
WHERE s.ts BETWEEN l.cancel_ts - interval '1' day AND l.cancel_ts;

which evaluates to

[13]:IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-8640, leftUpperBound=0, leftTimeIndex=0,
rightTimeIndex=1], where=[((product = product0) AND (customer = customer0)
AND (ts >= (cancel_ts - 8640:INTERVAL DAY)) AND (ts <=
cancel_ts))], select=[ts, orderId, customer, product, quantity, order_ts,
cancel_ts, product0, customer0, cancel_quantity])
+- [14]:Calc(select=[orderId, customer, product, quantity AS
order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts,
cancel_ts])
   +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity,
cancel_quantity])]
  +- Sink: Collect table sink

but adding a further temporary view

CREATE TEMPORARY VIEW filteredResults AS
SELECT * from suspiciousOrders WHERE small_ts > large_ts;

changes the interval join to a regular join,

 [13]:Join(joinType=[InnerJoin], where=[((product = product0) AND (customer
= customer0) AND (ts >= (cancel_ts - 8640:INTERVAL DAY)) AND (ts
<= cancel_ts) AND (ts > order_ts))], select=[ts, orderId, customer,
product, quantity, order_ts, cancel_ts, product0, customer0,
cancel_quantity], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+- [14]:Calc(select=[orderId, customer, product, quantity AS
order_quantity, cancel_quantity, order_ts AS large_ts, ts AS small_ts,
cancel_ts])
   +- [15]:ConstraintEnforcer[NotNullEnforcer(fields=[order_quantity,
cancel_quantity])]
  +- Sink: Collect table sink

Please can someone explain what's happening here? It looks as though my
(safe) interval join is being converted to an (unsafe) regular join - is
that true?
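(For anyone reproducing this, a minimal sketch of how the plans for each view can be
compared; it assumes the tables and views are registered as in the linked repository:)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainJoins {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
    // ... CREATE TABLE smallOrders / largeCancellations and the temporary views as above ...
    System.out.println(tEnv.explainSql("SELECT * FROM suspiciousOrders"));  // expected: IntervalJoin
    System.out.println(tEnv.explainSql("SELECT * FROM filteredResults"));   // expected: regular Join
  }
}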

Many thanks in advance.
Regards,

Mark Nuttall


Re: IntervalJoin invisibly becomes a regular Join - why?

2023-03-16 Thread mark
Is there anything that an author can do to prevent a well-formed Interval
Join from being 'optimised' into a regular, non-interval join by later
queries? Does the Flink runtime make any guarantees that when it alters an
interval join into a regular join, that the safety of the original interval
join is retained?

Many thanks in advance for any insight that anyone can offer.

Regards,
Mark

On Wed, 15 Mar 2023 at 16:02, Leonard Xu  wrote:

> >
> > CREATE TEMPORARY VIEW filteredResults AS
> > SELECT * from suspiciousOrders WHERE small_ts > large_ts;
>
> Looks like after adding the condition, the final expanded query no longer
> matches the conditions[1] of an interval join, which leads the planner to
> treat it as a regular join. It’s not a bug; an interval join is a
> special case of a regular join, so the result will still be correct.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/joining/#interval-join


Anomalous spikes in aggregations of keyed data

2020-11-30 Thread Kegel, Mark
We have a high volume (600-700 shards) kinesis data stream that we are doing a 
simple keying and aggregation on. The logic is very simple: kinesis source, key 
by fields (A,B,C), window (1-minute, tumbling), aggregate by summing over 
integer field R, connect to sink.
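Roughly the shape of the job (sketch with assumed field names a, b, c, r on a POJO;
the EFO Kinesis source and the sink construction are elided, and processing-time
windows are shown for brevity):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Record> records = env.addSource(kinesisSource);     // EFO Kinesis consumer (assumed)

records
    .keyBy("a", "b", "c")                                      // key by fields (A,B,C)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // 1-minute tumbling window
    .sum("r")                                                  // sum integer field R
    .addSink(sink);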

We are seeing some anomalous spikes in our aggregations. From one minute to the 
next, the sum total for one particular key may increase 25x or more and then 
drop back down to a normal level, yet sums for other keys in the same window 
remain roughly the same, which we expect.

We don’t see this too often. Maybe 1-5 data points (key + timestamp) in an 
hour’s worth of 1-minute windowed data will have these spikes. The data has 
fairly low cardinality. There are only roughly two hundred distinct keys.

We inspected the raw kinesis stream and found no duplicates. It isn’t clear how 
these spikes could happen or what we might do to work around the issue since 
the code is as idiomatic as possible.

We are running the job as part of Kinesis Data Analytics, which is using Flink 
version 1.8. To connect to Kinesis we are using the 
amazon-kinesis-connector-flink library (v1.0.4) and the EFO consumer 
mode.



Re: Anomalous spikes in aggregations of keyed data

2020-11-30 Thread Kegel, Mark
At the moment we checkpoint every minute. I can turn this frequency down but 
I’m not sure that will fix/hide the issue.
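For reference, these are the checkpoint knobs we would be turning (sketch; the values
are illustrative, not our production settings):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                                  // current setting: every minute
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);  // guarantee idle time between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(120_000);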

Mark

From: Arvid Heise 
Date: Monday, November 30, 2020 at 2:33 PM
To: Kegel, Mark 
Cc: user@flink.apache.org 
Subject: Re: Anomalous spikes in aggregations of keyed data
Hi Mark,

could you double check if these spikes co-occur with checkpointing? If there is 
an alignment, certain channels are blocked from taking in data. If all keys are 
more or less contained in a shard with less data, it would explain why only these 
keys are affected.

On Mon, Nov 30, 2020 at 9:27 PM Kegel, Mark  wrote:
We have a high volume (600-700 shards) kinesis data stream that we are doing a 
simple keying and aggregation on. The logic is very simple: kinesis source, key 
by fields (A,B,C), window (1-minute, tumbling), aggregate by summing over 
integer field R, connect to sink.

We are seeing some anomalous spikes in our aggregations. From one minute to the 
next, the sum total for one particular key may increase 25x or more and then 
drop back down to a normal level, yet sums for other keys in the same window 
remain roughly the same, which we expect.

We don’t see this too often. Maybe 1-5 data points (key + timestamp) in an 
hour’s worth of 1-minute windowed data will have these spikes. The data has 
fairly low cardinality. There are only roughly two hundred distinct keys.

We inspected the raw kinesis stream and found no duplicates. It isn’t clear how 
these spikes could happen or what we might do to work around the issue since 
the code is as idiomatic as possible.

We are running the job as part of Kinesis Data Analytics, which is using Flink 
version 1.8. To connect to Kinesis we are using the 
amazon-kinesis-connection-flink library (v1.0.4) library and the EFO consumer 
mode.





Resource changed on src filesystem after upgrade

2021-01-17 Thread Mark Davis
Hi all,
I am upgrading my DataSet jobs from Flink 1.8 to 1.12.
After the upgrade I started to receive the errors like this one:

14:12:57,441 INFO 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager - Worker 
container_e120_1608377880203_0751_01_000112 is terminated. Diagnostics: 
Resource 
hdfs://bigdata/user/hadoop/.flink/application_1608377880203_0751/jobs.jar 
changed on src filesystem (expected 1610892446439, was 1610892446971)
java.io.IOException: Resource 
hdfs://bigdata/user/hadoop/.flink/application_1608377880203_0751/jobs.jar 
changed on src filesystem (expected 1610892446439, was 1610892446971)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:257)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:228)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:221)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:209)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I understand it is somehow related to FLINK-12195, but this time it comes from 
the Hadoop code. I am running a very old version of the HDP platform v.2.6.5 so 
it might be the one to blame.
But the code was working perfectly fine before the upgrade, so I am confused.
Could you please advise.

Thank you!
Mark

Re: Resource changed on src filesystem after upgrade

2021-01-29 Thread Mark Davis
Hi Xintong Song,

> - Does this error happen for every of your dataset jobs? For a problematic 
> job, does it happen for every container?
> - What is the `jobs.jar`? Is it under `lib/`, `opt` of your client side 
> filesystem, or specified as `yarn.ship-files`, `yarn.ship-archives` or 
> `yarn.provided.lib.dirs`? This helps us to locate the code path that this 
> file went through.

I finally found the cause of the problem - I set both yarn.flink-dist-jar and 
pipeline.jars to the same archive (I submit jobs programmatically and repackage 
the Flink distribution because flink-dist.jar is not in Maven Central).
If I copy the file and reference the job and distribution jars under different 
names the problem disappears.
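Roughly, the programmatic setup now looks like this (sketch; the paths are
placeholders, the point is only that the two options reference differently named
files):

import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();
config.setString("yarn.flink-dist-jar", "hdfs:///flink/flink-dist.jar");
config.setString("pipeline.jars", "hdfs:///flink/my-jobs.jar");   // no longer the same file name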

My guess is that YARN (YarnApplicationFileUploader?) copies both files, and if 
the filenames are the same the first file is overwritten by the second one, and 
thus there is a timestamp difference.

I guess a lot has changed since 1.8 in the YARN deployment area. Too bad there 
is no clear instruction on how to submit a job programmatically; every time I 
have to reverse engineer CliFrontend.

Sorry for the confusion and thanks!

Mark

Run command after Batch is finished

2020-06-05 Thread Mark Davis
Hi there,

I am running a Batch job with several outputs.
Is there a way to run some code (e.g. release a distributed lock) after all 
outputs are finished?

Currently I do this in a try-finally block around the 
ExecutionEnvironment.execute() call, but I have to switch to the detached 
execution mode - in this mode the finally block is never run.

Thank you!

  Mark

Re: Run command after Batch is finished

2020-06-06 Thread Mark Davis
Hi Jeff,

Thank you very much! That is exactly what I need.

Where will the listener code run in a cluster deployment (YARN, k8s)?
Will it be sent over the network?

Thank you!

  Mark
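For the archives, a minimal sketch of the JobListener registration Jeff points to
below (the cleanup body is a placeholder):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.registerJobListener(new JobListener() {
  @Override
  public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
    // called once the job has been submitted (or submission failed)
  }

  @Override
  public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
    // release the distributed lock here (placeholder); called after the job finishes or fails
  }
});

env.execute("batch job with cleanup");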

‐‐‐ Original Message ‐‐‐
On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:

> You can try JobListener which you can register to ExecutionEnvironment.
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>
> Mark Davis  于2020年6月6日周六 上午12:00写道:
>
>> Hi there,
>>
>> I am running a Batch job with several outputs.
>> Is there a way to run some code(e.g. release a distributed lock) after all 
>> outputs are finished?
>>
>> Currently I do this in a try-finally block around 
>> ExecutionEnvironment.execute() call, but I have to switch to the detached 
>> execution mode - in this mode the finally block is never run.
>>
>> Thank you!
>>
>>   Mark
>
> --
> Best Regards
>
> Jeff Zhang

Re: Run command after Batch is finished

2020-06-07 Thread Mark Davis
Hi Jeff,

Unfortunately this is not good enough for me.
My clients are very volatile: they start a batch and can go away at any moment 
without waiting for it to finish. Think of an elastic web application or an AWS 
Lambda.

I hoped to find something that could be deployed to the cluster together with 
the batch code. Maybe a hook in the job manager or similar. I do not plan to run 
anything heavy there, just some formal cleanups.
Is there something like this?

Thank you!

  Mark

‐‐‐ Original Message ‐‐‐
On Saturday, June 6, 2020 4:29 PM, Jeff Zhang  wrote:

> It would run in the client side where ExecutionEnvironment is created.
>
> Mark Davis  于2020年6月6日周六 下午8:14写道:
>
>> Hi Jeff,
>>
>> Thank you very much! That is exactly what I need.
>>
>> Where the listener code will run in the cluster deployment(YARN, k8s)?
>> Will it be sent over the network?
>>
>> Thank you!
>>
>>   Mark
>>
>> ‐‐‐ Original Message ‐‐‐
>> On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:
>>
>>> You can try JobListener which you can register to ExecutionEnvironment.
>>>
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>>>
>>> Mark Davis  于2020年6月6日周六 上午12:00写道:
>>>
>>>> Hi there,
>>>>
>>>> I am running a Batch job with several outputs.
>>>> Is there a way to run some code(e.g. release a distributed lock) after all 
>>>> outputs are finished?
>>>>
>>>> Currently I do this in a try-finally block around 
>>>> ExecutionEnvironment.execute() call, but I have to switch to the detached 
>>>> execution mode - in this mode the finally block is never run.
>>>>
>>>> Thank you!
>>>>
>>>>   Mark
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>
> --
> Best Regards
>
> Jeff Zhang

Re: Run command after Batch is finished

2020-06-09 Thread Mark Davis
Hi Chesnay,

That is an interesting proposal, thank you!
I was doing something similar with the OutputFormat#close() method respecting 
the Format's parallelism. Using FinalizeOnMaster will make things easier.
But the problem is that several OutputFormats must be synchronized externally - 
every output must check whether other outputs finished already... Quite 
cumbersome.
Also there is a problem with exceptions - the OutputFormats might never be opened 
and thus never closed.

  Mark
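For reference, a minimal sketch of the FinalizeOnMaster approach Chesnay describes
below (the lock release is only a placeholder comment, and coordinating several
sinks would still need the external synchronisation mentioned above):

import java.io.IOException;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;

public class LockReleasingOutputFormat<T> extends TextOutputFormat<T> implements FinalizeOnMaster {

  public LockReleasingOutputFormat(Path outputPath) {
    super(outputPath);
  }

  @Override
  public void finalizeGlobal(int parallelism) throws IOException {
    // Runs once on the master after all subtasks of this output have finished:
    // release the distributed lock / do the formal cleanup here (placeholder).
  }
}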

‐‐‐ Original Message ‐‐‐
On Monday, June 8, 2020 5:50 PM, Chesnay Schepler  wrote:

> This goes in the right direction; have a look at 
> org.apache.flink.api.common.io.FinalizeOnMaster, which an OutputFormat can 
> implement to run something on the Master after all subtasks have been closed.
>
> On 08/06/2020 17:25, Andrey Zagrebin wrote:
>
>> Hi Mark,
>>
>> I do not know how you output the results in your pipeline.
>> If you use DataSet#output(OutputFormat outputFormat), you could try to 
>> extend the format with a custom close method which should be called once the 
>> task of the sink batch operator is done in the task manager.
>> I also cc Aljoscha, maybe, he has more ideas.
>>
>> Best,
>> Andrey
>>
>> On Sun, Jun 7, 2020 at 1:35 PM Mark Davis  wrote:
>>
>>> Hi Jeff,
>>>
>>> Unfortunately this is not good enough for me.
>>> My clients are very volatile, they start a batch and can go away any moment 
>>> without waiting for it to finish. Think of an elastic web application or an 
>>> AWS Lambda.
>>>
>>> I hopped to find something what could be deployed to the cluster together 
>>> with the batch code. Maybe a hook to a job manager or similar. I do not 
>>> plan to run anything heavy there, just some formal cleanups.
>>> Is there something like this?
>>>
>>> Thank you!
>>>
>>>   Mark
>>>
>>> ‐‐‐ Original Message ‐‐‐
>>> On Saturday, June 6, 2020 4:29 PM, Jeff Zhang  wrote:
>>>
>>>> It would run in the client side where ExecutionEnvironment is created.
>>>>
>>>> Mark Davis  于2020年6月6日周六 下午8:14写道:
>>>>
>>>>> Hi Jeff,
>>>>>
>>>>> Thank you very much! That is exactly what I need.
>>>>>
>>>>> Where the listener code will run in the cluster deployment(YARN, k8s)?
>>>>> Will it be sent over the network?
>>>>>
>>>>> Thank you!
>>>>>
>>>>>   Mark
>>>>>
>>>>> ‐‐‐ Original Message ‐‐‐
>>>>> On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:
>>>>>
>>>>>> You can try JobListener which you can register to ExecutionEnvironment.
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>>>>>>
>>>>>> Mark Davis  于2020年6月6日周六 上午12:00写道:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> I am running a Batch job with several outputs.
>>>>>>> Is there a way to run some code(e.g. release a distributed lock) after 
>>>>>>> all outputs are finished?
>>>>>>>
>>>>>>> Currently I do this in a try-finally block around 
>>>>>>> ExecutionEnvironment.execute() call, but I have to switch to the 
>>>>>>> detached execution mode - in this mode the finally block is never run.
>>>>>>>
>>>>>>> Thank you!
>>>>>>>
>>>>>>>   Mark
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang

Fwd: Asynchronous I/O poor performance

2020-07-04 Thread Mark Zitnik
Hi

In my Flink application I need to enrich data using
AsyncDataStream.unorderedWait, but I am getting poor performance. At the
beginning I was just making an HTTP call, but I have switched to gRPC. I am
running on an 8-core node and getting a total of 3200 events per second; the
service that I am using is not fully utilized and can produce up to 1 req/seq.

Flink job flow:
Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write to
Kafka

Using Akka gRPC, code written in Scala

Thanks


Re: Asynchronous I/O poor performance

2020-07-05 Thread Mark Zitnik
Hi Benchao

The capacity is 100
Parallelism is 8
Rpc req is 20ms
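As a rough back-of-the-envelope check, assuming requests overlap fully up to the
capacity: 8 subtasks x 100 in-flight requests / 0.02 s per request ≈ 40,000
requests/s in theory, so I would not expect these settings alone to cap throughput
at the ~3,200 events/s I am seeing.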

Thanks


On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:

> Hi Mark,
>
> Could you give more details about your Flink job?
> - the capacity of AsyncDataStream
> - the parallelism of AsyncDataStream operator
> - the time of per blocked rpc request
>
> Mark Zitnik  于2020年7月5日周日 上午3:48写道:
>
>> Hi
>>
>> In my flink application I need to enrich data using 
>> AsyncDataStream.unorderedWait
>> but I am getting poor perforce at the beginning I was just working with
>> http call, but I have switched to grpc, I running on 8 core node and
>> getting total of 3200 events per second my service that I am using is not
>> fully utilized and can produce up to 1 req/seq
>>
>> Flink job flow
>> Reading from Kafka ~> some enrichment with unoderedwait ~> map ~> write
>> to Kafka
>>
>> Using Akkad grpc code written in scala
>>
>> Thanks
>>
>
>
> --
>
> Best,
> Benchao Li
>


Re: Asynchronous I/O poor performance

2020-07-06 Thread Mark Zitnik
Hi Benchao,

i have run this in the code:

println(env.getConfig.getAutoWatermarkInterval)

and got 200. I do understand how watermarks and the async operator work,
but I decided to make a simple test that evaluates the time it takes
to enter the asyncInvoke method, and it looks like it takes about 80ms,
which is longer than the time it takes to get a response from my
micro-service.

code below

class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {

  implicit lazy val executor: ExecutionContext =
ExecutionContext.fromExecutor(Executors.directExecutor())

  /*
  implicit val actorSystem = ActorSystem.apply("test", None, None,
Some(executor))
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher


  println(materializer.system.name)
  println("start")
  */
// redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com

  // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
  var actorSystem: ActorSystem = null
  var materializer: ActorMaterializer = null
  var executionContext: ExecutionContextExecutor = null
  //var akkaHttp: HttpExt = null

  override def open(parameters: Configuration): Unit = {
actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString,
Some(ConfigFactory.load("application.conf")), None, Some(executor))
materializer = ActorMaterializer()(actorSystem)
executionContext = actorSystem.dispatcher
//akkaHttp = Http(actorSystem)
  }

  override def close(): Unit = {
actorSystem.terminate()
  }

  override def asyncInvoke(str: String, resultFuture:
ResultFuture[(String, String)]): Unit = {
val start = str.toLong
val delta = System.currentTimeMillis() - start
resultFuture.complete(Iterable((str, s"${delta}")))
  }
}


object Job {
  def main(args: Array[String]): Unit = {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//env.enableCheckpointing(10)
env.setParallelism(1)

val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
//someIntegers.map { _ => System.currentTimeMillis()}.map{ s =>
System.currentTimeMillis()-s}.print()
val x : DataStream[String] = someIntegers.map( _ =>
s"${System.currentTimeMillis()}")
val resultStream: DataStream[(String, String)] =
AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L,
TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
  //AsyncDataStream.unorderedWait(data , new
AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
resultStream.print()
println(env.getConfig.getAutoWatermarkInterval)
env.execute("Flink Scala API Skeleton")
  }
}

is this normal behavior?


On Mon, Jul 6, 2020 at 2:45 PM Benchao Li  wrote:

> Hi Mark,
>
> According to your data, I think the config of AsyncOperator is OK.
> There is one more config that might affect the throughput of
> AsyncOperator, it's watermark.
> Because unordered async operator still keeps the order between watermarks,
> did you use
> event time in your job, and if yes, what's the watermark interval in your
> job?
>
> Mark Zitnik  于2020年7月5日周日 下午7:44写道:
>
>> Hi Benchao
>>
>> The capacity is 100
>> Parallelism is 8
>> Rpc req is 20ms
>>
>> Thanks
>>
>>
>> On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:
>>
>>> Hi Mark,
>>>
>>> Could you give more details about your Flink job?
>>> - the capacity of AsyncDataStream
>>> - the parallelism of AsyncDataStream operator
>>> - the time of per blocked rpc request
>>>
>>> Mark Zitnik  于2020年7月5日周日 上午3:48写道:
>>>
>>>> Hi
>>>>
>>>> In my flink application I need to enrich data using 
>>>> AsyncDataStream.unorderedWait
>>>> but I am getting poor perforce at the beginning I was just working with
>>>> http call, but I have switched to grpc, I running on 8 core node and
>>>> getting total of 3200 events per second my service that I am using is not
>>>> fully utilized and can produce up to 1 req/seq
>>>>
>>>> Flink job flow
>>>> Reading from Kafka ~> some enrichment with unoderedwait ~> map ~> write
>>>> to Kafka
>>>>
>>>> Using Akkad grpc code written in scala
>>>>
>>>> Thanks
>>>>
>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>


Re: Asynchronous I/O poor performance

2020-07-09 Thread Mark Zitnik
Hi Arvid,

The HTTP client is not my bottleneck. As I said before, I checked the async
path and I see a delay of about 80 ms before it enters asyncInvoke. Can
someone explain why we have such a big delay? I attached sample code in my
previous email.

Thanks

On Mon, 6 Jul 2020, 23:31 Arvid Heise,  wrote:

> Hi Mark,
>
> Async wait operators cannot be chained to sources so the messages go
> through the network stack. Thus, having some latency is normal and cannot
> be avoided. It can be tuned though, but I don't think that this is the
> issue at hand as it should mostly impact latency and affect throughput
> less. Since external I/O calls are much more heavy weight than our internal
> communication, both the drop of throughput and the increase in latency are
> usually dwarfed by the external I/O call costs.
>
> Please try to increase the thread pool for akka as written in my previous
> email and report back.
>
> On Mon, Jul 6, 2020 at 9:44 PM Mark Zitnik  wrote:
>
>> Hi Benchao,
>>
>> i have run this in the code:
>>
>> println(env.getConfig.getAutoWatermarkInterval)
>>
>> and got 200 i do fully understand how watermarks and AsyncOperator
>> operator works, but
>> i have decided to make a simple test that should evaluate the time it
>> takes to enter to the asyncInvoke method  and it looks that it takes about
>> 80ms witch is longer than the time it take to get a response from my
>> micro-service
>>
>> code below
>>
>> class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, 
>> String)] {
>>
>>   implicit lazy val executor: ExecutionContext = 
>> ExecutionContext.fromExecutor(Executors.directExecutor())
>>
>>   /*
>>   implicit val actorSystem = ActorSystem.apply("test", None, None, 
>> Some(executor))
>>   implicit val materializer = ActorMaterializer()
>>   implicit val executionContext = actorSystem.dispatcher
>>
>>
>>   println(materializer.system.name)
>>   println("start")
>>   */
>> // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com
>>
>>   // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
>>   var actorSystem: ActorSystem = null
>>   var materializer: ActorMaterializer = null
>>   var executionContext: ExecutionContextExecutor = null
>>   //var akkaHttp: HttpExt = null
>>
>>   override def open(parameters: Configuration): Unit = {
>> actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString, 
>> Some(ConfigFactory.load("application.conf")), None, Some(executor))
>> materializer = ActorMaterializer()(actorSystem)
>> executionContext = actorSystem.dispatcher
>> //akkaHttp = Http(actorSystem)
>>   }
>>
>>   override def close(): Unit = {
>> actorSystem.terminate()
>>   }
>>
>>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, 
>> String)]): Unit = {
>> val start = str.toLong
>> val delta = System.currentTimeMillis() - start
>> resultFuture.complete(Iterable((str, s"${delta}")))
>>   }
>> }
>>
>>
>> object Job {
>>   def main(args: Array[String]): Unit = {
>> // set up the execution environment
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> //env.enableCheckpointing(10)
>> env.setParallelism(1)
>>
>> val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
>> //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => 
>> System.currentTimeMillis()-s}.print()
>> val x : DataStream[String] = someIntegers.map( _ => 
>> s"${System.currentTimeMillis()}")
>> val resultStream: DataStream[(String, String)] = 
>> AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, 
>> TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
>>   //AsyncDataStream.unorderedWait(data , new 
>> AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
>> resultStream.print()
>> println(env.getConfig.getAutoWatermarkInterval)
>> env.execute("Flink Scala API Skeleton")
>>   }
>> }
>>
>> is this normal behavior?
>>
>>
>> On Mon, Jul 6, 2020 at 2:45 PM Benchao Li  wrote:
>>
>>> Hi Mark,
>>>
>>> According to your data, I think the config of AsyncOperator is OK.
>>> There is one more config that might affect the throughput of
>>> AsyncOperator, it's watermark.

Resource leak in DataSourceNode?

2020-08-26 Thread Mark Davis
Hi,

I am trying to investigate a problem with non-released resources in my 
application.

I have a stateful application which submits Flink DataSet jobs using code very 
similar to the code in CliFrontend.
I noticed that I am getting a lot of non-closed connections to my data store 
(HBase in my case). The connections are held by the application, not the jobs 
themselves.

I am using HBaseRowDataInputFormat and it seems that HBase connections opened 
in the configure() method during job graph creation (before the job is 
executed) are not closed. My search led me to the method 
DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics), where I 
see that a format is not closed after being configured.

Is that correct? How can I overcome this issue?

My application is long-running, which is probably why I observe the resource leak. 
If I spawned a new JVM to run the jobs, this problem would not be noticeable.
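In the meantime I am considering a workaround along these lines (illustrative
fragment only, not the stock HBase format, and the table name is a placeholder):
keep configure() free of side effects, since it also runs on the client during
job graph creation, and open/close the connection in the RichInputFormat
lifecycle methods instead.

import java.io.IOException;
import org.apache.flink.addons.hbase.TableInputSplit;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public abstract class LazyHBaseInputFormat<T> extends RichInputFormat<T, TableInputSplit> {

  private transient Connection connection;
  protected transient Table table;

  @Override
  public void configure(Configuration parameters) {
    // Intentionally empty: no HBase connection here, because this is also
    // called on the client while the optimizer estimates input sizes.
  }

  @Override
  public void openInputFormat() throws IOException {
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
    table = connection.getTable(TableName.valueOf("my_table"));   // placeholder table name
  }

  @Override
  public void closeInputFormat() throws IOException {
    if (table != null) { table.close(); }
    if (connection != null) { connection.close(); }
  }
}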

Thank you!

Cheers,
Marc

Re: Resource leak in DataSourceNode?

2020-08-30 Thread Mark Davis
Hi Robert,

Thank you for confirming that there is an issue.
I do not have a solution for it and would like to hear the committers' insights 
into what is wrong there.

I think there are actually two issues - the first one is the HBase InputFormat 
does not close a connection in close().
Another is DataSourceNode not calling the close() method.

Cheers,
Mark

‐‐‐ Original Message ‐‐‐
On Thursday, August 27, 2020 3:30 PM, Robert Metzger  
wrote:

> Hi Mark,
>
> Thanks a lot for your message and the good investigation! I believe you've 
> found a bug in Flink. I filed an issue for the problem: 
> https://issues.apache.org/jira/browse/FLINK-19064.
>
> Would you be interested in opening a pull request to fix this?
> Otherwise, I'm sure a committer will pick up the issue soon.
>
> I'm not aware of a simple workaround for the problem.
>
> Best,
> Robert
>
> On Wed, Aug 26, 2020 at 4:05 PM Mark Davis  wrote:
>
>> Hi,
>>
>> I am trying to investigate a problem with non-released resources in my 
>> application.
>>
>> I have a stateful application which submits Flink DataSetjobs using code 
>> very similar to the code in CliFrontend.
>> I noticed what I am getting a lot of non-closed connections to my data store 
>> (HBase in my case). The connections are held by the application not the jobs 
>> themselves.
>>
>> I am using HBaseRowDataInputFormat and it seems that HBase connections 
>> opened in the configure() method during the job graph creation(before the 
>> jobs is executed) are not closed. My search lead me to the method 
>> DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics) where 
>> I see that a format is not closed after being configured.
>>
>> Is that correct? How can I overcome this issue?
>>
>> My application is long running that is probably why I observe the resource 
>> leak. Would I spawn a new JVM to run jobs this problem would not be 
>> noticeable.
>>
>> Thank you!
>>
>> Cheers,
>> Marc

DataSet API: HBase ScannerTimeoutException and double Result processing

2019-11-23 Thread Mark Davis
Hello,

I am reading Results from an HBase table and processing them with the Batch API. 
Everything works fine until I receive a ScannerTimeoutException from HBase.
Maybe my transformations get stuck or a GC pause happens - hard to tell. The 
HBase client restarts the scan and the processing continues.
Except for one problem - every time I receive this exception I observe duplicate 
Result processing - the Result which was processed just before the 
ScannerTimeoutException is thrown is processed twice.

Is this expected behavior? Should I be prepared to handle it?
And how should I handle it? Keeping track of all processed Results is not 
feasible in my case.

Here is a simple job demonstrating an issue (HBase scan and RPC timeouts are 
set to 60 sec)

Thank you!

Best regards,
Mark

  public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

env.createInput(new Src())
.map(new Mapper())
.print();
  }

  private static class Mapper implements MapFunction<Tuple1<String>, String> {

private int cnt = 0;

@Override
public String map(Tuple1<String> value) throws Exception {
  if (cnt++ % 2 == 0) {
Thread.sleep(12);
  }
  return value.f0;
}

  }

  private static class Src extends AbstractTableInputFormat<Tuple1<String>> {

@Override
protected Scan getScanner() {
  Scan scan = new Scan();
  scan.setStartRow(getStartRow());
  scan.setStopRow(getEndRow());
  scan.setCaching(1);
  scan.setCacheBlocks(false);
  return scan;
}

@Override
protected String getTableName() {
  return getTable();
}

@Override
protected Tuple1<String> mapResultToOutType(Result r) {
  return new Tuple1<>(Bytes.toString(r.getRow()));
}

@Override
public void configure(org.apache.flink.configuration.Configuration 
parameters) {
  scan = getScanner();
  try {
table = new HTable(getHadoopConf(), getTableName());
  } catch (IOException e) {
e.printStackTrace();
  }
}

  }

Re: DataSet API: HBase ScannerTimeoutException and double Result processing

2019-11-25 Thread Mark Davis
Hi Flavio,

>> When the resultScanner dies because of a timeout (this happens a lot when 
>> you have backpressure and the time between 2 consecutive reads exceeds the 
>> scanner timeout), the code creates a new scanner and restarts from where it 
>> was (startRow = currentRow).
>> So there should not be any duplicates (in theory), but this could be the 
>> root of the problem..

Yes, you are right, the nextRecord() exception handling is responsible for the 
duplicate record processing:

org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed since 
the last invocation, timeout is currently set to 6
at 
org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
at 
org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.UnknownScannerException: 
org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already closed?
at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
at 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)

But I am not sure that the handling of the HBase exception thrown from 
ClientScanner.next() is correct.
If the call to mapResultToOutType(Result) finished without an error there is no 
need to restart from the same row.
The new scanner should start from the next row.
Is that so or am I missing something?
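In the meantime, the workaround I am considering looks like this (sketch; it assumes
rows arrive in scan order within each source subtask, so remembering only the last
emitted row key is enough to drop the restart duplicate):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;

DataSet<Tuple1<String>> deduped = env.createInput(new Src())
    .flatMap(new RichFlatMapFunction<Tuple1<String>, Tuple1<String>>() {

      private String lastRow;

      @Override
      public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) {
        if (!value.f0.equals(lastRow)) {
          out.collect(value);            // normal case
        }                                // else: immediate repeat after a scanner restart, drop it
        lastRow = value.f0;
      }
    });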

Best regards,
  Mark

GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-21 Thread Mark Harris
Hi,

We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop v 
"Amazon 2.8.5". We've recently noticed that some TaskManagers fail (causing all 
the jobs running on them to fail) with a "java.lang.OutOfMemoryError: GC 
overhead limit exceeded". The taskmanager (and jobs that should be running on 
it) remain down until manually restarted.

I managed to take and analyze a memory dump from one of the afflicted 
taskmanagers.

It showed that 85% of the heap was made up of the 
java.io.DeleteOnExitHook.files hashset. The majority of the strings in that 
hashset (9041060 out of ~9041100) pointed to files that began 
/tmp/hadoop-yarn/s3a/s3ablock
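As far as I can tell the mechanism is simply java.io.File#deleteOnExit: each call
records the path in the static DeleteOnExitHook.files set, which is only drained at
JVM shutdown, so in a long-lived taskmanager it can only grow. Illustration only:

// Each temp block registered this way stays in the static set until the JVM
// exits, even after the file itself has been deleted.
File tmp = File.createTempFile("s3ablock-", ".tmp");
tmp.deleteOnExit();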

The problem seems to affect jobs that make use of the StreamingFileSink - all 
of the taskmanager crashes have been on the taskmaster running at least one job 
using this sink, and a cluster running only a single taskmanager / job that 
uses the StreamingFileSink crashed with the GC overhead limit exceeded error.

I've had a look for advice on handling this error more broadly without luck.

Any suggestions or advice gratefully received.

Best regards,

Mark Harris





Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-27 Thread Mark Harris
Hi Piotr,

Thanks for the link to the issue.

Do you know if there's a workaround? I've tried setting the following in my 
core-site.xml:

fs.s3a.fast.upload.buffer=true

To try and avoid writing the buffer files, but the taskmanager breaks with the 
same problem.

Best regards,

Mark

From: Piotr Nowojski  on behalf of Piotr Nowojski 

Sent: 22 January 2020 13:29
To: Till Rohrmann 
Cc: Mark Harris ; flink-u...@apache.org 
; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

This is probably a known issue of Hadoop [1]. Unfortunately it was only fixed 
in 3.3.0.

Piotrek

[1] https://issues.apache.org/jira/browse/HADOOP-15658

On 22 Jan 2020, at 13:56, Till Rohrmann  wrote:

Thanks for reporting this issue Mark. I'm pulling Klou into this conversation 
who knows more about the StreamingFileSink. @Klou does the StreamingFileSink 
relies on DeleteOnExitHooks to clean up files?

Cheers,
Till

On Tue, Jan 21, 2020 at 3:38 PM Mark Harris  wrote:
Hi,

We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop v 
"Amazon 2.8.5". We've recently noticed that some TaskManagers fail (causing all 
the jobs running on them to fail) with an "java.lang.OutOfMemoryError: GC 
overhead limit exceeded”. The taskmanager (and jobs that should be running on 
it) remain down until manually restarted.

I managed to take and analyze a memory dump from one of the afflicted 
taskmanagers.

It showed that 85% of the heap was made up of the 
java.io.DeleteOnExitHook.files hashset. The majority of the strings in that 
hashset (9041060 out of ~9041100) pointed to files that began 
/tmp/hadoop-yarn/s3a/s3ablock

The problem seems to affect jobs that make use of the StreamingFileSink - all 
of the taskmanager crashes have been on the taskmaster running at least one job 
using this sink, and a cluster running only a single taskmanager / job that 
uses the StreamingFileSink crashed with the GC overhead limit exceeded error.

I've had a look for advice on handling this error more broadly without luck.

Any suggestions or advice gratefully received.

Best regards,

Mark Harris




Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-30 Thread Mark Harris
Trying a few different approaches to the fs.s3a.fast.upload settings has brought 
me no joy - the taskmanagers end up simply crashing or complaining of high GC 
load. Heap dumps suggest that this time they're clogged with buffers instead, 
which makes sense.

Our job has parallelism of 6 and checkpoints every 15 minutes - if anything, 
we'd like to increase the frequency of that checkpoint duration. I suspect this 
could be affected by the partition structure we were bucketing to as well, and 
at any given moment we could be receiving data for up to 280 buckets at once.
Could this be a factor?

Best regards,

Mark

From: Piotr Nowojski 
Sent: 27 January 2020 16:16
To: Cliff Resnick 
Cc: David Magalhães ; Mark Harris 
; Till Rohrmann ; 
flink-u...@apache.org ; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

I think reducing the frequency of the checkpoints and decreasing parallelism of 
the things using the S3AOutputStream class, would help to mitigate the issue.

I don’t know about other solutions. I would suggest to ask this question 
directly to Steve L. in the bug ticket [1], as he is the one that fixed the 
issue. If there is no workaround, maybe it would be possible to put a pressure 
on the Hadoop guys to back port the fix to older versions?

Piotrek

[1] https://issues.apache.org/jira/browse/HADOOP-15658

On 27 Jan 2020, at 15:41, Cliff Resnick  wrote:

I know from experience that Flink's shaded S3A FileSystem does not reference 
core-site.xml, though I don't remember offhand what file (s) it does reference. 
However since it's shaded, maybe this could be fixed by building a Flink FS 
referencing 3.3.0? Last I checked I think it referenced 3.1.0.

On Mon, Jan 27, 2020, 8:48 AM David Magalhães  wrote:
Does StreamingFileSink use core-site.xml ? When I was using it, it didn't load 
any configurations from core-site.xml.

On Mon, Jan 27, 2020 at 12:08 PM Mark Harris  wrote:
Hi Piotr,

Thanks for the link to the issue.

Do you know if there's a workaround? I've tried setting the following in my 
core-site.xml:

fs.s3a.fast.upload.buffer=true

To try and avoid writing the buffer files, but the taskmanager breaks with the 
same problem.

Best regards,

Mark

From: Piotr Nowojski  on behalf of Piotr Nowojski 
Sent: 22 January 2020 13:29
To: Till Rohrmann 
Cc: Mark Harris ; flink-u...@apache.org ; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

This is probably a known issue of Hadoop [1]. Unfortunately it was only fixed 
in 3.3.0.

Piotrek

[1] https://issues.apache.org/jira/browse/HADOOP-15658

On 22 Jan 2020, at 13:56, Till Rohrmann  wrote:

Thanks for reporting this issue Mark. I'm pulling Klou into this conversation 
who knows more about the StreamingFileSink. @Klou does the StreamingFileSink 
relies on DeleteOnExitHooks to clean up files?

Cheers,
Till

On Tue, Jan 21, 2020 at 3:38 PM Mark Harris  wrote:
Hi,

We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop v 
"Amazon 2.8.5". We've recently noticed that some TaskManagers fail (causing all 
the jobs running on them to fail) with an "java.lang.OutOfMemoryError: GC 
overhead limit exceeded”. The taskmanager (and jobs that should be running on 
it) remain down until manually restarted.

I managed to take and analyze a memory dump from one of the afflicted 
taskmanagers.

It showed that 85% of the heap was made up of the 
java.io.DeleteOnExitHook.files hashset. The majority of the strings in that 
hashset (9041060 out of ~9041100) pointed to files that began 
/tmp/hadoop-yarn/s3a/s3ablock

The problem seems to affect jobs that make use of the StreamingFileSink - all 
of the taskmanager crashes have been on the taskmaster running at least one job 
using this sink, and a cluster running only a single taskmanager / job that 
uses the StreamingFileSink crashed with the GC overhead limit exceeded error.

I've had a look for advice on handling this error more broadly without luck.

Any suggestions or advice gratefully received.

Best regards,

Mark Harris




Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-30 Thread Mark Harris
Hi,

Thanks for your help with this. 🙂

The EMR cluster has 3 15GB VMs, and the flink cluster is started with:

/usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3

Usually the task runs for about 15 minutes before it restarts, usually due to 
a "java.lang.OutOfMemoryError: Java heap space" exception.

The figures came from a MemoryAnalyzer session on a manual memory dump from one 
of the taskmanagers. The total size of that heap was only 1.8gb.  In that heap, 
1.7gb is taken up by the static field "files" in DeleteOnExitHook, which is a 
linked hash set containing the 9 million strings.

A full example of one of the paths is 
/tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at 120 bytes per 
char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and 
another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding on to 
an array of 16777216 HashMap$Node elements, the LinkedHashMap can only 
contribute another 20MB or so.
I goofed in not taking that 85% figure from MemoryAnalyzer - it tells me 
DeleteOnExitHook is responsible for 96.98% of the heap dump.

Looking at the files it managed to write before this started to happen 
regularly, it looks like they're being written approximately every 3 minutes. 
I'll triple check our config, but I'm reasonably sure the job is configured to 
checkpoint every 15 minutes - could something else be causing it to write?
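(Rough arithmetic, assuming one s3ablock temp file per active bucket per subtask per
write cycle and most of our ~280 buckets active each time: 6 subtasks x 280 buckets
x ~20 writes/hour ≈ 33,600 deleteOnExit entries per hour, which reaches the ~9
million we observed in roughly 11 days rather than a month.)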

This may all be a red herring - something else may be taking up the 
taskmanagers memory which didn't make it into that heap dump. I plan to repeat 
the analysis on a heapdump created by  -XX:+HeapDumpOnOutOfMemoryError shortly.

Best regards,

Mark


From: Piotr Nowojski 
Sent: 30 January 2020 13:44
To: Mark Harris 
Cc: Cliff Resnick ; David Magalhães ; 
Till Rohrmann ; flink-u...@apache.org 
; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

What is your job setup? Size of the nodes, memory settings of the Flink/JVM?

9 041 060 strings is awfully small number to bring down a whole cluster. With 
each tmp string having ~30 bytes, that’s only 271MB. Is this really 85% of the 
heap? And also, with parallelism of 6 and checkpoints every 15 minutes, 9 000 
000 of leaked strings should happen only after one month  assuming 500-600 
total number of buckets. (Also assuming that there is a separate file per each 
bucket).

Piotrek

On 30 Jan 2020, at 14:21, Mark Harris  wrote:

Trying a few different approaches to the fs.s3a.fast.upload settings has bought 
me no joy - the taskmanagers end up simply crashing or complaining of high GC 
load. Heap dumps suggest that this time they're clogged with buffers instead, 
which makes sense.

Our job has parallelism of 6 and checkpoints every 15 minutes - if anything, 
we'd like to increase the frequency of that checkpoint duration. I suspect this 
could be affected by the partition structure we were bucketing to as well, and 
at any given moment we could be receiving data for up to 280 buckets at once.
Could this be a factor?

Best regards,

Mark

From: Piotr Nowojski 
Sent: 27 January 2020 16:16
To: Cliff Resnick 
Cc: David Magalhães ; Mark Harris ; Till Rohrmann ; 
flink-u...@apache.org ; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

I think reducing the frequency of the checkpoints and decreasing parallelism of 
the things using the S3AOutputStream class, would help to mitigate the issue.

I don’t know about other solutions. I would suggest to ask this question 
directly to Steve L. in the bug ticket [1], as he is the one that fixed the 
issue. If there is no workaround, maybe it would be possible to put a pressure 
on the Hadoop guys to back port the fix to older versions?

Piotrek

[1] https://issues.apache.org/jira/browse/HADOOP-15658

On 27 Jan 2020, at 15:41, Cliff Resnick  wrote:

I know from experience that Flink's shaded S3A FileSystem does not reference 
core-site.xml, though I don't remember offhand what file (s) it does reference. 
However since it's shaded, maybe this could be fixed by building a Flink FS 
referencing 3.3.0? Last I checked I think it referenced 3.1.0.

On Mon, Jan 27, 2020, 8:48 AM David Magalhães  wrote:
Does StreamingFileSink use core-site.xml ? When I was using it, it didn't load 
any configurations from core-site.xml.

On Mon, Jan 27, 2020 at 12:08 PM Mark Harris  wrote:

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Mark Harris
Hi all,

The out-of-memory heap dump had the answer - the job was failing with an 
OutOfMemoryError because the activeBuckets members of 3 instances of 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were filling a 
significant enough part of the memory of the taskmanager that no progress was 
being made. Increasing the memory available to the TM seems to have fixed the 
problem.

I think the DeleteOnExit problem will mean it needs to be restarted every few 
weeks, but that's acceptable for now.
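For anyone hitting the same thing: if the driver is the number of concurrently
active buckets / in-progress part files, the rolling policy is the main knob I am
aware of (sketch for a row-encoded sink on the Flink version we run; the values and
the output path are placeholders, and this does not remove the DeleteOnExit growth
itself):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("s3a://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.create()
            .withRolloverInterval(15 * 60 * 1000)     // roll at least every 15 minutes
            .withInactivityInterval(5 * 60 * 1000)    // close part files for quiet buckets sooner
            .withMaxPartSize(128 * 1024 * 1024)       // cap part file size
            .build())
    .build();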

Thanks again,

Mark

From: Mark Harris 
Sent: 30 January 2020 14:36
To: Piotr Nowojski 
Cc: Cliff Resnick ; David Magalhães ; 
Till Rohrmann ; flink-u...@apache.org 
; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

Thanks for your help with this. 🙂

The EMR cluster has 3 15GB VMs, and the flink cluster is started with:

/usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3

Usually the task runs for about 15 minutes before it restarts, usually due to 
with an "java.lang.OutOfMemoryError: Java heap space" exception.

The figures came from a MemoryAnalyzer session on a manual memory dump from one 
of the taskmanagers. The total size of that heap was only 1.8gb.  In that heap, 
1.7gb is taken up by the static field "files" in DeleteOnExitHook, which is a 
linked hash set containing the 9 million strings.

A full example of one the path is 
/tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per 
char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and 
another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding on to 
an array of 16777216 HashMap$Node elements, the LinkedHashMap can only 
contribute another 20MB or so.
I goofed in not taking that 85% figure from MemoryAnalyzer - it tells me 
DeleteOnExitHook is responsible for 96.98% of the heap dump.

Looking at the files it managed to write before this started to happen 
regularly, it looks like they're being written approximately every 3 minutes. 
I'll triple check our config, but I'm reasonably sure the job is configured to 
checkpoint every 15 minutes - could something else be causing it to write?

This may all be a red herring - something else may be taking up the 
taskmanagers memory which didn't make it into that heap dump. I plan to repeat 
the analysis on a heapdump created by  -XX:+HeapDumpOnOutOfMemoryError shortly.

Best regards,

Mark


From: Piotr Nowojski 
Sent: 30 January 2020 13:44
To: Mark Harris 
Cc: Cliff Resnick ; David Magalhães ; 
Till Rohrmann ; flink-u...@apache.org 
; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

What is your job setup? Size of the nodes, memory settings of the Flink/JVM?

9,041,060 strings is an awfully small number to bring down a whole cluster. With 
each tmp string having ~30 bytes, that’s only 271MB. Is this really 85% of the 
heap? And also, with a parallelism of 6 and checkpoints every 15 minutes, 
9,000,000 leaked strings should only accumulate after about a month, assuming 
500-600 buckets in total. (Also assuming that there is a separate file per 
bucket).

Piotrek

On 30 Jan 2020, at 14:21, Mark Harris <mark.har...@hivehome.com> wrote:

Trying a few different approaches to the fs.s3a.fast.upload settings has brought 
me no joy - the taskmanagers end up simply crashing or complaining of high GC 
load. Heap dumps suggest that this time they're clogged with buffers instead, 
which makes sense.

Our job has parallelism of 6 and checkpoints every 15 minutes - if anything, 
we'd like to increase the checkpoint frequency. I suspect this 
could be affected by the partition structure we were bucketing to as well, and 
at any given moment we could be receiving data for up to 280 buckets at once.
Could this be a factor?

Best regards,

Mark

From: Piotr Nowojski <pi...@ververica.com>
Sent: 27 January 2020 16:16
To: Cliff Resnick <cre...@gmail.com>
Cc: David Magalhães <speeddra...@gmail.com>; Mark Harris <mark.har...@hivehome.com>; Till Rohrmann <trohrm...@apache.org>; flink-u...@apache.org; kkloudas <kklou...@apache.org>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

I think reducing the frequency of the checkpoints and decreasing parallelism of 
the things using the S3AOutputStream class, would help to mitigate the issue.

I don’t know about other solutions. I would suggest to ask this question 
directly to Steve L. in the bug ticket [1], as he is the one that fixed the 
issue. If there is no workaround, maybe it would be possible to put a pres

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Mark Harris
Hi Kostas,

Sorry, stupid question: How do I set that for a StreamingFileSink?

Best regards,

Mark

From: Kostas Kloudas 
Sent: 03 February 2020 14:58
To: Mark Harris 
Cc: Piotr Nowojski ; Cliff Resnick ; 
David Magalhães ; Till Rohrmann ; 
flink-u...@apache.org 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi Mark,

Have you tried to set your rolling policy to close inactive part files after 
some time [1]?
If the part files in the buckets are inactive and there are no new part files, 
then the state handle for those buckets will also be removed.

Cheers,
Kostas

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html



On Mon, Feb 3, 2020 at 3:54 PM Mark Harris <mark.har...@hivehome.com> wrote:
Hi all,

The out-of-memory heap dump had the answer - the job was failing with an 
OutOfMemoryError because the activeBuckets members of 3 instances of 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were filling a 
significant enough part of the memory of the taskmanager that no progress was 
being made. Increasing the memory available to the TM seems to have fixed the 
problem.

I think the DeleteOnExit problem will mean it needs to be restarted every few 
weeks, but that's acceptable for now.

Thanks again,

Mark

From: Mark Harris <mark.har...@hivehome.com>
Sent: 30 January 2020 14:36
To: Piotr Nowojski <pi...@ververica.com>
Cc: Cliff Resnick <cre...@gmail.com>; David Magalhães <speeddra...@gmail.com>; Till Rohrmann <trohrm...@apache.org>; flink-u...@apache.org; kkloudas <kklou...@apache.org>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

Thanks for your help with this. 🙂

The EMR cluster has 3 15GB VMs, and the flink cluster is started with:

/usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3

Usually the task runs for about 15 minutes before it restarts, typically 
with a "java.lang.OutOfMemoryError: Java heap space" exception.

The figures came from a MemoryAnalyzer session on a manual memory dump from one 
of the taskmanagers. The total size of that heap was only 1.8gb.  In that heap, 
1.7gb is taken up by the static field "files" in DeleteOnExitHook, which is a 
linked hash set containing the 9 million strings.

A full example of one of the paths is 
/tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at around 120 bytes per 
char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and 
another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding on to 
an array of 16777216 HashMap$Node elements, the LinkedHashMap can only 
contribute another 20MB or so.
I goofed in not taking that 85% figure from MemoryAnalyzer - it tells me 
DeleteOnExitHook is responsible for 96.98% of the heap dump.

Looking at the files it managed to write before this started to happen 
regularly, it looks like they're being written approximately every 3 minutes. 
I'll triple check our config, but I'm reasonably sure the job is configured to 
checkpoint every 15 minutes - could something else be causing it to write?

This may all be a red herring - something else may be taking up the 
taskmanagers memory which didn't make it into that heap dump. I plan to repeat 
the analysis on a heapdump created by  -XX:+HeapDumpOnOutOfMemoryError shortly.

Best regards,

Mark


From: Piotr Nowojski <pi...@ververica.com>
Sent: 30 January 2020 13:44
To: Mark Harris <mark.har...@hivehome.com>
Cc: Cliff Resnick <cre...@gmail.com>; David Magalhães <speeddra...@gmail.com>; Till Rohrmann <trohrm...@apache.org>; flink-u...@apache.org; kkloudas <kklou...@apache.org>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

What is your job setup? Size of the nodes, memory settings of the Flink/JVM?

9,041,060 strings is an awfully small number to bring down a whole cluster. With 
each tmp string having ~30 bytes, that’s only 271MB. Is this really 85% of the 
heap? And also, with a parallelism of 6 and checkpoints every 15 minutes, 
9,000,000 leaked strings should only accumulate after about a month, assuming 
500-600 buckets in total. (Also assuming that there is a separate file per 
bucket).

Piotrek

On 30 Jan 2020, at 14:21, Mark Harris <mark.har...@hivehome.com> wrote:

Trying a few different approaches to the fs.s3a.fast.upload settings has brought 
me no joy - the taskmanagers

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Mark Harris
Hi Kostas,

Thanks for your help here - I think we're OK with the increased heap size, but 
happy to explore other alternatives.

I see the problem - we're currently using a BulkFormat, which doesn't seem to 
let us override the rolling policy. Is there an equivalent for the BulkFormat?

Best regards,

Mark

From: Kostas Kloudas 
Sent: 03 February 2020 15:39
To: Mark Harris 
Cc: Piotr Nowojski ; Cliff Resnick ; 
David Magalhães ; Till Rohrmann ; 
flink-u...@apache.org 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi Mark,

You can use something like the following and change the intervals accordingly:

final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
        .withRollingPolicy(
                DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
        .build();

Let me know if this solves the problem.

Cheers,
Kostas

On Mon, Feb 3, 2020 at 4:11 PM Mark Harris <mark.har...@hivehome.com> wrote:
Hi Kostas,

Sorry, stupid question: How do I set that for a StreamingFileSink?

Best regards,

Mark

From: Kostas Kloudas <kklou...@apache.org>
Sent: 03 February 2020 14:58
To: Mark Harris <mark.har...@hivehome.com>
Cc: Piotr Nowojski <pi...@ververica.com>; Cliff Resnick <cre...@gmail.com>; David Magalhães <speeddra...@gmail.com>; Till Rohrmann <trohrm...@apache.org>; flink-u...@apache.org
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi Mark,

Have you tried to set your rolling policy to close inactive part files after 
some time [1]?
If the part files in the buckets are inactive and there are no new part files, 
then the state handle for those buckets will also be removed.

Cheers,
Kostas

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html



On Mon, Feb 3, 2020 at 3:54 PM Mark Harris <mark.har...@hivehome.com> wrote:
Hi all,

The out-of-memory heap dump had the answer - the job was failing with an 
OutOfMemoryError because the activeBuckets members of 3 instances of 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were filling a 
significant enough part of the memory of the taskmanager that no progress was 
being made. Increasing the memory available to the TM seems to have fixed the 
problem.

I think the DeleteOnExit problem will mean it needs to be restarted every few 
weeks, but that's acceptable for now.

Thanks again,

Mark

From: Mark Harris <mark.har...@hivehome.com>
Sent: 30 January 2020 14:36
To: Piotr Nowojski <pi...@ververica.com>
Cc: Cliff Resnick <cre...@gmail.com>; David Magalhães <speeddra...@gmail.com>; Till Rohrmann <trohrm...@apache.org>; flink-u...@apache.org; kkloudas <kklou...@apache.org>
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

Thanks for your help with this. 🙂

The EMR cluster has 3 15GB VMs, and the flink cluster is started with:

/usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3

Usually the task runs for about 15 minutes before it restarts, typically 
with a "java.lang.OutOfMemoryError: Java heap space" exception.

The figures came from a MemoryAnalyzer session on a manual memory dump from one 
of the taskmanagers. The total size of that heap was only 1.8gb.  In that heap, 
1.7gb is taken up by the static field "files" in DeleteOnExitHook, which is a 
linked hash set containing the 9 million strings.

A full example of one of the paths is 
/tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at around 120 bytes per 
char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and 
another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding on to 
an array of 16777216 HashMap$Node elements, the LinkedHashMap can only 
contribute another 20MB or so.
I goofed in not taking that 85% figure from MemoryAnalyzer - it tells me 
DeleteOn

State Processor API Keyed State

2020-02-18 Thread Mark Niehe
Hey all,

I've run into an issue with the State Processor API. To highlight the
issues I've been having, I've created a reference repository that will
demonstrate the issue (repository:
https://github.com/segmentio/flink-state-management).

The current implementation of the pipeline has left us with keyed state
that we no longer need, and we don't have references to some of the old keys.
My plan was to:
1. create a savepoint
2. read the keys from each operator (using State Processor API)
3. filter out all the keys that are no longer used
4. bootstrap a new savepoint that contains the filtered state

I managed to get this working using a sample pipeline and a very basic key
(a string), but when I switched the key to be something more complex (a
case class of two strings), I started seeing this exception:
Caused by: org.apache.flink.util.StateMigrationException: The new key
serializer must be compatible.
at
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 13 more

Has anyone come across this before and figured out a fix? Any help you can
give would be greatly appreciated!

Thanks,
-- 
<http://segment.com/>
Mark Niehe ·  Software Engineer
Integrations
<https://segment.com/catalog?utm_source=signature&utm_medium=email>  ·  Blog
<https://segment.com/blog?utm_source=signature&utm_medium=email>  ·  We're
Hiring! <https://segment.com/jobs?utm_source=signature&utm_medium=email>


Flink Release Security Workflow

2020-03-18 Thread Mark Hapner
Are there any docs/links that describe the security workflow for a Flink 
release? For instance, the static code scan workflow; pen test workflow; 
security review of new features; etc.

The reason for the question is to better understand how to include Flink within 
the security workflow of a product that includes it as a component.





Lack of KeyedBroadcastStateBootstrapFunction

2020-03-23 Thread Mark Niehe
Hey all,

I have another question about the State Processor API. I can't seem to find
a way to create a KeyedBroadcastStateBootstrapFunction operator. The two
options currently available to bootstrap a savepoint with state are
KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
these are the only two options, it's not possible to bootstrap both keyed
and broadcast state for the same operator. Are there any plans to add that
functionality or did I miss it entirely when going through the API docs?

Thanks,
-- 
<http://segment.com/>
Mark Niehe ·  Software Engineer
Integrations
<https://segment.com/catalog?utm_source=signature&utm_medium=email>  ·  Blog
<https://segment.com/blog?utm_source=signature&utm_medium=email>  ·  We're
Hiring! <https://segment.com/jobs?utm_source=signature&utm_medium=email>


Re: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Mark Niehe
Hi Gordan and Seth,

Thanks for explanation and opening up the ticket. I'll add some details in
the ticket to explain what we're trying to do which will hopefully add some
context.

-- 
<http://segment.com/>
Mark Niehe ·  Software Engineer
Integrations
<https://segment.com/catalog?utm_source=signature&utm_medium=email>  ·  Blog
<https://segment.com/blog?utm_source=signature&utm_medium=email>  ·  We're
Hiring! <https://segment.com/jobs?utm_source=signature&utm_medium=email>

On Mon, Mar 30, 2020 at 1:04 AM Tzu-Li (Gordon) Tai 
wrote:

> It seems like Seth's reply didn't make it to the mailing lists somehow.
> Forwarding his reply below:
>
> -- Forwarded message -
> From: Seth Wiesman 
> Date: Thu, Mar 26, 2020 at 5:16 AM
> Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction
> To: Dawid Wysakowicz 
> Cc: , Tzu-Li (Gordon) Tai 
>
>
> As Dawid mentioned, you can implement your own operator using the
> transform method to do this yourself. Unfortunately, that is fairly low
> level and would require you to understand some amount of Flink internals.
>
> The real problem is that the state processor api does not support two
> input operators. We originally skipped that because there were a number of
> open questions about how best to do it and it wasn't clear that it would be
> a necessary feature. Typically, flink users use two input operators to do
> some sort of join. And when bootstrapping state, you typically only want to
> pre-fill one side of that join. KeyedBroadcastState is clearly a good
> counter-argument to that.
>
> I've opened a ticket for the feature if you would like to comment there.
>
> https://issues.apache.org/jira/browse/FLINK-16784
>
> On Tue, Mar 24, 2020 at 9:17 AM Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> I am not very familiar with the State Processor API, but from a brief
>> look at it, I think you are right. I think the State Processor API does not
>> support mixing different kinds of states in a single operator for now. At
>> least not in a nice way. Probably you could implement the
>> KeyedBroadcastStateBootstrapFunction yourself and use it with
>> KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
>> I understand this is probably not the easiest task.
>>
>> I am not aware if there are plans to support that out of the box, but I
>> cc'ed Gordon and Seth who if I remember correctly worked on that API. I
>> hope they might give you some more insights.
>>
>> Best,
>>
>> Dawid
>>  On 23/03/2020 17:36, Mark Niehe wrote:
>>
>> Hey all,
>>
>> I have another question about the State Processor API. I can't seem to
>> find a way to create a KeyedBroadcastStateBootstrapFunction operator. The
>> two options currently available to bootstrap a savepoint with state are
>> KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
>> these are the only two options, it's not possible to bootstrap both keyed
>> and broadcast state for the same operator. Are there any plans to add that
>> functionality or did I miss it entirely when going through the API docs?
>>
>> Thanks,
>> --
>> <http://segment.com/>
>> Mark Niehe ·  Software Engineer
>> Integrations
>> <https://segment.com/catalog?utm_source=signature&utm_medium=email>  ·
>> Blog <https://segment.com/blog?utm_source=signature&utm_medium=email>
>>   ·  We're Hiring!
>> <https://segment.com/jobs?utm_source=signature&utm_medium=email>
>>
>>


Avro serialization problem after updating to flink 1.6.0

2018-09-27 Thread Mark Harris
Hi,

I recently tried to update a flink job from 1.3.2 to 1.6.0. It deploys
successfully as usual, but logs the following exception shortly after
starting:

Caused by: org.apache.avro.AvroRuntimeException:
avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.avro.AvroRuntimeException: Not a Specific class: class
uk.co.test.serde.AlertEvent
at
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.initializeAvro(AvroSerializer.java:367)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.checkAvroInitialized(AvroSerializer.java:357)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.snapshotConfiguration(AvroSerializer.java:269)
at
org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.computeSnapshot(RegisteredKeyValueStateBackendMetaInfo.java:241)
at
org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.snapshot(RegisteredKeyValueStateBackendMetaInfo.java:226)
at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.getMetaInfoSnapshot(CopyOnWriteStateTableSnapshot.java:173)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates(HeapKeyedStateBackend.java:880)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.performSnapshot(HeapKeyedStateBackend.java:719)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:355)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:383)
... 13 more
Caused by:
avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.avro.AvroRuntimeException: Not a Specific class: class
uk.co.test.serde.AlertEvent
at
avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
at
avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
at
avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
at
avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
at
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
... 23 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
class uk.co.AlertEvent
at
org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at
avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at
avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
... 27 more


AlertEvent is a scala case class generated using sbt avrohugger (
https://github.com/julianpeeters/sbt-avrohugger) that definitely implements
SpecificRecordBase.

There has been an Avro version jump between Flink 1.3.2 and 1.6.0, from 1.7.7 to
1.8.2 but I've rebuilt the avro model against Avro 1.8.2 and had a brief
look at the code in SpecificData.create - it seems like it would still have
tried the getDeclaredField("$SCHEMA") check that's throwing.

Any advice on how to figure out what's causing the problem, or work around
it would be gratefully received.

Best regards,

Mark Harris





Re: Avro serialization problem after updating to flink 1.6.0

2018-10-23 Thread Mark Harris
Hi,

It does not. Looking at the generated code, that SCHEMA$ value gets created
in the companion object for the case class (which behaves equivalently to a
static field in java).

This gets compiled down to a classfile with a $ suffix - in this case,
"AlertEvent.SCHEMA$" doesn't exist, and to get the schema,
AlertEvent$.MODULE$.SCHEMA$() would have to be called. Looking at the
output for javap, there is a SCHEMA$() method on AlertEvent too.
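
To illustrate the point, a rough sketch of how that schema is reachable from Java via reflection (illustrative only, with exception handling omitted; the class names are the ones discussed in this thread):

// Avro's SpecificData looks for a static SCHEMA$ field on the case class and fails,
// because scalac puts the value on the companion object, compiled to AlertEvent$.
Class<?> companion = Class.forName("uk.co.test.serde.AlertEvent$");
Object module = companion.getField("MODULE$").get(null);
org.apache.avro.Schema schema =
        (org.apache.avro.Schema) companion.getMethod("SCHEMA$").invoke(module);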

I wonder if the field itself is a bit of a red herring though? The code
we're using for that event hasn't changed between flink 1.3.2 and 1.6.1?

Best regards,

Mark


On Thu, 4 Oct 2018 at 14:03, Aljoscha Krettek  wrote:

> Hi,
>
> can you check whether AlertEvent actually has a field called "SCHEMA$"?
> You can do that via
> javap path/to/AlertEvent.class
>
> Best,
> Aljoscha
>
> On 27. Sep 2018, at 10:03, Mark Harris  wrote:
>
> Hi,
>
> I recently tried to update a flink job from 1.3.2 to 1.6.0. It deploys
> successfully as usual, but logs the following exception shortly after
> starting:
>
> Caused by: org.apache.avro.AvroRuntimeException:
> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.avro.AvroRuntimeException: Not a Specific class: class
> uk.co.test.serde.AlertEvent
> at
> org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
> at
> org.apache.flink.formats.avro.typeutils.AvroSerializer.initializeAvro(AvroSerializer.java:367)
> at
> org.apache.flink.formats.avro.typeutils.AvroSerializer.checkAvroInitialized(AvroSerializer.java:357)
> at
> org.apache.flink.formats.avro.typeutils.AvroSerializer.snapshotConfiguration(AvroSerializer.java:269)
> at
> org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.computeSnapshot(RegisteredKeyValueStateBackendMetaInfo.java:241)
> at
> org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.snapshot(RegisteredKeyValueStateBackendMetaInfo.java:226)
> at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.getMetaInfoSnapshot(CopyOnWriteStateTableSnapshot.java:173)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllStates(HeapKeyedStateBackend.java:880)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy.performSnapshot(HeapKeyedStateBackend.java:719)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:355)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:383)
> ... 13 more
> Caused by:
> avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.avro.AvroRuntimeException: Not a Specific class: class
> uk.co.test.serde.AlertEvent
> at
> avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
> at
> avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
> at
> avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
> at
> avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
> at
> org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
> ... 23 more
> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
> class uk.co.AlertEvent
> at
> org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at
> avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at
> avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at
> avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at
> avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> ... 27 more
>
>
> AlertEvent is a scala case class generated using sbt avrohugger (
> https://github.com/julianpeeters/sbt-avrohugger) that definitely
> implements SpecificRecordBase.
>
> There has been an Avro version jump between Flink 1.3.2 and 1.6.0, from 1.7.7 to
> 1.8.2 but I've rebuilt the avro model against Avro 1.8.2 and had a brief
> look at the code in SpecificData.create - it seems like it would still have
> tried the getDeclaredField("$SCHEMA") check that's throwing.
>
> Any advice on how to figure out what's causing the problem, or work around
> it would be gratefully received.
>
> Best regards,
>

KafkaException or ExecutionStateChange failure on job startup

2018-10-23 Thread Mark Harris
 message of type
"org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)


Neither seems related to the job code at all; both seem to be problems with
the metrics on the flink connector and something internal to flink. They
seem to happen once at startup, and don't repeat once the cluster reaches a
steady state.

The jobs also appear to be running correctly in spite of these Exceptions
appearing in their "Exception" tab in the jobmanager.

Is there something that we need to fix in our setup? Are there any
implications around missing metrics etc?

Best regards,

Mark Harris





Re: KafkaException or ExecutionStateChange failure on job startup

2018-10-26 Thread Mark Harris
Hi Dominik

Setting that bit of configuration seems to have done the trick for the
MXBean exception.

Many thanks for your help.

Best regards,

Mark
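
For anyone hitting the same MBean collision, the change presumably amounts to something like the following sketch (the broker, group and id values are illustrative, and the consumer class should match your connector version; note the Kafka client property is spelled client.id, which is what appears in the MBean name above):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-group");
// Give each consumer instance its own id so the JMX MBean names no longer clash.
props.setProperty("client.id", "job-a-consumer-1");

FlinkKafkaConsumer09<String> consumer =
        new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);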

On Tue, 23 Oct 2018 at 14:41, Dominik Wosiński  wrote:

> Hey Mark,
>
> Do You use more than 1 Kafka consumer for Your jobs? I think this relates
> to the known issue in Kafka:
> https://issues.apache.org/jira/browse/KAFKA-3992.
> The problem is that if You don't provide client ID for your
> *KafkaConsumer* Kafka assigns one, but this is done in an unsynchronized
> way, so finally, it ends up in assigning the same id for multiple
> different Consumer instances. Probably this is what happens when multiple
> jobs are resumed at the same time.
>
> What You could try to do is to assign the *consumer.id* using properties
> passed to each consumer. This
> should help in solving this issue.
>
> Best Regards,
> Dom.
>
>
>
>
> Tue, 23 Oct 2018 at 13:21, Mark Harris 
> wrote:
>
>> Hi,
>> We regularly see the following two exceptions in a number of jobs shortly
>> after they have been resumed during our flink cluster startup:
>>
>> org.apache.kafka.common.KafkaException: Error registering mbean
>> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
>> at
>> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
>> at
>> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
>> at
>> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:436)
>> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
>> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
>> at
>> org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:749)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:327)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:188)
>> at
>> org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:283)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1344)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:77)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:473)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: javax.management.InstanceAlreadyExistsException:
>> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
>> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>> at
>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>> at
>> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
>> ... 21 more
>> java.lang.Exception: Failed to send ExecutionStateChange notification to
>> JobManager
>> at
>> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$org$apache$fli

Flink zookeeper HA problem

2019-03-07 Thread Harris, Mark
it$.result(package.scala:142)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:132)
... 8 more

Further attempts to start the cluster fail in the same way, and the only 
solution seems to be to clear out the HA information in zookeeper (and the 
filesystem). After doing this, the cluster starts successfully without running 
any jobs.
Cluster 2 usually starts up first and succeeds, but we've seen it both ways 
around. We've also tried swapping the values of cluster-id and path.root for 
each cluster, which has the same problem.


The problem appears to be that it can't find the leader/dispatcher. Turning the 
logging up to DEBUG, I can see some suggestion in the jobmanager log that this 
election has completed successfully:
2019/03/07 10:59:02,781 DEBUG 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Grant 
leadership to contender 
akka.tcp://fl...@ip-10-100-30-114.eu-west-1.compute.internal:44315/user/dispatcher
 with session ID fd044f52-b05c-4feb-9ccf-4c8f18ddf18c.
2019/03/07 10:59:08,035 DEBUG 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher 
akka.tcp://fl...@ip-10-100-30-114.eu-west-1.compute.internal:44315/user/dispatcher
 accepted leadership with fencing token 9ccf4c8f18ddf18cfd044f52b05c4feb. Start 
recovered jobs.

There don't seem to be any errors in the jobmanager log.

Any advice on how to start these two clusters or suggestions for other avenues 
to solve or debug would be really gratefully received.

Best regards,

Mark Harris





Re: Flink zookeeper HA problem

2019-03-11 Thread Harris, Mark
Sometimes it's the simplest things - the 40 or so jobs we have seem to take 
longer to reload on cluster start-up than in Flink 1.6, and it was timing out. 
Increasing the timeout to over 5 minutes got everything working again.
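
For reference, a sketch of that change in flink-conf.yaml, assuming the 60-second client-side future timeout above is governed by akka.client.timeout in this Flink version (worth double-checking against the configuration reference):

akka.client.timeout: 600 s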

From: Harris, Mark 
Sent: 07 March 2019 12:33
To: user
Subject: Flink zookeeper HA problem

Hi,

We've got a problem trying to set up two flink clusters using the same 
zookeeper instance that we wonder if anyone has seen before or has any advice 
on.

Our setup is two AWS EMR clusters running flink (v1.7.2) that are both trying 
to use a single zookeeper cluster (v3.4.6-1569965) for their HA configuration. 
As a cost saving measure, we have the clusters configured to terminate at 19:00 
and restart at 08:00 each day.

Cluster 1 is configured / started like this:

/usr/lib/flink/bin/yarn-session.sh -d -n 6 -tm 9472 -jm 2048 -s 8

high-availability: zookeeper
high-availability.cluster-id: /cluster1
high-availability.storageDir: s3:///flink/cluster1/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: zk0:2181,zk1:2181,zk2:2181


Cluster 2 is configured / started like this:

/usr/lib/flink/bin/yarn-session.sh -d -n 2 -tm 6144 -jm 6144 -s 1

high-availability: zookeeper
high-availability.cluster-id: /cluster2
high-availability.storageDir: s3:///flink/cluster2/recovery
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: zk0:2181,zk1:2181,zk2:2181


Since upgrading to flink 1.7.2 from 1.6.1, we've found that whichever cluster 
happens to start second fails to start with the following error:


2019-03-07 10:59:05,142 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-03-07 10:59:05,166 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 10:59:05,171 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,184 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,187 INFO  org.apache.flink.runtime.rest.RestClient  
- Shutting down rest endpoint.
2019-03-07 11:00:05,194 INFO  org.apache.flink.runtime.rest.RestClient  
- Rest endpoint shutdown complete.
2019-03-07 11:00:05,194 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-03-07 11:00:05,195 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-07 11:00:05,195 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
  - backgroundOperationsLoop exiting
2019-03-07 11:00:05,200 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
0x6b691520c6774910 closed
2019-03-07 11:00:05,200 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
EventThread shut down for session: 0x6b691520c6774910
2019-03-07 11:00:05,408 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killed 
application application_1551946032263_0005
2019-03-07 11:00:05,488 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Error while running the Flink Yarn session.
org.apache.flink.util.FlinkException: Could not write the Yarn connection 
information.
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:636)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:810)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:810)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: 
Could not retrieve the leader address and leader session ID.
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderConnectionInfo(LeaderRetrievalUtils.java:134)
at 
org.apache.flink.client.program.rest.RestClusterClient.getClusterConnectionInfo(RestClusterClient.java:513)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:613)
... 6 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[6 milliseco

Flink Operator in Golang?

2022-11-17 Thread Mark Lee
Hi,

  I found that we already have a Flink operator implemented in Java, but I have two
questions:

1. Could we implement the Flink operator using Golang? Are there any hidden
traps that would make this difficult?

2. We can submit Java jar jobs or SQL jobs; can we submit Golang jobs?

 

Thank you.

 



答复: Flink Operator in Golang?

2022-11-17 Thread Mark Lee
I got it, Thanks Zhanghao!

 

From: user-return-51640-lifuqiong00=126@flink.apache.org on behalf of zhanghao.c...@outlook.com
Sent: 17 November 2022 23:36
To: Mark Lee ; user@flink.apache.org
Subject: Re: Flink Operator in Golang?

 

Hi Mark,

 

1. Directly quoting from https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator:

Main reasons for choosing Java over Go

- Direct access to Flink Client libraries for submitting, managing jobs and handling errors
- Most Flink developers have strong Java experience while there are only a few Go experts
- Easier to integrate with existing build systems and tooling
- Required k8s clients and tools for building an operator are also available in Java

2. Unfortunately, a Golang API is not supported yet

 

 

Best,

Zhanghao Chen

From: Mark Lee <lifuqion...@126.com>
Sent: Thursday, November 17, 2022 16:16
To: user@flink.apache.org
Subject: Flink Operator in Golang? 

 

Hi,

  I found that we already have a Flink operator implemented in Java, but I have two
questions:

1. Could we implement the Flink operator using Golang? Are there any hidden
traps that would make this difficult?

2. We can submit Java jar jobs or SQL jobs; can we submit Golang jobs?

 

Thank you.

 



Deploy Flink Operator in an k8s enviroment without helm?

2022-11-17 Thread Mark Lee
Hi all,

   I am trying to deploy the Flink operator following the Quick Start
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/). But it seems to need a helm client
installed in the Kubernetes environment. Could we deploy the Flink operator without
a helm client installed?

Now you can deploy the selected stable Flink Kubernetes Operator version
using the included Helm chart:

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

 

Thank you.



答复: Deploy Flink Operator in an k8s enviroment without helm?

2022-11-17 Thread Mark Lee
Thanks Biao,

Sorry, I’m a rookie with helm and operators. I know that I can prepare the RBAC,
ServiceAccount or Deployment creation via CRs or yaml files.

But in my scenario I can’t install any software (like helm) that the basic
OS (Kubernetes) environment does not already supply.

 

From: Biao Geng
Sent: 18 November 2022 13:12
To: Mark Lee ; user@flink.apache.org
Subject: Re: Deploy Flink Operator in an k8s enviroment without helm?

 

Hi Mark,

I believe you can do that without helm, just like you can install some
software in CentOS without yum.

But you may have to handle some basic setup by yourself. For the operator,
you at least have to prepare the RBAC creation, serviceAccount creation,
Deployment creation, and the Webhook creation if you want to use the webhook. Also, if
you want to uninstall the operator, you have to clear those resources by
hand. It is not very hard but does require some hand work.

 

Best,

Biao Geng

 

Get Outlook for iOS <https://aka.ms/o0ukef>

From: Mark Lee <lifuqion...@126.com>
Sent: Friday, November 18, 2022 12:57:26 PM
To: user@flink.apache.org
Subject: Deploy Flink Operator in an k8s enviroment without helm?

 

Hi all,

   I am trying to deploy the Flink operator following the Quick Start
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/>. But it seems to need a helm client
installed in the Kubernetes environment. Could we deploy the Flink operator without
a helm client installed?

Now you can deploy the selected stable Flink Kubernetes Operator version
using the included Helm chart:

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/

helm install flink-kubernetes-operator
flink-operator-repo/flink-kubernetes-operator

 

Thank you.



答复: Deploy Flink Operator in an k8s enviroment without helm?

2022-11-17 Thread Mark Lee
Thanks Gyula,

 My basic OS does not have a helm client and does not allow installing one. Could 
I deploy the Flink operator in such a situation?

 

From: user-return-51648-lifuqiong00=126@flink.apache.org on behalf of Gyula Fóra
Sent: 18 November 2022 13:26
To: Biao Geng
Cc: Mark Lee ; user@flink.apache.org
Subject: Re: Deploy Flink Operator in an k8s enviroment without helm?

 

Adding to what Biao Geng said, yes it is completely possible and other 
installation methods are used by many users already.

You can check the Helm templates in the repo to get an idea of what resources you 
need to create.
Actually, if you run `helm template flink-kubernetes-operator 
helm/flink-kubernetes-operator` from the git repo, your local helm client will 
render the templates and print the resources that it would generate.
That is a great way to get started with manual installation.
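
As a concrete sketch of that route for a cluster with no helm client (a sketch only; the output file name is illustrative, and --include-crds may be needed so the CRDs end up in the rendered output):

# On any machine that has git and a helm client:
git clone https://github.com/apache/flink-kubernetes-operator.git
cd flink-kubernetes-operator
helm template flink-kubernetes-operator helm/flink-kubernetes-operator --include-crds > operator-manifests.yaml

# Transfer operator-manifests.yaml to the cluster without helm and apply it there:
kubectl apply -f operator-manifests.yaml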

There are also OLM bundles available for the current operator releases. These 
are not yet officially supported by the Flink community, but we are working 
towards that: https://operatorhub.io/operator/flink-kubernetes-operator - 
maybe this is more relevant in your environment.

To summarize, there are many ways to install the operator, Helm is just one of 
the more convenient ones, that's why we use it as the example in the repo.
Production setups usually need to customize at least parts of the deployment 
logic in any case.

Gyula

 

 

On Fri, Nov 18, 2022 at 6:12 AM Biao Geng <biaoge...@gmail.com> wrote:

Hi Mark,

I believe you can do that without helm, just like you can install some 
software in CentOS without yum.

But you may have to handle some basic setup by yourself. For the operator, you 
at least have to prepare the RBAC creation, serviceAccount creation, Deployment 
creation, and the Webhook creation if you want to use the webhook. Also, if you want to 
uninstall the operator, you have to clear those resources by hand. It is not 
very hard but does require some hand work.

 

Best,

Biao Geng

 

Get Outlook for iOS <https://aka.ms/o0ukef>


From: Mark Lee <lifuqion...@126.com>
Sent: Friday, November 18, 2022 12:57:26 PM
To: user@flink.apache.org
Subject: Deploy Flink Operator in an k8s enviroment without helm?

 

Hi all,

   I am trying to deploy the Flink operator following the Quick Start 
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/>. But it seems to need a helm client installed in the Kubernetes environment. Could we 
deploy the Flink operator without a helm client installed?

Now you can deploy the selected stable Flink Kubernetes Operator version using 
the included Helm chart:

helm repo add flink-operator-repo 
https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/

helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator

 

Thank you.



Flink Operator in an off-line k8s enviroment

2022-11-22 Thread Mark Lee
Hi all,

I installed the Flink operator following
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/.

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.2.0/

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

 

I executed the above commands from a helm client (which can reach the internet) against a k8s
environment which can't connect to the internet.

 

  The Flink operator is installed correctly, but I got the errors below because my k8s
cluster can't connect to the internet.

What steps should I take to run the Flink operator correctly in my off-line k8s
cluster?

Should I run a local helm repo to replace the image
"ghcr.io/apache/flink-kubernetes-operator:95128bf" with a local image?

  

   Thank you.

 

[root@localhost ~]# kubectl  get pods

NAMEREADY   STATUS
RESTARTS   AGE

flink-kubernetes-operator-7797c7bd7-tpbqf   0/1 ImagePullBackOff   0
124m

 

[root@localhost ~]# kubectl  describe pod
flink-kubernetes-operator-7797c7bd7-tpbqf | grep Image -C 5

Normal   AddedInterface  124mmultus Add eth0
[10.128.6.212/14] from kube-ovn

  Warning  Failed  119m (x4 over 123m) kubeletError:
ErrImagePull

  Warning  Failed  118m (x7 over 123m) kubeletError:
ImagePullBackOff

  Normal   Pulling 34m (x19 over 124m) kubelet
Pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"

  Warning  Failed  8m53s (x23 over 123m)   kubeletFailed
to pull image "ghcr.io/apache/flink-kubernetes-operator:95128bf": rpc error:
code = Unknown desc = pinging container registry ghcr.io: Get
"https://ghcr.io/v2/": dial tcp 20.205.243.164:443: i/o timeout

  Normal   BackOff 4m20s (x424 over 123m)  kubelet
Back-off pulling image "ghcr.io/apache/flink-kubernetes-operator:95128bf"
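
One common pattern for this kind of air-gapped setup is to mirror the image into a registry the cluster can reach and point the chart at it - a sketch only; the registry host is made up and the image.repository/image.tag value keys should be checked against the chart's values.yaml:

# On a machine with internet access:
docker pull ghcr.io/apache/flink-kubernetes-operator:95128bf
docker save ghcr.io/apache/flink-kubernetes-operator:95128bf -o flink-operator.tar

# Move the tarball to the offline side, then load and push it to a reachable registry:
docker load -i flink-operator.tar
docker tag ghcr.io/apache/flink-kubernetes-operator:95128bf registry.local:5000/flink-kubernetes-operator:95128bf
docker push registry.local:5000/flink-kubernetes-operator:95128bf

# Point the Helm chart at that image when installing:
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set image.repository=registry.local:5000/flink-kubernetes-operator \
  --set image.tag=95128bf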

 



How can I deploy a flink cluster with 4 TaskManagers?

2022-11-25 Thread Mark Lee
Hi all,

How can I deploy a flink cluster with 1 Job Manager and 4 Task Managers
using FlinkDeployment CR?

 

The sample below from the Flink Operator only creates 1 TaskManager.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-only-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
resource:
  memory: "2048m"
  cpu: 1
  taskManager:
resource:
  memory: "2048m"
  cpu: 1

 

Thank you.
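
For context, a sketch of how a fixed TaskManager count is usually reached (not taken from this thread; field names are assumptions to verify against the operator's CRD reference). In native mode, TaskManager pods are created on demand, roughly ceil(job parallelism / taskmanager.numberOfTaskSlots), so a job with parallelism 8 and 2 slots per TM ends up with 4 TaskManagers. In standalone mode an explicit replica count can be set:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-with-four-taskmanagers   # illustrative name
spec:
  image: flink:1.15
  flinkVersion: v1_15
  mode: standalone                       # assumption: replicas is honoured in standalone mode
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 4                          # assumption: field name per the operator CRD
    resource:
      memory: "2048m"
      cpu: 1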



How to make flink operator as a cluster operator?

2022-11-28 Thread Mark Lee
Hi all,  

   How can I make the Flink operator a cluster operator? And how do I register it
with the CVO (Cluster Version Operator)?

   

   I didn't find any relevant code or configuration file in the flink-operator's code.

   

   Thank you.

 

Mark



Flink Operator create Session Job unsuccessfully

2022-11-30 Thread Mark Lee
Hi, 

   I'm creating a session job using the Flink Operator. The CR is as follows:

 




#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: flink-example-statemachine
spec:
  image: flink:1.15.2
  flinkVersion: v1_15
  ingress:
    template: "/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///opt/flink/volume/flink-ha
    state.checkpoints.dir: file:///opt/flink/volume/flink-cp
    state.savepoints.dir: file:///opt/flink/volume/flink-sp
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      initContainers:
        - name: artifacts-fetcher
          image: flinktest:1.0
          imagePullPolicy: Always
          command: [ 'cp', '/flink-artifact/myjob.jar', '/tmp/myjob.jar' ]
          # Use wget or other tools to get user jars from remote storage
          volumeMounts:
            - mountPath: /tmp
              name: flink-artifact
      containers:
        # Do not change the main container name
        - name: flink-main-container
          resources:
            requests:
              ephemeral-storage: 2048Mi
            limits:
              ephemeral-storage: 2048Mi
          volumeMounts:
            - mountPath: /opt/flink/usrlib
              name: flink-artifact
            - mountPath: /opt/flink/volume
              name: flink-volume
      volumes:
        - name: flink-artifact
          emptyDir: { }
        - name: flink-volume
          persistentVolumeClaim:
            claimName: flink-example-statemachine
  jobManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  mode: native

---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  namespace: default
  name: newjob
spec:
  deploymentName: flink-example-statemachine
  job:
    jarURI: local:///opt/flink/usrlib/myjob.jar
    parallelism: 2
    upgradeMode: last-state
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-example-statemachine
spec:
  storageClassName: ccos-hostpath-data-meta
  accessModes:
    - ReadWriteOnce
  volumeMode: Filesystem
  resources:
    requests:
      storage: 1Gi

---
apiVersion: networking.k8s.io/v1
kind: IngressClass
metadata:
  annotations:
    ingressclass.kubernetes.io/is-default-class: "true"
  labels:
    app.kubernetes.io/component: controller
  name: nginx
spec:
  controller: k8s.io/ingress-nginx



 

The job manager is created successfully, but the task manager pod is not, and
there is no error in the Flink operator or job manager logs.

The following picture shows the pod list in my k8s environment.



 

With the same configuration in application mode, the job is created successfully.

 

This has troubled me for several days. Can someone help me? Thank you.

 



Web UI not working with createLocalEnvironmentWithWebUI()

2023-04-16 Thread Mark Petronic
I am learning Flink for a new project. I am trying to understand the
development/debug environment to help me step through my code to better
learn Flink. I am using the Intellij community edition for my IDE and Flink
1.17.0.

I am using this simple Flink app to demonstrate my issue.

//==============================================================
package streamwindowprocessor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleFlink {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        //final StreamExecutionEnvironment env =
        //        StreamExecutionEnvironment.getExecutionEnvironment();

        // The port argument was lost in the archive; the original had a literal port here.
        DataStream<String> values = env.socketTextStream("localhost", );
        values.print();

        env.execute("Alarm Stream Processor");
    }
}
//==============================================================

Before I run this from the IDE, I start up a socket listener on a terminal:

nc -lkp 

Then I open a web browser to localhost:8081 and get this output

{
  "errors": [
    "Not found: /"
  ]
}

If instead, I use ./start-cluster.sh to start a standalone cluster, rebuild
my jar using getExecutionEnvironment() and submit that same simple jar
using ./flink.sh run , then I can open the browser to
localhost:8081 and I do see my app running, as expected, and it processes
strings I send via the running netcat.

Someone on SO noted that you should add this dependency, which I did, but it
made no difference.


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web</artifactId>
  <version>${flink.version}</version>
</dependency>

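(Not from the original mail, just a hedged illustration for the discussion: the local web UI is only served when flink-runtime-web is on the runtime classpath, and the REST port can be pinned explicitly via org.apache.flink.configuration.RestOptions, for example:)

// Hedged sketch: pin the mini cluster's REST port explicitly. The UI itself
// only appears if flink-runtime-web is actually on the runtime classpath.
Configuration conf = new Configuration();
conf.set(RestOptions.BIND_PORT, "8081");
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);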

*Can anyone help me understand why the web UI does not work
in createLocalEnvironmentWithWebUI()?*

Thanks,
Mark


Unsubscribe

2023-07-04 Thread Mark Petronic



Checkpoint/Savepoint restore of S3 file reads using continuous read mode

2023-10-06 Thread Mark Petronic
I am trying to understand the Flink design pattern for consuming files from
S3 continuously as they appear. I have written the below minimal program to
do that and it works as expected wrt detecting newly-uploaded S3 files
within the configured 5 second monitoring poll period. Then it just prints
the file contents to stdout as a simple task.

Where I am having difficulty is understanding the design pattern to
maintain state so that upon restart, the Flink app will NOT reprocess files
that it already processed.

I can see that Flink is retaining state in my configured state location at
file:///var/tmp/flink/checkpoints/ and when inspecting state files at paths
like
/var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata,
I see that the S3 path to the files already processed by Flink show up in
the _metadata file. So, I know the state of each file is being captured.

Now I am trying to understand a few concepts;

   1. How much state will Flink retain? If the files in the bucket are
   retained for a long time (say, with a lifecycle delete policy of 30 days),
   a lot of files could pile up. It seems that Flink would have to retain the
   complete list to avoid reprocessing existing files, and that would be
   quite a lot of state.
   2. I understand from the docs that you can restart Flink using state
   from either a savepoint or a checkpoint. I was trying to restart my test
   application standalone using the following command from my dev environment
   but, upon startup, it still reprocesses the files that are in the _metadata
   state captured from the previous run. Is the "--fromSavepoint" option
   the correct way to specify the savepoint file to be read at startup?

/usr/bin/env \
ASP_USE_LOCAL_ENV=1 \
ASP_VERBOSE_LOGGING=true \
ASP_CHECKPOINT_INTERVAL_MILLISECONDS=5000 \
/usr/lib/jvm/java-11-openjdk-amd64/bin/java \
@./startup.argfile \
com.myapp.weatherstreamprocessor.WeatherStreamProcessor \
--fromSavepoint
/var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5/_metadata
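(For reference only, and as a hedged sketch rather than a confirmed answer: when the environment is created programmatically, restore state is normally supplied through the execution.savepoint.path configuration option, assuming the local mini cluster honors it. The path below is the retained checkpoint directory mentioned above.)

// Hedged sketch: restore a locally created environment from retained
// checkpoint/savepoint state via configuration.
Configuration conf = new Configuration();
conf.setString("execution.savepoint.path",
        "file:///var/tmp/flink/checkpoints/6d3c9a96c13be31760416212bd3fd33d/chk-5");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);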

I am using the Flink operator to deploy my Flink application to EKS and
already have one production Flink application that is consuming from and
writing to Kinesis, so I have some initial Flink experience doing that. So,
I realize that, when deployed in my EKS cluster, checkpointing is meant for
recovery of the task managers by the job manager should the task managers
need to be restarted. And, for managed restarts (like code updates), I
should be using an explicitly created savepoint. But I am just trying to
prove the behavior in my test environment.

Could someone kindly direct me to the right approach so that I can restart
in my test environment, read the checkpoint, and NOT have Flink reprocess
the files already seen in the previous running instance?

Thanks!




# ==================================================================
# My Test Application
# ==================================================================

package com.myapp.weatherstreamprocessor;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WeatherStreamProcessor {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        conf.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
        // Note: '^' is bitwise XOR in Java (1024 ^ 3 == 1027), not exponentiation.
        conf.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, new MemorySize(1024 ^ 3));
        conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, new MemorySize(1024 ^ 3));
        conf.set(TaskManagerOptions.CPU_CORES, 4.0);

        final StreamExecutionEnvironment env;
        Config appCfg = new Config();

        if (appCfg.getUseLocalEnv()) {
            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        } else {
            env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }

        env.setParallelism(appCfg.getParallelism());
        if (appCfg.getCheckpointInterval() > 0) {
            env.enableCheckpointing(appCfg.getCheckpointInterval());
        }
        CheckpointConfig config = env.getCheckpointConfig();

        config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config
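(The snippet above is cut off at this point in the archive. For reference, and only as an illustrative sketch rather than the original code, the continuous S3 FileSource described above, with a placeholder bucket path, might continue roughly like this:)

// Illustrative sketch only, not the original code. It reuses the env and
// imports from the snippet above; the bucket path is a placeholder.
FileSource<String> source =
        FileSource.forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path("s3://my-bucket/incoming/"))
                .monitorContinuously(Duration.ofSeconds(5))
                .build();

DataStream<String> lines =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "s3-file-source");
lines.print();

env.execute("WeatherStreamProcessor");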

Best practice way to conditionally discard a window and not serialize the results

2023-10-30 Thread Mark Petronic
I am reading stats from Kinesis, deserializing them into a stat POJO and
then doing something like this using an aggregated window with no defined
processWindow function:

timestampedStats
    .keyBy(v -> v.groupKey())
    .window(TumblingEventTimeWindows.of(Time.seconds(appCfg.getWindowSize())))
    .aggregate(new ImpactAggregator(appCfg.getSmoothingRange(), appCfg.getThreshold()))
    .sinkTo(getKinesisProducer(appCfg.getAcuWindowsStreamName(), appCfg.getAwsRegion()))
    .name("Kinesis Producer");

As part of the aggregation logic, I am looking for certain threshold
violations where some field in each metric is tested against some fixed
threshold. I then increment a counter in an accumulator for each stat
violation for the duration of the window (300 seconds) along with some
other metadata associated with that stat that violated the threshold. If
there are violations, then I want to process the window by serializing its
contents to JSON and publishing to Kinesis. What I want to do is NOT
serialize a window that has NO violations in its accumulator. There is no
need to send a message when no bad conditions are observed.

   - Could you please tell me how I can just throw away a window and NOT
   serialize it when it is ready to be processed?
   - How do I hook into some event that allows me to do that?
   - Do I need to implement a ProcessKeyedWindowFunction along with my
   AggregateFunction and somehow handle this as part of the process window
   function?

I have created a class that implements SerializationSchema to do that
serialization, but its serialize() function must return a valid JSON byte[].
I think the solution is somewhere else, at a point where I can elect to NOT
process the window at all so that serialize() never gets called.
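(For what it is worth, one possible shape of a solution, sketched with hypothetical names and not offered as a confirmed best practice: pair the AggregateFunction with a ProcessWindowFunction in the same aggregate() call and only collect a result when the accumulator recorded violations, so windows without violations never reach the serializer.)

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical sketch: ImpactSummary and violationCount() stand in for the
// aggregator's real output type and its violation counter.
public class EmitOnlyViolations
        extends ProcessWindowFunction<ImpactSummary, ImpactSummary, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<ImpactSummary> aggregated,
                        Collector<ImpactSummary> out) {
        // With aggregate(aggFn, windowFn), the iterable holds exactly one
        // pre-aggregated element for this key and window.
        ImpactSummary summary = aggregated.iterator().next();
        if (summary.violationCount() > 0) {
            out.collect(summary);   // only windows with violations reach the sink
        }
        // Otherwise nothing is collected, so serialize() is never invoked
        // for this window.
    }
}

Used as .aggregate(new ImpactAggregator(...), new EmitOnlyViolations()) in place of the single-argument aggregate() call; a .filter() between aggregate() and sinkTo() would achieve the same effect.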

Thank you


Guidance on general design approach for Flink

2024-01-30 Thread Mark Petronic
I am working on a new application to perform real time anomaly detection
using Flink. I am relatively new to Flink but have already one application
in production that is fairly basic and of low throughput. This next one
will be more complex and much higher throughput.

My query is about handling late arriving data. For this application, the
source data is like this:

   - zip files containing a single JSON file each are pushed to an S3
   bucket by many servers that I need to monitor where N servers are in M pools
   - An SQS event then is published from S3 for each new zip file that
   arrives
   - I write a Flink source to read the event and pull the S3 file as it
   arrives, stream unzip, deserialize the JSON, flat map its contents into a
   datastream, and then process the data in tumbling 60 second windows

Each file can contain up to either 300 seconds worth of metrics or 1000
time series records. When a server is processing a lot of connections, the
files grow faster, so the JSON file is closed as soon as the 1000-sample
threshold hits. When a server's traffic is low, it emits the file when the
300-second elapsed time threshold hits, regardless of how many samples are
in the file (so in this case the file will have between 0 and 1000 samples).

It is this pattern that I am struggling with. I need to use one-minute
tumbling windows to aggregate these metrics. However, I may have to wait
300 seconds for the slow traffic file to be uploaded to S3 while other
files (on higher traffic servers) are showing up in S3 maybe every 10 or 20
seconds (each filled with 1000 samples that trigger the file closure and
update). Any of these files can contain a portion of the data that
would align into the same 60 second time window. All of the data from all
these servers needs to be aggregated and grouped as the servers are part of
pools of servers so I need to group by the records from each POOL and not
by just the records from each server.

So, my questions given that context are:

   1. Is having a 60 second tumbling window with 300 seconds of allowed
   lateness a pattern that is fairly common and typically implemented in Flink
   - where the lateness is not a fraction of the window size but a multiple of
   it? My sense is that this IS a reasonable problem that Flink can deal with.
   2. I believe with such an approach, there will potentially be 5-6
   60-second windows in flight for each grouping key to accommodate such
   allowed lateness and that means a lot more resources/state for the cluster
   to support. This impacts the resources needed for a given cluster scale. Do
   I have that assumption correct?

I just want to make sure I am considering the right tool for this job and
appreciate any inputs on this.
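(For concreteness, a rough sketch of the windowing being asked about, with hypothetical names for the stream, key, and aggregator; whether this is the right fit is exactly the question above.)

// Hypothetical sketch: 60-second event-time tumbling windows keyed by pool,
// kept open for an extra 5 minutes of allowed lateness.
metrics
    .keyBy(m -> m.getPoolId())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.minutes(5))
    .aggregate(new PoolMetricsAggregator())
    .sinkTo(kafkaSink);

Each in-flight window then keeps its state until the watermark passes the window end plus the allowed lateness, which is the extra state cost raised in question 2.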


Combining multiple stages into a multi-stage processing pipeline

2024-04-06 Thread Mark Petronic
I am looking for some design advice for a new Flink application and I am
relatively new to Flink - I have one, fairly straightforward Flink
application in production so far.

For this new application, I want to create a three-stage processing
pipeline. Functionally, I am seeing this as ONE long datastream. But, I
have to evaluate the STAGE-1 data in a special manner to then pass on that
evaluation to STAGE-2 where it will do its own special evaluation using the
STAGE-1 evaluation results to shape its evaluation. The same thing happens
again in STAGE-3, using the STAGE-2 evaluation results. Finally, the end
result is published to Kafka. The stages functionally look like this:

STAGE-1
KafkaSource |=> Keyby => TumblingWindows1 => ProcessWindowFn => SideOutput-1 |=> SessionWindow1 => ProcessWindowFn => (SideOutput-2[WindowRecords], KafkaSink[EvalResult])
            |=> WindowAll => ProcessWindowFn => SideOutput-1 ^

STAGE-2
SideOutput-2 => Keyby => TumblingWindows2 => ProcessWindowFn => SideOutput-3 => SessionWindow2 => ProcessWindowFn => (SideOutput-4[WindowRecords], KafkaSink[EvalResult])

STAGE-3
SideOutput-4 => Keyby => TumblingWindows3 => ProcessWindowFn => SideOutput-5 => SessionWindow3 => ProcessWindowFn => KafkaSink

DESCRIPTION

In STAGE-1, there are a fixed number of known keys so I will only see at
most about 21 distinct keys and therefore up to 21 tumbling one-minute
windows. I also need to aggregate all data in a global window to get
an overall non-keyed result. I need to bring the 21 results from those 21
tumbling windows AND the one global result into one place where I can
compare each of the 21 windows results to the one global result. Based on
this evaluation, only some of the 21 windows results will survive that
test. I want to then take the data records from those, say 3 surviving
windows, and make them the "source" for STAGE-2 processing as well as
publish some intermediate evaluation results to a KafkaSink. STAGE-2 will
reprocess the same data records that the three STAGE-1 surviving windows
processed, only keying them by different dimensions. I expect there to be
around 4000 fairly small records per each of the 21 STAGE-1 windows so, in
this example, I would be sending 4000 x 3 = 12000 records in SideOutput-2
to form the new "source" datastream for STAGE-2.

Where I am struggling is:

   1. Trying to figure out how best to connect the output of the 21 STAGE-1
   windows and the one WindowAll window's records into a single point (I propose
   SessionWindow1) to be able to compare each of the 21 windows' data results
   with the WindowAll non-keyed results.
   2. The best way to connect together these multiple stages.

Looking at the STAGE-1 approach illustrated above, this is my attempt at an
approach using side outputs to:

   1. Form a new "source" data stream that contains the outputs of each of
   the 21 windows and the WindowAll data
   2. Consume that into a single session window
   3. Do the evaluations between the 21 keyed windows against the overall
   WindowAll data
   4. Then emit only the 3 surviving sets of data from the 3 tumbling
   windows outputs from the ProcessWindowFn to SideOutput-2 and the
   evaluation results to Kafka
   5. Finally, SideOutput-2 will then form the new data stream "source" for
   STAGE-2 where a similar process will repeat, passing data to a STAGE-3,
   again similar processing, to finally obtain the desired result that will be
   published to Kafka.

I would greatly appreciate the following:

   1. Comments on if this is a valid approach - am I on the right track
   here?
   2. Could you suggest an alternate approach that I could investigate if
   this is problematic?.

I am trying to build a Flink application that follows intended best
practices so I am just looking for some confirmation that I am heading down
a reasonable path for this design.

Thank you in advance,
Mark


Re: Combining multiple stages into a multi-stage processing pipeline

2024-04-07 Thread Mark Petronic
Thank you Yunfeng. Your comments gave me some insights to explore how to
use consecutive windows. So, I coded up a version that looks like this and
works well for me:

KafkaSource => Keyby => TumblingWindows => ProcessWindowFn => WindowAll =>
ProcessWindowFn => (here I will repeat keyed and windowAll windows in additional
stages)

The missing connection for me was not understanding that I could connect
windows to windows in the same data stream. That understanding made all the
difference. So now the keyed tumbling windows for the 21 keys each
process N records per key, create a score over that data, and output a
POJO containing the score and a List. Then the WindowAll gets
those 21 POJOs of N records and iterates over all 21 * N records to
calculate the overall score. Now that it has in hand the overall score and
the 21 keyed scores from the prior windows, it can compare each of the 21
scores to the overall score and conditionally out.collect() only the
List for the record sets below threshold. Then, subsequent stages
can rinse and repeat this process in one clean job graph.
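(For illustration, a rough sketch of chaining a keyed tumbling window into a windowAll in this way, with hypothetical class and field names and the usual window imports assumed:)

// Hypothetical sketch of the consecutive windows described above.
DataStream<Scored> perKeyScores = source
        .keyBy(r -> r.getKey())
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .process(new ScorePerKeyFn());          // one Scored POJO per key and window

DataStream<Scored> survivors = perKeyScores
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
        .process(new CompareToOverallFn());     // sees all 21 POJOs, emits only survivors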

Thanks again for your thoughts. They really helped light the light bulb for
me :)
Mark


On Sat, Apr 6, 2024 at 11:24 PM Yunfeng Zhou 
wrote:

> Hi Mark,
>
> IMHO, your design of the Flink application is generally feasible. In
> Flink ML, I have once met a similar design in ChiSqTest operator,
> where the input data is first aggregated to generate some results and
> then broadcast and connected with other result streams from the same
> input afterwards. You may refer to this algorithm for more details
> when designing your applications.
>
> https://github.com/apache/flink-ml/blob/master/flink-ml-lib/src/main/java/org/apache/flink/ml/stats/chisqtest/ChiSqTest.java
>
> Besides, side outputs are typically used when you want to split an
> output stream into different categories. Given that the
> ProcessWindowFn before each SideOutput-x only has one downstream, it
> would be enough to directly pass the resulting DataStream to session
> windows instead of introducing side outputs.
>
> Best,
> Yunfeng
>
> On Sun, Apr 7, 2024 at 12:41 AM Mark Petronic 
> wrote:
> >
> > I am looking for some design advice for a new Flink application and I am
> relatively new to Flink - I have one, fairly straightforward Flink
> application in production so far.
> >
> > For this new application, I want to create a three-stage processing
> pipeline. Functionally, I am seeing this as ONE long datastream. But, I
> have to evaluate the STAGE-1 data in a special manner to then pass on that
> evaluation to STAGE-2 where it will do its own special evaluation using the
> STAGE-1 evaluation results to shape its evaluation. The same thing happens
> again in STAGE-3, using the STAGE-2 evaluation results. Finally, the end
> result is published to Kafka. The stages functionally look like this:
> >
> > STAGE-1
> > KafkaSource |=> Keyby => TumblingWindows1 => ProcessWindowFn =>
> SideOutput-1 |=> SessionWindow1 => ProcessWindowFn =>
> (SideOutput-2[WindowRecords], KafkaSink[EvalResult])
> > |=> WindowAll => ProcessWindowFn =>
> SideOutput-1 ^
> >
> > STAGE-2
> > SideOutput-2 => Keyby => TumblingWindows2 => ProcessWindowFn =>
> SideOutput-3 => SessionWindow2 => ProcessWindowFn =>
> (SideOutput-4[WindowRecords], KafkaSink[EvalResult])
> >
> > STAGE-3
> > SideOutput-4 => Keyby => TumblingWindows3 => ProcessWindowFn =>
> SideOutput-5 => SessionWindow3 => ProcessWindowFn => KafkaSink
> >
> > DESCRIPTION
> >
> > In STAGE-1, there are a fixed number of known keys so I will only see at
> most about 21 distinct keys and therefore up to 21 tumbling one-minute
> windows. I also need to aggregate all data in a global window to get an
> overall non-keyed result. I need to bring the 21 results from those 21
> tumbling windows AND the one global result into one place where I can
> compare each of the 21 windows results to the one global result. Based on
> this evaluation, only some of the 21 windows results will survive that
> test. I want to then take the data records from those, say 3 surviving
> windows, and make them the "source" for STAGE-2 processing as well as
> publish some intermediate evaluation results to a KafkaSink. STAGE-2 will
> reprocess the same data records that the three STAGE-1 surviving windows
> processed, only keying them by different dimensions. I expect there to be
> around 4000 fairly small records per each of the 21 STAGE-1 windows so, in
> this example, I would be sending 4000 x 3 = 12000 records in SideOutput-2
> to form the new "source" datastream for STAGE-2.
> >
>

Unsubscribe

2024-05-12 Thread Mark Petronic