Hi everyone,
I set up the Flink StateFun runtime on minikube (cpus=4, memory=10240) following the tutorial in statefun-playground: https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s
I developed my own stateful functions in Java and deployed them the same way as shown in the tutorial.
Problem:
When Kafka events arrive at a very low rate, everything works fine. But when the incoming rate is very high, the whole statefun-master crashes. In the logs of the pod in which my functions run I cannot find any error message or exception. In the log of the statefun-master I could only find the following statement for every function, but no errors:
```
INFO org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction [] - Bootstrapping function FunctionType(pred, source). Blocking processing until first request is completed. Successive requests will be performed asynchronously.
```
Steps taken:
I tried a lot of settings in the statefun manifest (increasing parallelism, more memory, ...), but nothing helped. I also tried some additional settings for my Undertow server, but that did not help either. For debugging purposes I tried to use only one function, and then everything worked well. But as soon as all my functions are deployed, the problem appears again. I have also checked the Web UI: the ingress shows no backpressure and the union function is only about 30% busy. Shortly before crashing, the union function has received all messages sent from the ingress. (My manifest, a rough sketch of my module.yaml, and my Undertow server are included below.)
Question:
Does anyone have an idea what the problem could be?
I would be really grateful for your help and advice. Thank you very much.
Best regards,
Oliver
My statefun manifest:
```
---
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: flink-config
  labels:
    app: statefun
    release: prometheus
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: statefun-master
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: true
    parallelism.default: 1
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    state.checkpoints.dir: s3://checkpoints/subscriptions
    s3.endpoint: http://minio.statefun.svc.cluster.local:9000
    s3.path-style-access: true
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 2048m
    # Added
    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9999
  log4j-console.properties: |+
    monitorInterval=30
    rootLogger.level = DEBUG
    rootLogger.appenderRef.console.ref = ConsoleAppender
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    # Changed from OFF to ON
    logger.netty.level = ON
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master-rest
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: NodePort
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: master
  template:
    metadata:
      labels:
        app: statefun
        component: master
    spec:
      containers:
        - name: master
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: master
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              # Changed from 0.5Gi to 1Gi to 2048m
              memory: "7Gi"
              cpu: "1"
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/pred
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-worker
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: worker
  template:
    metadata:
      labels:
        app: statefun
        component: worker
    spec:
      containers:
        - name: worker
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: worker
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              # Changed from 0.5Gi to 1Gi to 2048m
              memory: "7Gi"
              cpu: "1"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/pred
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
```
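For context, the module-config ConfigMap mounted above provides a module.yaml that follows the playground layout. A rough sketch of it (the Kafka address, topic, type, service and function names are placeholders for my actual values, and the exact keys may differ slightly by StateFun version):
```
kind: io.statefun.endpoints.v2/http
spec:
  # all functions in the "pred" namespace are served by my Undertow endpoint
  functions: pred/*
  urlPathTemplate: http://my-functions.statefun.svc.cluster.local:8000/
---
kind: io.statefun.kafka.v1/ingress
spec:
  id: pred/events-ingress
  address: kafka.statefun.svc.cluster.local:9092
  consumerGroupId: statefun-consumer
  topics:
    - topic: events            # placeholder topic name
      valueType: pred/Event    # placeholder type
      targets:
        - pred/source          # the function type from the log line above
```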
My Undertow Server:
```
public static void main(String[] args) throws IOException {
  final StatefulFunctions functions = new StatefulFunctions();
  functions.withStatefulFunction(SourceFn.SPEC);
  // ... the remaining function SPECs are registered the same way

  final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
  final Undertow httpServer =
      Undertow.builder()
          .addHttpListener(8000, "0.0.0.0")
          .setHandler(new UndertowHttpHandler(requestReplyHandler))
          .setWorkerThreads(100)
          // close idle connections after 100 * 1000 ms = 100 seconds without a request
          .setServerOption(UndertowOptions.NO_REQUEST_TIMEOUT, 100 * 1000)
          .setServerOption(UndertowOptions.ENABLE_HTTP2, true)
          .build();
  try {
    httpServer.start();
  } catch (Exception e) {
    System.out.println(e);
    throw new RuntimeException("Server start: " + e);
  }
}
```
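The function pods themselves are deployed much like in the playground tutorial: a separate Deployment runs the Undertow server above, and a plain ClusterIP Service exposes it on port 8000 so the StateFun workers can reach it. Roughly (names are placeholders for my actual ones):
```
apiVersion: v1
kind: Service
metadata:
  name: my-functions            # placeholder; matches the urlPathTemplate host in module.yaml
  namespace: statefun
spec:
  type: ClusterIP
  ports:
    - name: http
      port: 8000
      targetPort: 8000
  selector:
    app: my-functions
```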