Re: Automatic deployment of new version of streaming stateful job
Hi Maxim,

You could write a script yourself that triggers a cancel with savepoint and then starts the new version from the savepoint created during the cancel. Alternatively, I’ve built a tool that lets you perform these steps more easily: https://github.com/ing-bank/flink-deployer. The deployer allows you to deploy or upgrade your jobs; all you need to do is integrate it into your CI/CD pipeline.

Kind regards,
Marc

On 16 Jul 2019, 02:46 +0200, Maxim Parkachov wrote:
> Hi,
>
> I'm trying to bring my first stateful streaming Flink job to production and
> have trouble understanding how to integrate it with a CI/CD pipeline. I can
> cancel the job with a savepoint, but in order to start the new version of the
> application, do I need to specify the savepoint path manually?
>
> So, basically my question is: what is the best practice for automatically
> restarting or deploying a new version of a stateful streaming application?
> Every tip is greatly appreciated.
>
> Thanks,
> Maxim.
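For completeness, here is a minimal sketch of the "script it yourself" route using the Flink CLI. The job name, jar and savepoint directory are placeholders, the flags follow the 1.7/1.8 CLI syntax, and parsing the human-readable CLI output like this is brittle and version-dependent:

#!/usr/bin/env bash
# Hypothetical upgrade script: cancel with savepoint, then resume the new jar from it.
set -euo pipefail

JOB_NAME="my-streaming-job"
NEW_JAR="my-streaming-job-2.0.jar"
SAVEPOINT_DIR="hdfs:///flink/savepoints"

# Find the running job's id ("flink list" output is meant for humans, so this is fragile)
JOB_ID=$(flink list -r | grep "$JOB_NAME" | awk '{print $4}')

# Cancel with a savepoint and extract the savepoint path from the CLI output
SAVEPOINT_PATH=$(flink cancel -s "$SAVEPOINT_DIR" "$JOB_ID" | grep -o "$SAVEPOINT_DIR/savepoint-[^ .]*")

# Start the new version from that savepoint
flink run -d -s "$SAVEPOINT_PATH" "$NEW_JAR"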
Re: Flink CLI
Hi Steven, Oytun,

You may find the tool we open-sourced last year useful. It supports deploying and updating jobs with savepoints. You can find it on GitHub: https://github.com/ing-bank/flink-deployer. There’s also a Docker image available on Docker Hub.

Marc

On 24 Apr 2019, 17:29 +0200, Oytun Tez wrote:
> Hi Steven,
>
> As far as I am aware:
> 1) There is no update call. Our build flow feels a little weird to us as well;
> it definitely requires scripting.
> 2) We are using the Flink management API remotely in our build flow to 1) get
> jobs, 2) savepoint them, 3) cancel them, etc. I am going to release a Python
> script for this soon.
>
> ---
> Oytun Tez
>
> M O T A W O R D
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
> On Wed, Apr 24, 2019 at 11:06 AM Steven Nelson wrote:
> > Hello!
> >
> > I am working on automating our deployments to our Flink cluster. I had a
> > couple of questions about the Flink CLI.
> >
> > 1) I thought there was an "update" command that would internally manage the
> > cancel-with-savepoint, upload-new-jar, restart-from-savepoint process.
> >
> > 2) Is there a way to get the Flink CLI to output its result in JSON format?
> > Right now I would need to parse the results of the "flink list" command to
> > get the job id, cancel the job with a savepoint, parse the results of that
> > to get the savepoint filename, and then restore using that. Parsing the
> > output seems brittle to me.
> >
> > Thoughts?
> > -Steve
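For anyone who, like Oytun, prefers scripting against the management API rather than parsing CLI output, a rough sketch of the corresponding REST calls looks like the following. The host, port, savepoint directory, jar name and the angle-bracket placeholders are assumptions, and the exact payloads can vary between Flink versions:

# 1) List the jobs to find the job id
curl -s http://jobmanager:8081/jobs

# 2) Trigger a savepoint and cancel the job; the response contains a "request-id" (trigger id)
curl -s -X POST http://jobmanager:8081/jobs/<job-id>/savepoints \
     -H "Content-Type: application/json" \
     -d '{"target-directory": "hdfs:///flink/savepoints", "cancel-job": true}'

# 3) Poll until the savepoint is COMPLETED; the response contains the savepoint location
curl -s http://jobmanager:8081/jobs/<job-id>/savepoints/<trigger-id>

# 4) Upload the new jar and run it from the savepoint
curl -s -X POST http://jobmanager:8081/jars/upload -F "jarfile=@my-streaming-job-2.0.jar"
curl -s -X POST "http://jobmanager:8081/jars/<jar-id>/run?savepointPath=<savepoint-path>"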
Throttling/effective back-pressure on a Kafka sink
Hi,

We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 2 weeks. When doing a complete replay, it seems like Flink isn’t able to back-pressure or throttle the number of messages going to Kafka, causing the following error:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 8396 record(s) for topic-1:12 ms has passed since batch creation

We’re running Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka cluster is running version 2.1.1. The Kafka producer uses all default settings except for:

compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips

I tried playing around with the buffer and batch settings and increasing timeouts, but none of them seem to be what we need. Increasing delivery.timeout.ms and request.timeout.ms solves the initial error, but causes the Flink job to fail entirely with:

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.

My assumption is that the Kafka producer starts blocking once it notices it can't handle the batches, and that Flink eventually runs out of buffers for the operator. What really baffles me is that the backpressure tab shows everything is OK. The entire job pipeline (which reads from 4 different topics, unions them all and sinks to 1 topic) pushes all the messages through to the sink stage, resulting in 18 million incoming messages at that stage, even though Kafka cannot possibly keep up with that rate.

I searched for others facing the same issue but couldn't find anything similar. I'm hoping someone here can point me in the right direction.

Thanks in advance
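For reference, this is roughly how such a producer is wired up: a minimal sketch assuming the universal FlinkKafkaProducer from flink-connector-kafka and a string-typed stream. The topic name, bootstrap servers and the timeout values are placeholders, not settings taken from the job above:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaSinkSketch {

  def buildProducer(): FlinkKafkaProducer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-1:9092,kafka-2:9092")

    // Non-default settings from the mail
    props.setProperty("compression.type", "snappy")
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props.setProperty("acks", "all")
    props.setProperty("client.dns.lookup", "use_all_dns_ips")

    // Increased timeouts mentioned in the mail (values here are placeholders)
    props.setProperty("request.timeout.ms", "120000")
    props.setProperty("delivery.timeout.ms", "600000")

    // Default target topic, value-only serialization, producer config
    new FlinkKafkaProducer[String]("output-topic", new SimpleStringSchema(), props)
  }
}

// Usage: unionedStream.addSink(KafkaSinkSketch.buildProducer())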
Avro state migration using Scala in Flink 1.7.2 (and 1.8)
Hi,

I’ve been trying to get state migration with Avro working on Flink 1.7.2 using Scala case classes, but I’m not getting anywhere closer to solving it. We’re using the most basic streaming WordCount example as a reference to test the schema evolution:

val wordCountStream: DataStream[WordWithCount] = text
  .flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy(_.word)
  .reduce((a, b) => WordWithCount(a.word, a.count + b.count))

In this example, WordWithCount is the data object that we’d like to have serialized and deserialized with schema evolution support, since keyBy maintains state. I understood from the documentation that this only works for classes generated from Avro schemas, so I tried using sbt-avrohugger to generate our case classes. However, with the standard case classes generated from Avro we quickly ran into the problem that we needed a no-arg constructor.

We looked at the flink-avro module and noticed that the classes generated by the avro-maven-plugin implement SpecificRecord and seem to comply with the POJO rules described in the Flink documentation. After switching from standard to specific Avro generation with sbt-avrohugger, we ended up with Scala case classes that should comply with all the rules. An example of such a generated case class:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch

case class WordWithCount(var word: String, var count: Long) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this("", 0L)

  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => { word }.asInstanceOf[AnyRef]
      case 1 => { count }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }

  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.word = { value.toString }.asInstanceOf[String]
      case 1 => this.count = { value }.asInstanceOf[Long]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }

  def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
}

object WordWithCount {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\":[{\"name\":\"word\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}")
}

This, however, also didn’t work out of the box. We then tried to define our own type information using flink-avro’s AvroTypeInfo, but this fails because Avro looks for a SCHEMA$ field (SpecificData:285) on the class itself and cannot use Java reflection to find the SCHEMA$ in the Scala companion object:

implicit val wordWithCountInfo: AvroTypeInfo[WordWithCount] =
  new AvroTypeInfo(classOf[WordWithCount])

We then read in the 1.7 documentation that state schema evolution is not supported for POJO types, but only for state defined through state descriptors (e.g. the ListStateDescriptor), and only if you let Flink infer the type information. That is exactly what we need for our processors that have map and list state; however, for the simple word count example we should only need native POJO (de)serialization with state migration.

We then noticed GitHub PR #7759, which adds support for POJO state schema evolution/migration. We wanted to give this a try and built Flink from source from the release-1.8 branch. We included the 1.8-SNAPSHOT jars in our job and got a local 1.8 cluster and the job running fine. However, if we do not specify our own type information and perform the following steps:

1. Run the job
2. Create a savepoint and stop the job
3. Update the WordWithCount Avro schema to include a third field
4. Update the job according to the newly generated case class
5. Run the new job from the savepoint
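To make steps 3 and 4 concrete, the evolved class might look like the sketch below. The third field (here called frequency) is purely illustrative and not actual avrohugger output; the important part is that the evolved Avro schema needs a default value for the added field so that state written with the old schema can still be read:

import scala.annotation.switch

case class WordWithCount(var word: String, var count: Long, var frequency: Double)
  extends org.apache.avro.specific.SpecificRecordBase {

  def this() = this("", 0L, 0.0)

  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => { word }.asInstanceOf[AnyRef]
      case 1 => { count }.asInstanceOf[AnyRef]
      case 2 => { frequency }.asInstanceOf[AnyRef]
      case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }

  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.word = { value.toString }.asInstanceOf[String]
      case 1 => this.count = { value }.asInstanceOf[Long]
      case 2 => this.frequency = { value }.asInstanceOf[Double]
      case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }

  def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
}

object WordWithCount {
  // Note the "default" on the new field: without it, Avro cannot resolve records written with the old schema.
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\":[" +
      "{\"name\":\"word\",\"type\":\"string\"}," +
      "{\"name\":\"count\",\"type\":\"long\"}," +
      "{\"name\":\"frequency\",\"type\":\"double\",\"default\":0.0}]}")
}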
We are then faced with the following error:

Caused by: java.lang.IllegalArgumentException: array is not of length 3

thrown from ScalaCaseClassSerializer.scala:50.

However, if we again try to define our own type information using the AvroTypeInfo class, we are faced with the same issue as in 1.7.

What are we missing? The documentation on how to use this is very limited, and we’re getting the impression that it may work with Java types but not with Scala case classes. I’d love to hear some pointers on how to approach this. Compared to our solution on 1.4 (https://medium.com/wbaa/evolve-your-data-model-in-flinks-state-using-avro-f26982afa399), we hoped to get rid of all the custom serializers by moving to 1.7.

Thanks in advance!
Marc