Re: Automatic deployment of new version of streaming stateful job

2019-07-16 Thread Marc Rooding
Hi Maxim

You could write a script yourself that triggers a cancel with savepoint and 
then starts the new version from the savepoint that was created during the 
cancellation.
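
A rough sketch of such a script as a Scala app (this assumes the flink binary 
is on the PATH; the parsing of the CLI output is a guess, so adapt it to your 
environment):

import scala.sys.process._

// Sketch: cancel the running job with a savepoint, then start the new version
// from that savepoint. jobId, savepointDir and newJar come from your CI/CD.
object Redeploy extends App {
  val Array(jobId, savepointDir, newJar) = args

  // `flink cancel -s <targetDirectory> <jobId>` prints the savepoint path on success.
  val cancelOutput = Seq("flink", "cancel", "-s", savepointDir, jobId).!!

  // Pull the savepoint path out of the CLI output (brittle; adjust to your setup).
  val savepointPath = cancelOutput
    .split("\\s+")
    .find(_.startsWith(savepointDir))
    .getOrElse(sys.error("No savepoint path found in CLI output"))

  // Submit the new job version, resuming from the savepoint.
  Seq("flink", "run", "-d", "-s", savepointPath, newJar).!
}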

However, I’ve built a tool that makes these steps easier: 
https://github.com/ing-bank/flink-deployer. The deployer allows you to deploy 
or upgrade your jobs; all you need to do is integrate it into your CI/CD 
pipeline.

Kind regards

Marc
On 16 Jul 2019, 02:46 +0200, Maxim Parkachov wrote:
> Hi,
>
> I'm trying to bring my first stateful streaming Flink job to production and 
> have trouble understanding how to integrate it with a CI/CD pipeline. I can 
> cancel the job with a savepoint, but in order to start the new version of the 
> application, do I need to specify the savepoint path manually?
>
> So, basically, my question is: what is the best practice for automatically 
> restarting or deploying a new version of a stateful streaming application? 
> Every tip is greatly appreciated.
>
> Thanks,
> Maxim.


Re: Flink CLI

2019-04-25 Thread Marc Rooding
Hi Steven, Oytun

You may find the tool we open-sourced last year useful. It supports deploying 
and updating jobs with savepoints.

You can find it on Github: https://github.com/ing-bank/flink-deployer

There’s also a Docker image available on Docker Hub.
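
Regarding your second question below: instead of parsing the CLI output, you 
can call the JobManager’s monitoring REST API, which returns JSON (this is what 
Oytun describes as well). A rough sketch, assuming the standard /jobs and 
/jobs/:jobid/savepoints endpoints and a placeholder address:

import java.net.{HttpURLConnection, URL}
import scala.io.Source

object FlinkRest extends App {
  // Placeholder JobManager address; adjust for your cluster.
  val base = "http://localhost:8081"

  // List all jobs as JSON instead of parsing `flink list` output.
  println(Source.fromURL(s"$base/jobs").mkString)

  // Trigger a cancel-with-savepoint; the JSON response contains a request id
  // that can be polled under /jobs/<jobId>/savepoints/<requestId> to obtain
  // the final savepoint location.
  def cancelWithSavepoint(jobId: String, targetDir: String): String = {
    val conn = new URL(s"$base/jobs/$jobId/savepoints")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", "application/json")
    conn.getOutputStream.write(
      s"""{"target-directory": "$targetDir", "cancel-job": true}""".getBytes("UTF-8"))
    Source.fromInputStream(conn.getInputStream).mkString
  }
}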

Marc
On 24 Apr 2019, 17:29 +0200, Oytun Tez wrote:
> Hi Steven,
>
> As far as I am aware:
> 1) There is no update call. Our build flow feels a little weird to us as 
> well; it definitely requires scripting.
> 2) We are using the Flink management API remotely in our build flow to (1) 
> get jobs, (2) savepoint them, and (3) cancel them. I am going to release a 
> Python script for this soon.
>
> ---
> Oytun Tez
>
> M O T A W O R D
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> > On Wed, Apr 24, 2019 at 11:06 AM Steven Nelson wrote:
> > > Hello!
> > >
> > > I am working on automating our deployments to our Flink cluster. I had a 
> > > couple of questions about the Flink CLI.
> > >
> > > 1) I thought there was an "update" command that would internally manage 
> > > the cancel-with-savepoint, upload-new-jar, restart-from-savepoint process.
> > >
> > > 2) Is there a way to get the Flink CLI to output its results in JSON 
> > > format? Right now I would need to parse the results of the "flink list" 
> > > command to get the job id, cancel the job with savepoint, parse the 
> > > results of that to get the savepoint filename, then restore using that. 
> > > Parsing the output seems brittle to me.
> > >
> > > Thoughts?
> > > -Steve
> > >


Throttling/effective back-pressure on a Kafka sink

2019-03-28 Thread Marc Rooding
Hi

We’ve got a job producing to a Kafka sink. The Kafka topics have a retention of 
2 weeks. When doing a complete replay, it seems like Flink isn’t able to 
back-pressure or throttle the number of messages going to Kafka, causing the 
following error:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Expiring 8396 record(s) for topic-1:12 ms has passed since 
batch creation

We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka 
cluster is running version 2.1.1. The Kafka producer uses all default settings 
except for:

compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips
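
For completeness, this is roughly how the producer is wired up in the job (the 
topic name and serialization schema are placeholders, and the constructor 
variant may differ per connector version); the commented lines are the timeout 
knobs I experimented with:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// Rough sketch of the producer configuration; "output-topic" and the string
// schema stand in for our actual topic and serializer.
val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092") // placeholder
props.setProperty("compression.type", "snappy")
props.setProperty("max.in.flight.requests.per.connection", "1")
props.setProperty("acks", "all")
props.setProperty("client.dns.lookup", "use_all_dns_ips")
// Timeout knobs I experimented with, without success:
// props.setProperty("delivery.timeout.ms", "...")
// props.setProperty("request.timeout.ms", "...")

val producer = new FlinkKafkaProducer[String]("output-topic", new SimpleStringSchema(), props)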

I tried playing around with the buffer and batch settings and increasing 
timeouts, but none of them seem to be what we need. Increasing 
delivery.timeout.ms and request.timeout.ms solves the initial error, but causes 
the Flink job to fail entirely due to:

Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.

My assumption is that the Kafka producer will start blocking since it notices 
that it can't handle the batches, and Flink eventually runs out of buffers for 
the operator.

What really baffles me is that the backpressure tab shows that everything is 
OK. The entire job pipeline (which reads from 4 different topics, unions them 
all and sinks to 1 topic) pushes all the messages through to the sink stage, 
resulting in 18 million incoming messages at the sink stage, even though Kafka 
cannot possibly keep up with this.

I searched for others facing the same issue but couldn’t find anything similar. 
I’m hoping that someone here can guide me in the right direction.

Thanks in advance



Avro state migration using Scala in Flink 1.7.2 (and 1.8)

2019-03-21 Thread Marc Rooding
Hi

I’ve been trying to get state migration with Avro working on Flink 1.7.2 using 
Scala case classes, but I’m not getting any closer to solving it.

We’re using the most basic streaming WordCount example as a reference to test 
the schema evolution:

val wordCountStream: DataStream[WordWithCount] = text
  .flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy(_.word)
  .reduce((a, b) => WordWithCount(a.word, a.count + b.count))

In this example, WordWithCount is our data object that we’d like to have 
serialized and deserialized with schema evolution support, since the keyed 
reduce maintains state.

I understood from the documentation that this would only work for classes 
generated from Avro schemas, so I’ve tried using sbt-avrohugger to generate our 
case classes. However, for the normal (non-specific) case classes generated by 
avrohugger, we quickly ran into the problem that we needed a no-arg constructor.

We looked at the flink-avro module and noticed that the classes generated by 
the avro-maven-plugin were implementing SpecificRecord and seemed to comply 
with the POJO rules as described in the Flink documentation. After switching 
from normal to specific avro generation with sbt-avrohugger, we ended up with 
Scala case classes that should comply with all rules.

An example of such a generated case class is as follows:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch

case class WordWithCount(var word: String, var count: Long)
  extends org.apache.avro.specific.SpecificRecordBase {

  def this() = this("", 0L)

  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        word
      }.asInstanceOf[AnyRef]
      case 1 => {
        count
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }

  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.word = {
        value.toString
      }.asInstanceOf[String]
      case 1 => this.count = {
        value
      }.asInstanceOf[Long]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }

  def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
}

object WordWithCount {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\":[{\"name\":\"word\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}")
}

This, however, also didn’t work out of the box. We then tried to define our own 
type information using flink-avro’s AvroTypeInfo, but this fails because Avro 
looks for a SCHEMA$ property (SpecificData:285) in the class and is unable to 
use Java reflection to find the SCHEMA$ in the Scala companion object:

implicit val wordWithCountInfo: AvroTypeInfo[WordWithCount] =
  new AvroTypeInfo(classOf[WordWithCount])

We then read in the 1.7 documentation that Flink doesn’t natively support 
schema evolution for POJO types, but only for state defined by descriptors, 
e.g. the ListStateDescriptor, and only if you allow Flink to infer the type 
information. This is definitely what we need for our processors that have map 
and list state. However, for the simple word count example, we should only need 
native POJO (de)serialization with state migration.
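
For those processors, a descriptor with inferred type information would look 
roughly like this (the state name and state type here are placeholders for our 
actual ones):

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.scala._

// Rough sketch: let Flink infer the type information for the state type.
val bufferedWordsDescriptor = new ListStateDescriptor[WordWithCount](
  "buffered-words",
  createTypeInformation[WordWithCount]
)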

We then noticed GitHub PR #7759, which adds support for POJO state schema 
evolution/migration. We wanted to give this a try, so we built Flink from 
source from the release-1.8 branch, included the 1.8-SNAPSHOT jars in our job, 
and got a local 1.8 cluster and job running fine.

However, if we do not specify our own type information and perform the 
following steps:


1. Run the job
2. Create a savepoint and stop the job
3. Update the WordWithCount Avro schema to include a third field (an example 
evolved schema follows below this list)
4. Update the job according to the generated case class
5. Run the new job from the savepoint
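
As an illustration of step 3, the evolved schema could look like this (the 
field name "lastSeen" is just an example; the important part is that the added 
field declares a default, so that state written with the old schema can still 
be read):

object WordWithCount {
  // Regenerated companion object: the third field carries an Avro default.
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
    """{"type":"record","name":"WordWithCount","fields":[
      |  {"name":"word","type":"string"},
      |  {"name":"count","type":"long"},
      |  {"name":"lastSeen","type":"long","default":0}
      |]}""".stripMargin)
}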


We are then faced with the following error:

Caused by: java.lang.IllegalArgumentException: array is not of length 3 thrown 
from ScalaCaseClassSerializer.scala:50

However, if we again try to define our own type information using the 
AvroTypeInfo class, we are faced with the same issue as in 1.7.

What are we missing? The documentation on how to use this is very limited, and 
we’re getting the idea that it may work with Java types, but maybe not with 
Scala case classes. I’d love to hear some pointers on how to approach this. 
Compared to our solution in 1.4 
(https://medium.com/wbaa/evolve-your-data-model-in-flinks-state-using-avro-f26982afa399),
 we hoped to get rid of all the custom serializers by moving to 1.7.

Thanks in advance!

Marc