Re: Flink MiniCluster: Multiple Task Managers

2022-05-14 Thread Roman Grebennikov
Hey Κωνσταντίνος,

check out this sample code we use for testing in 
https://github.com/metarank/metarank . It is in Scala, but should be quite 
straightforward to port to Java:

val cluster = new MiniClusterWithClientResource(
  new MiniClusterResourceConfiguration.Builder()
    .setNumberTaskManagers(1) // here it is
    .setNumberSlotsPerTaskManager(1)
    .setConfiguration(conf)
    .build()
)
cluster.before()
val client = cluster.getClusterClient
val env = new StreamExecutionEnvironment(new TestStreamEnvironment(cluster.getMiniCluster, 1))
// do some stuff with env
val graph = env.getStreamGraph.getJobGraph
client.submitJob(graph)
// here you need to wait until the job is finished
// you may use cluster client instance here to poll for completion
client.close()
cluster.after()
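
A minimal sketch (not from the metarank code) of the "wait until finished" part, using
only the ClusterClient API: submitJob returns a CompletableFuture with the JobID, and
requestJobResult blocks until the job reaches a globally terminal state:

val jobId = client.submitJob(graph).get()           // CompletableFuture[JobID]
val result = client.requestJobResult(jobId).get()   // completes when the job is terminal
if (!result.isSuccess) throw new RuntimeException(s"job $jobId failed")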

For a full working example, check these files:
* 
https://github.com/metarank/metarank/blob/master/src/main/scala/ai/metarank/mode/AsyncFlinkJob.scala
* 
https://github.com/metarank/metarank/blob/master/src/main/scala/ai/metarank/mode/inference/FlinkMinicluster.scala

with best regards,
Roman Grebennikov | g...@dfdx.me

On Sat, May 14, 2022, at 18:45,  Αγαπίδης wrote:
> Hi list,
>
> I am using Java 8, Flink 1.15, and IntelliJ.
>
> I wonder if it is possible to open an additional TaskManager in a Stream 
> Job in the Java code, to run it in IntelliJ Local Cluster (MiniCluster) 
> Debug mode. I found this method in the code reference, but I don't know 
> how to call it.
>
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html#startTaskManager--
>
> I thought that I need to get the current MiniCluster somehow, before 
> starting the Job Execution. But how?
>
> Thank you in advance!
>
> --
> Best Regards,
> Kostas


Re: Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Roman Grebennikov
Hi,

AFAIK the Scala REPL was removed completely in Flink 1.15 
(https://issues.apache.org/jira/browse/FLINK-24360), so there is nothing to 
cross-build.

Roman Grebennikov | g...@dfdx.me


On Thu, May 12, 2022, at 14:55, Jeff Zhang wrote:
> Great work Roman, do you think it is possible to run in scala shell as well?
> 
> On Thu, May 12, 2022 at 10:43 PM Roman Grebennikov  wrote:
>> __
>> Hello,
>> 
>> As far as I understand the discussions on this mailing list, there are almost no 
>> people maintaining the official Scala API in Apache Flink. Due to some 
>> technical complexities it will probably be stuck for a very long time on 
>> Scala 2.12 (which is not EOL yet, but quite close to it):
>> * The Traversable serializer relies a lot on CanBuildFrom (so it's read and 
>> compiled on restore), which is missing in Scala 2.13 and 3.x - migrating off 
>> this approach while maintaining savepoint compatibility can be quite a 
>> complex task.
>> * The Scala API uses an implicitly generated TypeInformation, produced by a 
>> giant scary mkTypeInfo macro that would have to be completely rewritten for 
>> Scala 3.x.
>> 
>> But even in the current state, Scala support in Flink has some issues with 
>> ADTs (sealed traits, a popular data modelling pattern) not being natively 
>> supported, so if you use them, you have to fall back to Kryo, which is not 
>> that fast: we've seen 3x-4x throughput drops in performance tests.
>> 
>> In my current company we made a library 
>> (https://github.com/findify/flink-adt) which uses Magnolia 
>> (https://github.com/softwaremill/magnolia) to do all the compile-time 
>> TypeInformation generation and make Scala ADTs nice & fast in Flink. With a 
>> couple of community contributions it is now possible to cross-build it for 
>> Scala 3 as well.
>> 
>> As Flink 1.15 core is scala free, we extracted the DataStream part of Flink 
>> Scala API into a separate project, glued it together with flink-adt and 
>> ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and 
>> cross-compiled it for 2.12/2.13/3.x. You can check out the result on this 
>> github project: https://github.com/findify/flink-scala-api
>> 
>> So technically speaking, now it's possible to migrate a scala flink job from 
>> 2.12 to 3.x with:
>> * replace flink-streaming-scala dependency with flink-scala-api (optional, 
>> both libs can co-exist in classpath on 2.12)
>> * replace all imports of org.apache.flink.streaming.api.scala._ with ones 
>> from the new library
>> * rebuild the job for 3.x
>> 
>> The main drawback is that there is no savepoint compatibility due to 
>> CanBuildFrom and different way of handling ADTs. But if you can afford 
>> re-bootstrapping the state - migration is quite straightforward.
>> 
>> The README on github https://github.com/findify/flink-scala-api#readme has 
>> some more details on how and why this project was done in this way. And the 
>> project is a bit experimental, so if you're interested in scala3 on Flink, 
>> you're welcome to share your feedback and ideas. 
>> 
>> with best regards,
>> Roman Grebennikov | g...@dfdx.me
>> 
> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang


Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Roman Grebennikov
Hello,

As far as I understand the discussions on this mailing list, there are almost no 
people maintaining the official Scala API in Apache Flink. Due to some 
technical complexities it will probably be stuck for a very long time on Scala 
2.12 (which is not EOL yet, but quite close to it):
* The Traversable serializer relies a lot on CanBuildFrom (so it's read and 
compiled on restore), which is missing in Scala 2.13 and 3.x - migrating off 
this approach while maintaining savepoint compatibility can be quite a complex 
task.
* The Scala API uses an implicitly generated TypeInformation, produced by a giant 
scary mkTypeInfo macro that would have to be completely rewritten for Scala 
3.x.

But even in the current state, Scala support in Flink has some issues with ADTs 
(sealed traits, a popular data modelling pattern) not being natively supported, 
so if you use them, you have to fall back to Kryo, which is not that fast: 
we've seen 3x-4x throughput drops in performance tests.
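
For illustration, here is a minimal (hypothetical) job showing the problem: with only 
the stock org.apache.flink.api.scala._ derivation, the sealed trait below ends up as a 
GenericType, i.e. Kryo. Disabling generic types turns the silent slowdown into an 
explicit failure once Flink tries to create a serializer for Event:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

sealed trait Event
case class Click(id: String) extends Event
case class Purchase(id: String, amount: Double) extends Event

object AdtExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableGenericTypes() // fail fast instead of silently using Kryo
    // expected to fail (rather than use Kryo) when the serializer for Event is created
    env.fromCollection[Event](Seq(Click("a"), Purchase("b", 1.0))).print()
    env.execute("adt-example")
  }
}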

In my current company we made a library (https://github.com/findify/flink-adt) 
which uses Magnolia (https://github.com/softwaremill/magnolia) to do all the 
compile-time TypeInformation generation and make Scala ADTs nice & fast in Flink. 
With a couple of community contributions it is now possible to cross-build it for 
Scala 3 as well.

As the Flink 1.15 core is Scala-free, we extracted the DataStream part of the Flink 
Scala API into a separate project, glued it together with flink-adt and the 
ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and JVM 17) and 
cross-compiled it for 2.12/2.13/3.x. You can check out the result in this 
GitHub project: https://github.com/findify/flink-scala-api

So technically speaking, it is now possible to migrate a Scala Flink job from 
2.12 to 3.x (a rough sketch follows below) by:
* replacing the flink-streaming-scala dependency with flink-scala-api (optional, 
both libs can co-exist in the classpath on 2.12)
* replacing all imports of org.apache.flink.streaming.api.scala._ with ones from 
the new library
* rebuilding the job for 3.x
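
To make these steps concrete, here is a rough sketch of what changes in a job. The 
artifact coordinates, version and package name below are illustrative only - the 
exact ones are in the project README:

// build.sbt: swap the dependency (coordinates are illustrative, check the README)
//   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
//   becomes something like
//   "io.findify" %% "flink-scala-api" % flinkScalaApiVersion

// job code: swap the import, the rest of the DataStream code stays the same
// before: import org.apache.flink.streaming.api.scala._
import io.findify.flink.api._ // package name per the project README

object MigratedJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(1, 2, 3).map(_ * 2).print()
    env.execute("migrated-job")
  }
}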

The main drawback is that there is no savepoint compatibility, due to 
CanBuildFrom and a different way of handling ADTs. But if you can afford 
re-bootstrapping the state, the migration is quite straightforward.

The README on GitHub (https://github.com/findify/flink-scala-api#readme) has some 
more details on how and why this project was done this way. The project 
is a bit experimental, so if you're interested in Scala 3 on Flink, you're 
welcome to share your feedback and ideas.

with best regards,
Roman Grebennikov | g...@dfdx.me


Re: Practical guidance with Scala and Flink >= 1.15

2022-05-11 Thread Roman Grebennikov
As yet another attempt to make Flink work with Scala 2.13/3.x, we went 
further and cross-built a forked version of Flink's Scala API: 
https://github.com/findify/flink-scala-api
Check the github repo for details, but if you can afford re-bootstrapping your 
job state from scratch (as there is no savepoint compatibility, see the repo 
readme for details), then migration is quite a straightforward process. Main 
features:

 * Cross-built for Scala 2.12, 2.13 and 3.x
 * 100% replica of the 1.15 Scala API, with deprecated methods removed.
 * ClosureCleaner from Spark 3.x with 2.13 and java 17 support.
 * Agnostic to the serialization format, but can use flink-adt to support scala 
more natively (with sealed traits and scala3 typeclasses instead of scary 
macro).
It's still an experimental version which is not yet well tested, but I don't 
see any possible dangers in it:

 * we're using flink-adt exclusively for a lot of Flink jobs in Findify for 
almost a year with no issues so far.
 * Flink's Scala API is a thin wrapper on top of Java API, so there is nothing 
to break.
The only thing that is not yet well tested is the Scala 3 derivation, which is a 
compile-time process. So if you get compilation errors due to flink-adt failing for 
your case classes, you're welcome to submit a bug report on GitHub.


with best regards,
Roman Grebennikov | g...@dfdx.me


On Wed, May 11, 2022, at 07:38, Ran Tao wrote:
> Hi, guys. I posted a JDK 11 & JDK 17 issue [FLINK-27549] 
> <https://issues.apache.org/jira/browse/FLINK-27549> recently which involves 
> the Scala upgrade discussed here [1].
> 
> It shows that the current Flink project is not a complete or pure JDK 11 build 
> (the same problem exists with higher versions), because the Scala compiler in use 
> only emits <=1.8 bytecode and can't generate target-11 or higher. Upgrading 
> Scala to 2.13 would fix this problem.
> 
> Hope it helps~
> 
> [1] https://lists.apache.org/list?d...@flink.apache.org:lte=1M:Ran%20Tao
> 
Martijn Visser wrote on Wed, May 11, 2022 at 15:10:
>> Hi Matthias,
>> 
>> Given the current state of Scala support in the Flink community (there is a 
>> major lack in Scala maintainers), it is my personal opinion that we should 
>> consider deprecating the current Scala APIs and replace those with new Scala 
>> APIs, which are 'just' wrappers for the Java API. This definitely needs a 
>> FLIP and a discussion, where I can explain the situation more. There's 
>> nothing concrete for this yet though. There have been some discussions on 
>> this topic going on in the ticket for adding Scala 2.13 support [1]
>> 
>> Best regards,
>> 
>> Martijn
>> 
>> [1] 
>> https://issues.apache.org/jira/browse/FLINK-13414?focusedCommentId=17344555&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17344555
>> 
>> On Tue, 10 May 2022 at 16:16, Schwalbe Matthias 
>>  wrote:
>>> … just for my understanding
>>> 
>>> From the announcements I only got that Scala remains a dependency only in 
>>> the JARs that relate to the Scala API.
>>> I never read about plans to drop the Scala API altogether … is that the 
>>> case??
>>> That would be very unfortunate …
>>> 
>>> What is the state of affairs?
>>> 
>>> Best regards
>>> 
>>> Thias
>>> 
>>> *From:* Martijn Visser  
>>> *Sent:* Monday, May 9, 2022 2:38 PM
>>> *To:* Robert Metzger 
>>> *Cc:* Salva Alcántara ; user 
>>> 
>>> *Subject:* Re: Practical guidance with Scala and Flink >= 1.15
>>> 
>>> Hi Salva,
>>> 
>>> Like Robert said, I don't expect that we will be able to drop support for 
>>> Scala 2.12 anytime soon. I do think that we should have a discussion in the 
>>> Flink community about providing Scala APIs. My opinion is that we are 
>>> probably better off to deprecate the current Scala APIs (keeping it 
>>> internal as we still have a big piece of Scala internally) and only offer 
>>> Java APIs. The Flink community lacks real Scala maintainers. I think Seth's 
>>> blog is pretty spot-on on this too [1].
>>> 
>>> Best regards,
>>> 
>>> Martijn Visser
>>> https://twitter.com/MartijnVisser82
>>> https://github.com/MartijnVisser
>>> 
>>> [1] https://flink.apache.o

Re: Source with S3 bucket with millions ( billions ) of object ( small files )

2022-04-04 Thread Roman Grebennikov
Hi,

in the unified stream/batch FileSource there is a processStaticFileSet() method 
that enumerates all the splits only once and makes the Source complete when it is 
finished.
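
A sketch of what that looks like (assuming Flink 1.14, the flink-connector-files 
dependency, and an S3 filesystem plugin configured for the s3a:// scheme):

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = FileSource
  .forRecordStreamFormat(new TextLineFormat(), new Path("s3a://my-bucket/prefix/"))
  .processStaticFileSet() // enumerate the splits once; the source finishes when done
  .build()
env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "s3-files").print()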

As for my own experience using processStaticFileSet with large S3 buckets: the 
enumeration seems to happen on the JobManager, and listing a bucket with 1B 
items will probably block it for a long time. This happens on 1.14; I'm not sure 
how it behaves on the upcoming 1.15.

with best regards,
Roman Grebennikov | g...@dfdx.me


On Sun, Apr 3, 2022, at 22:07, Vishal Santoshi wrote:
> Folks,  
> I am doing a simple batch job that uses readFile() with 
> "s3a://[bucket_name]" as the path with setNestedFileEnumeration(true). I am a 
> little curious about a few things. 
> 
> In batch mode, which I think is turned on by the FileProcessingMode.PROCESS_ONCE 
> mode, does the source list all the S3 objects in the bucket to create input 
> splits *before* it calls downstream operators?
> 
> Thanks.


Re: Scala Case Class Serialization

2021-12-07 Thread Roman Grebennikov
Hi,

I guess the problematic line where the kryo fallback is happening is here:

  lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, TestCaseClass]("test-state", classOf[String], ttestclass.getTypeClass)
  )

MapStateDescriptor has multiple constructors, and some of them have a strong Java 
smell :)
The one you've used here with classOf[String] passes a class instance 
into the Java constructor, and that constructor implicitly uses Java 
TypeInformation derivation under the hood, which has no idea about Scala.

MapStateDescriptor also has another constructor, which can take the explicit 
TypeInformation for key and value, like this:
val keyTypeInfo = createTypeInformation[String]
val valueTypeInfo = createTypeInformation[TestCaseClass]
new MapStateDescriptor[String,TestCaseClass]("test", keyTypeInfo, valueTypeInfo)

Then it won't try to be too smart: it won't derive the typeinfo from 
Class[_] and will use the one you provided.

with best regards,
Roman Grebennikov | g...@dfdx.me


On Tue, Dec 7, 2021, at 19:05, Lars Skjærven wrote:
> Thanks for quick response. Please find attached a minimal example 
> illustrating the issue. I've added implicit TypeInformation, and checked that 
> I'm importing the scala variant only. 
> 
> Matthias: Just my superficial impression from [1]. Will look into 
> TypeInfoFactory. 
> 
> Thanks again!
> 
> package com.mystuff
> import org.apache.flink.api.common.functions.RichFlatMapFunction
> import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
> import org.apache.flink.api.common.typeinfo.{TypeInformation}
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.util.Collector
> 
> case class TestCaseClass(id: String, pos: Int)
> 
> class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
>   implicit val ttestclass: TypeInformation[TestCaseClass] = createTypeInformation[TestCaseClass]
> 
>   lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
>     new MapStateDescriptor[String, TestCaseClass]("test-state", classOf[String], ttestclass.getTypeClass)
>   )
> 
>   override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
>     myState.put(value.id, value)
>     myState.get(value.id)
>     out.collect(value.id)
>   }
> }
> 
> object TestJob {
> 
>   def main(args: Array[String]): Unit = {
> 
> val env = StreamExecutionEnvironment.createLocalEnvironment()
> env.getConfig.disableGenericTypes()
> 
> val s = Seq[TestCaseClass](
>   TestCaseClass(id = "1", pos = 1),
>   TestCaseClass(id = "2", pos = 2),
>   TestCaseClass(id = "3", pos = 3),
> )
> 
> env
>   .fromCollection[TestCaseClass](s)
>   .keyBy(s => s.id)
>   .flatMap(new MyRichFlatMap)
>   .print()
> 
>     env.execute("Test Job")
>   }
> }
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
> 
> On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov  wrote:
>> __
>> Hi Lars,
>> 
>> can you please show a small reproducer of the way you construct the 
>> DataStream, and which imports do you use?
>> 
>> We also often experience similar performance issues with scala, but usually 
>> they are related to accidental usage of Flink Java API. A couple of hints 
>> from my experience:
>> 1. Make sure that you always use the scala DataStream, and not the java one.
>> 2. All operations on scala datastream require an implicit TypeInformation[T] 
>> parameter, which is usually generated automatically for you if you do an 
>> "import org.apache.flink.api.scala._" by the createTypeInformation[T] macro. 
>> So make sure you have this import present.
>> 3. You can do "env.getConfig.disableGenericTypes" and Flink will throw an 
>> exception each time it has to fall back to generic Kryo serialization. The 
>> backtrace will point you to the exact place in your code where it has to do 
>> a Kryo fallback.
>> 
>> Also, Flink will always revert to Kryo in case you use sum types (or ADTs, 
>> or "sealed traits"). Shameless plug: we made a library to support that: 
>> https://github.com/findify/flink-adt
>> 
>> Roman Grebennikov | g...@dfdx.me
>> 
>> 
>> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>>> Hi Lars,
>>> not sure about the out-

Re: use of Scala versions >= 2.13 in Flink 1.15

2021-12-07 Thread Roman Grebennikov
Hi,

I guess using Scala 2.13 with the Scala-free Flink 1.15 assumes that it will always 
use generic/Kryo serialization, which has a large performance penalty (YMMV, 
but it happens all the time for us when we accidentally use the Flink Java APIs 
with Scala case classes).

As far as I know, Flink's set of Scala serializers for collections uses 
some 2.11/2.12-specific deprecated internal things like CanBuildFrom, which are 
not available on 2.13. So implementing a state migration from 2.12 to 2.13 is 
not that easy due to the way Flink's TraversableSerializer is implemented. And the 
createTypeInformation Scala macro Flink uses for deriving serializers for 
Scala case classes is not directly compatible with 3.0, as there is a 
completely new Scala macro API on 3.x.
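
To illustrate the gap (a minimal sketch, not Flink's actual serializer code): on 2.13+ 
the CanBuildFrom implicit is gone, and rebuilding a collection of a given concrete 
type goes through scala.collection.BuildFrom instead, so a serializer snapshot that 
captured a 2.12 CanBuildFrom has nothing to resolve against:

import scala.collection.BuildFrom

// 2.13+/3.x shape: an existing collection acts as evidence of the concrete type to rebuild.
// On 2.12 the equivalent implicit parameter would be CanBuildFrom[From, A, C], which no
// longer exists, so code (or a snapshot) written against it cannot work on 2.13.
def rebuild[From, A, C](from: From, items: IterableOnce[A])(implicit bf: BuildFrom[From, A, C]): C =
  bf.fromSpecific(from)(items)

// rebuild(List.empty[Int], Seq(1, 2, 3)) == List(1, 2, 3)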

Chesnay, I'm wondering what is the plan on 2.13/3.0 support in the future?

If I were the one writing a FLIP for this process, I can imagine it like this:
* as 2.11 is finally removed in 1.15, the createTypeInformation macro can be 
re-done on top of Magnolia, which supports 2.12, 2.13 and 3.x with the same API.
* the current implementation of Flink's serializers for Scala collections (afaik in 
TraversableSerializer) serializes the whole CanBuildFrom code for a 
specific concrete collection type right in the snapshot. So it cannot be 
deserialized on 2.13, as there is no CanBuildFrom. But my own opinion is that 
the case where someone has a custom CanBuildFrom for their own hand-made Scala 
collection implementation is extremely rare, so with a set of heuristics we can 
guess the concrete collection type right from the serialized CanBuildFrom Scala 
code, assuming that there is a finite number of collection types (around 10 or 
so).

With this approach we can: support 2.12/2.13/3.x with the same codebase, and 
allow state migrations between scala versions.

I did some sort of prototype for step 1 (and partially step 2) in 
https://github.com/findify/flink-adt , although with a different goal of 
supporting Scala ADTs, so if anyone is interested, I can make a draft FLIP 
proposal based on this research to start the discussion.

with best regards,
Roman Grebennikov | g...@dfdx.me

On Tue, Dec 7, 2021, at 08:46, Chesnay Schepler wrote:
> We haven't changed anything significant in 1.14.
>
> Whether the 2.13 job can run on Scala 2.12 depends a bit on the job (and 
> of course, used libraries!); it depends on the backwards-compatibility 
> from Scala, which APIs are used and what kind of Scala magic is being 
> employed.
> We haven't really tested that scenario in 1.14 or below.
>
> On 07/12/2021 09:28, guenterh.lists wrote:
>> Hi Chesnay,
>>
>> thanks for the info - this is really good news for us.
>>
>> I set up a playground using the snapshot from yesterday [1] and a 
>> really quick and short Job using Scala 2.13 [2]
>>
>> The job starts and returns correct results. Even the use of a case 
>> class against the Java API is possible.
>>
>> Then I made a second try with the same job (compiled with Scala 
>> 2.13.6) running on a Flink 1.14 cluster which was again successful.
>>
>> My question:
>> Is this compilation with Scala versions >=2.13 already part of 1.14, or 
>> is my example so small and simple that binary incompatibilities 
>> between the versions don't matter?
>>
>> Günter
>>
>>
>> [1] 
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
>> [2] 
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8
>>  
>>
>>
>>
>> On 06.12.21 13:59, Chesnay Schepler wrote:
>>> With regards to the Java APIs, you will definitely be able to use the 
>>> Java DataSet/DataStream APIs from Scala without any restrictions 
>>> imposed by Flink. This is already working with the current SNAPSHOT 
>>> version.
>>>
>>> As we speak we are also working to achieve the same for the Table 
>>> API; we expect to achieve that but with some caveats (i.e., if you 
>>> use the Python API or the Hive connector then you still need to use 
>>> the Scala version provided by Flink).
>>>
>>> As for the Scala APIs, we haven't really decided yet how this will 
>>> work in the future. However, one of the big benefits of the 
>>> Scala-free runtime is that it should now be easier for us to release 
>>> the APIs for more Scala versions.
>>>
>>> On 06/12/2021 11:47, guenterh.lists wrote:
>>>> Dear list,
>>>>
>>>> there have been some discussions and activities in the la

Re: Scala Case Class Serialization

2021-12-07 Thread Roman Grebennikov
Hi Lars,

can you please show a small reproducer of the way you construct the DataStream, 
and which imports do you use?

We also often experience similar performance issues with scala, but usually 
they are related to accidental usage of Flink Java API. A couple of hints from 
my experience:
1. Make sure that you always use the scala DataStream, and not the java one.
2. All operations on scala datastream require an implicit TypeInformation[T] 
parameter, which is usually generated automatically for you if you do an 
"import org.apache.flink.api.scala._" by the createTypeInformation[T] macro. So 
make sure you have this import present.
3. You can do "env.getConfig.disableGenericTypes" and Flink will throw an 
exception each time it has to fall back to generic Kryo serialization. The 
backtrace will point you to the exact place in your code where it has to do a 
Kryo fallback.

Also, Flink will always revert to Kryo in case you use sum types (or ADTs, or 
"sealed traits"). Shameless plug: we made a library to support that: 
https://github.com/findify/flink-adt

Roman Grebennikov | g...@dfdx.me


On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
> Hi Lars,
> not sure about the out-of-the-box support for case classes with primitive 
> member types (could you refer to the section which made you conclude this?). 
> I haven't used Scala with Flink, yet. So maybe, others can give more context.
> But have you looked into using the TypeInfoFactory to define the schema [1]?
> 
> Best,
> Matthias
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
> 
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven  wrote:
>> Hello,
>> We're running Flink 1.14 with scala, and we're suspecting that performance 
>> is suffering due to serialization of some scala case classes. Specifically 
>> we're seeing that our Case Class "cannot be used as a POJO type because not 
>> all fields are valid POJO fields, and must be processed as GenericType", and 
>> that the case class "does not contain a setter for field X". I'm 
>> interpreting these log messages as performance warnings. 
>> 
>> A simple case class example we're writing to state that triggers the 
>> mentioned 'warnings': 
>> 
>> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>> 
>> I'm understanding the docs that case classes with primitive types should be 
>> supported "out of the box". 
>> 
>> Any tips on how to proceed ? 
>> 
>> Kind regards, 
>> Lars
> 


Re: Flink CPU load metrics in K8s

2020-08-04 Thread Roman Grebennikov
Hi,

JVM.CPU.Load is just a wrapper (MetricUtils.instantiateCPUMetrics) on top of 
OperatingSystemMXBean.getProcessCpuLoad (see 
https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad())

Usually it looks weird if you have multiple CPU cores. For example, if you have 
a job with a single slot 100% utilizing a single CPU core on an 8-core machine, 
the JVM.CPU.Load will be 1.0/8.0 = 0.125. It's also a point-in-time snapshot of 
the current CPU usage, so if you're collecting your metrics every minute and the 
job has a spiky workload within that minute (like being idle almost all the time 
and once a minute consuming 100% CPU for one second), you have a chance to 
completely miss this in the metrics.
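
A small sketch (not Flink code) of what the reported number corresponds to - the 
process-wide CPU fraction in [0.0, 1.0] across all cores, sampled at read time:

import java.lang.management.ManagementFactory
import com.sun.management.OperatingSystemMXBean

val os = ManagementFactory.getPlatformMXBean(classOf[OperatingSystemMXBean])
val load = os.getProcessCpuLoad // e.g. 0.125 when one of 8 cores is fully busy
val cores = Runtime.getRuntime.availableProcessors
println(f"~${load * cores}%.2f cores busy (${load * 100}%.1f%% of the machine)")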

As for me personally, JVM.CPU.Time is a clearer indicator of CPU usage: it is an 
always-increasing counter of the CPU time spent executing your code, and it will 
also catch CPU usage spikes.

Roman Grebennikov | g...@dfdx.me


On Mon, Aug 3, 2020, at 23:34, Bajaj, Abhinav wrote:
> Hi,
> 
> I am trying to understand the CPU Load metrics reported by Flink 1.7.1 
> running with openjdk 1.8.0_212 on K8s.
> 
> After deploying the Flink Job on K8s, I tried to get CPU Load metrics 
> following this documentation 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#rest-api-integration>.
> 
> curl 
> localhost:8081/taskmanagers/7737ac33b311ea0a696422680711597b/metrics?get=Status.JVM.CPU.Load,Status.JVM.CPU.Time
> [{"id":"Status.JVM.CPU.Load","value":"0.0023815194093831865"},{"id":"Status.JVM.CPU.Time","value":"2326000"}]
> 
> The value of the CPU load looks odd to me.
> 
> What is the unit and scale of this value?
> How does Flink determine this value?
> 
> Appreciate your time and help here.
> 
> ~ Abhinav Bajaj



Re: Running Apache Flink on the GraalVM as a Native Image

2020-06-30 Thread Roman Grebennikov
Hi,

the error in the original message means that inside a Kryo serializer there is 
something that uses sun.misc.Unsafe to compute a field offset in raw memory to 
access class fields. The problem is that the memory layout in the JVM and in a NI is 
different and will most probably result in a segfault if you leave it as is.

If you really want to go further, there is a @RecomputeFieldValue(kind = 
Kind.FieldOffset) annotation in the graalvm to update the offset for NI.

But I guess the overall perspective of running Flink as a NI is not that good 
for the following reasons:
* GraalVM NI requires that the whole application is completely static and there 
is no dynamic classloading happening in runtime. NI still can handle some sort 
of dynamic classloading, but only if you define all possible combinations of 
loaded classes on compile time. Flink itself uses dynamic classloading when you 
submit a job there as a fat jar file. So you will need to statically compile a 
bundle of your app fat jar AND taskmanager code at once into a single binary.
* Flink also relies quite a lot on dynamic reflection in the serialization 
code, so you also have to build a static reflection configuration at compile time 
(probably by running your job + taskmanager with the tracing agent on the JVM; see 
the command sketch after this list).
* As GraalVM NI has no way of doing JIT compilation (as everything must be 
compiled statically), you should expect a lower overall job performance.
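
A sketch of the tracing-agent run mentioned above (the jar name and output directory 
are placeholders; the agent itself is a standard GraalVM feature that records the 
reflection/resource configuration JSON files native-image needs):

java -agentlib:native-image-agent=config-output-dir=./native-image-config -jar your-fat-job.jar

The generated *-config.json files are then placed on the image classpath under 
META-INF/native-image (or passed to native-image explicitly) when building the binary.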

GraalVM and Flink can be quite a nice code gymnastics exercise, but what is 
your final business goal?

Roman Grebennikov | g...@dfdx.me


On Thu, Jun 25, 2020, at 11:48, ivo.kn...@t-online.de wrote:
> What's up guys,
> 
> I'm trying to run an Apache Flink Application with the GraalVM Native Image 
> but I get the following error: (check attached file)
> 
> I suppose this happens because Flink uses a lot of low-level code and is 
> highly optimized.
> 
> When I googled the combination of GraalVM Native Image and Apache Flink I got 
> no results.
> 
> Did anyone ever succeed in making it work, and how?
> 
> Best regards,
> 
> Ivo
> 
> *Attachments:*
>  * Flink GraalVM Native Image Error


Re: Kinesis ProvisionedThroughputExceededException

2020-06-17 Thread Roman Grebennikov
Hi,

It will occur if your job reaches SHARD_GETRECORDS_RETRIES consecutive 
failed attempts to pull data from Kinesis.
So if you scale up the stream in Kinesis and tune the backoff parameters a bit, you 
will lower the probability of this exception almost to zero (but with increased 
costs and worst-case latency).

But yes, this is the main drawback of managed solutions - once you reach a 
significant load, you need to pay more. Another managed option within AWS is to 
switch to MSK (managed Kafka), which has no such significant restrictions.

And the final option is to wait until FLINK-17688 
<https://issues.apache.org/jira/browse/FLINK-17688> is implemented (using 
Kinesis enhanced fan-out, so Kinesis will push the data to the consumer instead of 
the consumer periodically pulling the data).

Roman Grebennikov | g...@dfdx.me


On Wed, Jun 17, 2020, at 04:39, M Singh wrote:
> 
> 
> Thanks Roman for your response and advice.
> 
> From my understanding, increasing shards will increase throughput, but if 
> more than 5 requests are made per shard per second - and we have 20 apps (and 
> increasing) - then the exception might still occur.
> 
> Please let me know if I have missed anything.
> 
> Mans
> On Tuesday, June 16, 2020, 03:29:59 PM EDT, Roman Grebennikov  
> wrote:
> 
> 
> Hi, 
> 
> usually this exception is thrown by aws-java-sdk and means that your kinesis 
> stream is hitting a throughput limit (what a surprise). We experienced the 
> same thing when we had a single "event-bus" style stream and multiple flink 
> apps reading from it.
> 
> Each Kinesis partition has a limit of 5 poll operations per second. If you 
> have a stream with 4 partitions and 30 jobs reading from it, I guess that 
> each job is constantly hitting op limit for kinesis with default kinesis 
> consumer settings and it does an exponential back-off (by just sleeping for a 
> small period of time and then retrying).
> 
> You have two options here:
> 1. scale up the kinesis stream, so there will be more partitions and higher 
> overall throughput limits
> 2. tune kinesis consumer backoff parameters:
> 
> Our current ones, for example, look like this:
> 
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") // we poll every 2s
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // in case of throughput error, initial timeout is 2s
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000") // we can go up to a 10s pause
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5") // multiplying the pause by 1.5 on each next step
>  conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and make up to 100 retries
> 
> with best regards,
> Roman Grebennikov | g...@dfdx.me
> 
> 
> On Mon, Jun 15, 2020, at 13:45, M Singh wrote:
>> Hi:
>> 
>> I am using multiple (almost 30 and growing) Flink streaming applications 
>> that read from the same kinesis stream and get 
>> ProvisionedThroughputExceededException exception which fails the job.
>> I have seen a reference 
>> 
>> http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
>>  - which indicates there might be some solution perhaps in Flink 1.8/1.9. 
>> 
>> I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due 
>> to ProvisionedThroughputExceededException - ASF JIRA 
>> <https://issues.apache.org/jira/browse/FLINK-10536> is still open.
>> 
>> 
>> So i wanted to find out 
>> 
>> 1. If this issue has been resolved and if so in which version ?
>> 2. Is there any kinesis consumer with kinesis fanout available that can help 
>> address this issue ?
>> 3. Is there any specific parameter in kinesis consumer config that can 
>> address this issue ?
>> 
>> If there is any other pointer/documentation/reference, please let me know.
>> 
>> Thanks
>> 
> 


Re: Kinesis ProvisionedThroughputExceededException

2020-06-16 Thread Roman Grebennikov
Hi, 

usually this exception is thrown by aws-java-sdk and means that your kinesis 
stream is hitting a throughput limit (what a surprise). We experienced the same 
thing when we had a single "event-bus" style stream and multiple flink apps 
reading from it.

Each Kinesis partition has a limit of 5 poll operations per second. If you have 
a stream with 4 partitions and 30 jobs reading from it, I guess that each job 
is constantly hitting op limit for kinesis with default kinesis consumer 
settings and it does an exponential back-off (by just sleeping for a small 
period of time and then retrying).

You have two options here:
1. scale up the kinesis stream, so there will be more partitions and higher 
overall throughput limits
2. tune kinesis consumer backoff parameters:

Our current ones, for example, look like this:

 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") // we poll every 2s
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // in case of throughput error, initial timeout is 2s
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000") // we can go up to a 10s pause
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5") // multiplying the pause by 1.5 on each next step
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and make up to 100 retries
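
For completeness, a hedged sketch of how these properties get wired into the 
consumer (the stream name and region are placeholders, and env is your 
StreamExecutionEnvironment):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants

val conf = new Properties()
conf.put(ConsumerConfigConstants.AWS_REGION, "us-east-1") // placeholder region
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000")
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000")
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000")
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5")
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100")

val stream = env.addSource(
  new FlinkKinesisConsumer[String]("my-stream", new SimpleStringSchema(), conf))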

with best regards,
Roman Grebennikov | g...@dfdx.me


On Mon, Jun 15, 2020, at 13:45, M Singh wrote:
> Hi:
> 
> I am using multiple (almost 30 and growing) Flink streaming applications that 
> read from the same kinesis stream and get 
> ProvisionedThroughputExceededException exception which fails the job.
> I have seen a reference 
> http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
>  - which indicates there might be some solution perhaps in Flink 1.8/1.9. 
> 
> I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due to 
> ProvisionedThroughputExceededException - ASF JIRA 
> <https://issues.apache.org/jira/browse/FLINK-10536> is still open.
> 
> 
> So i wanted to find out 
> 
> 1. If this issue has been resolved and if so in which version ?
> 2. Is there any kinesis consumer with kinesis fanout available that can help 
> address this issue ?
> 3. Is there any specific parameter in kinesis consumer config that can 
> address this issue ?
> 
> If there is any other pointer/documentation/reference, please let me know.
> 
> Thanks
> 


Re: StreamingFileSink

2019-10-14 Thread Roman Grebennikov
As for StreamingFileSink and compressed output, see the 
StreamingFileSink.forBulkFormat and BulkWriter.Factory classes. A simple example 
(using Apache commons-compress):

 val writer = new BulkWriter.Factory[String] {
   override def create(out: FSDataOutputStream): BulkWriter[String] = new BulkWriter[String] {
     val compressed = new GzipCompressorOutputStream(out)
     override def addElement(element: String): Unit = compressed.write(element.getBytes())
     override def flush(): Unit = compressed.flush()
     override def finish(): Unit = compressed.close()
   }
 }
 val sink = StreamingFileSink.forBulkFormat[String](new Path("/some/path"), writer)
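
One usage note (assuming a DataStream[String] called stream): bulk writers roll 
their part files on checkpoints, so checkpointing has to be enabled or the output 
will stay in the in-progress state forever:

 env.enableCheckpointing(60000) // bulk part files are finalized on checkpoint completion
 stream.addSink(sink)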

There are still some usability issues with StreamingFileSink (like not being 
able to customize the resulting file names), but they are already going to be 
fixed in Flink 1.10.

Roman Grebennikov | g...@dfdx.me


On Fri, Oct 11, 2019, at 23:07, John O wrote:
> Hello,

> 

> Question 1

> I don’t see any reference material showing how to write compressed (gzip) 
> files with StreamingFileSink. Can someone point me in the right direction?

> 

> Question 2

> We currently have a use case for a “StreamingFileProcessFunction”. Basically 
> we need an output for the StreamingFileSink that will be used by a downstream 
> processor. What would be the best way to implement this feature?

> 

> 

> Best,

> Song



Re: Batch Job in a Flink 1.9 Standalone Cluster

2019-10-14 Thread Roman Grebennikov
A forced GC does not mean that the JVM will even try to release the freed memory back 
to the operating system. This highly depends on the JVM and garbage collector 
used for your Flink setup, but most probably it's JVM 8 with the ParallelGC 
collector.

ParallelGC is known to be not that aggressive about releasing free heap memory 
back to the OS. I see multiple possible solutions here:
1. Ask yourself whether you really need to release any memory back - is 
there a logical reason behind it? The next time you resubmit the job, the memory 
is going to be reused.
2. You can switch to G1GC and use JVM args like "-XX:MaxHeapFreeRatio 
-XX:MinHeapFreeRatio" to make it more aggressive about releasing memory (see the 
flink-conf.yaml sketch below).
3. You can use unofficial JVM builds from RedHat with the ShenandoahGC backport, 
which is also able to do the job: 
https://builds.shipilev.net/openjdk-shenandoah-jdk8/
4. Flink 1.10 will (hopefully) be able to run on JVM 11, where G1 is much 
more aggressive about releasing memory: 
https://bugs.openjdk.java.net/browse/JDK-8146436
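
A hedged sketch of how option 2 could look in flink-conf.yaml (the flag values are 
just a starting point, tune them for your workload):

env.java.opts.taskmanager: -XX:+UseG1GC -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=30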

Roman Grebennikov | g...@dfdx.me


On Sat, Oct 12, 2019, at 08:38, Timothy Victor wrote:
> This part about the GC not cleaning up after the job finishes makes sense. 
> However, I observed that even after I run a "jcmd <pid> GC.run" on the task 
> manager process ID the memory is still not released. This is what concerns me.
> 
> Tim
> 
> 
> On Sat, Oct 12, 2019, 2:53 AM Xintong Song  wrote:
>> Generally yes, with one slight difference. 
>> 
>> Once the job is done, the buffer is released by flink task manager (because 
>> pre-allocation is configured to be disabled), but the corresponding memory 
>> may not be released by jvm (because no GC cleans it). So it's not the task 
>> manager that keeps the buffer to be used for the next batch job. When the 
>> new batch job is running, the task executor allocates new buffers, which 
>> will use the memory of the previous buffer that jvm haven't released.
>> 
>> Thank you~

>> Xintong Song

>> 

>> 
>> 
>> On Sat, Oct 12, 2019 at 7:28 AM Timothy Victor  wrote:
>>> Thanks Xintong! In my case both of those parameters are set to false 
>>> (default). I think I am sort of following what's happening here.
>>> 
>>> I have one TM with heap size set to 1GB. When the cluster is started the TM 
>>> doesn't use that 1GB (no allocations). Once the first batch job is 
>>> submitted I can see the memory roughly go up by 1GB. I presume this is when 
>>> TM allocates its 1GB on the heap, and if I read correctly this is 
>>> essentially a large byte buffer that is tenured so that it is never GCed. 
>>> Flink writes any pojos (serializes) to this byte buffer and this is to 
>>> essentially circumvent GC for performance. Once the job is done, this byte 
>>> buffer remains on the heap, and the task manager keeps it to use for the 
>>> next batch job. This is why I never see the memory go down after a batch 
>>> job is complete. 
>>> 
>>> Does this make sense? Please let me know what you think.
>>> 
>>> Thanks
>>> 
>>> Tim
>>> 
>>> On Thu, Oct 10, 2019, 11:16 PM Xintong Song  wrote:
>>>> I think it depends on your configurations.
>>>> - Are you using on-heap/off-heap managed memory? (configured by 
>>>> 'taskmanager.memory.off-heap', by default is false)
>>>> - Is managed memory pre-allocated? (configured by 
>>>> 'taskmanager.memory.preallocate', by default is false)

>>>> 

>>>> If managed memory is pre-allocated, then the allocated memory segments 
>>>> will never be released. If it's not pre-allocated, memory segments should 
>>>> be released when the task is finished, but the actual memory will not be 
>>>> de-allocated until next GC. Since the job is finished, there may not be 
>>>> enough heap activities to trigger the GC. If on-heap memory is used, you 
>>>> may not be able to observe the decreasing of TM memory usage, because JVM 
>>>> heap size does not scale down. Only if off-heap memory is used, you might 
>>>> be able to observe the decreasing of TM memory usage after a GC, but not 
>>>> from a jmap dump because jmap dumps heap memory usage only.

>>>> 

>>>> Besides, I don't think you need to worry about whether memory is released 
>>>> after one job is finished. Sometimes flink/jvm do not release memory after 
>>>> jobs/tasks finished, so that it can be reused directly by other 
>>>> jobs/tasks, for the purpose of reducing allocate/deallocated overhea