Hi everyone,
 
 
I set up the Flink StateFun runtime on minikube (cpus=4, memory=10240) following the Kubernetes deployment tutorial in statefun-playground: https://github.com/apache/flink-statefun-playground/tree/main/deployments/k8s
I developed my own stateful functions in Java and deployed them the same way as shown in the tutorial.
 
Problem:
When Kafka events arrive at a very low rate, everything works fine. But when the incoming rate is very high, the complete statefun-master crashes. In the logs of the pod in which my functions run, I cannot find any error message or exception. In the log of the statefun-master I could only find the following INFO statement for every function, but no errors:
 
```
INFO  org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction [] - Bootstrapping function FunctionType(pred, source). Blocking processing until first request is completed. Successive requests will be performed asynchronously.
```
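Since the function pod's log shows nothing, one way to tell whether the function JVM is being stopped from outside (e.g. SIGTERM from Kubernetes) or dies from an exception that nobody logs is to install JVM-level hooks at the start of main(). The class below is a hypothetical helper, not part of StateFun or Undertow; it only catches exceptions that escape a thread, so exceptions a framework catches internally will not reach it. Shutdown hooks run on normal exit and SIGTERM, but not on SIGKILL (which is what an out-of-memory kill sends).

```java
import java.time.Instant;

/** Hypothetical helper: makes a silent exit of the function process visible in the pod log. */
public final class ExitDiagnostics {

    private ExitDiagnostics() {}

    /** Call once at the start of main(), before the HTTP server is started. */
    public static void install() {
        // Print any exception that escapes a thread instead of letting it disappear.
        Thread.setDefaultUncaughtExceptionHandler((thread, error) -> {
            System.err.println(Instant.now() + " uncaught exception in thread " + thread.getName());
            error.printStackTrace();
        });

        // Runs on normal exit and on SIGTERM (pod deletion, failed liveness probe),
        // but NOT on SIGKILL, so an OOM kill will not print this line.
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
                System.err.println(Instant.now() + " JVM shutting down")));
    }
}
```

If neither line appears before the pod restarts, `kubectl describe pod` shows the last termination state of the container, which distinguishes an OOM kill from other exits.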
 
Steps taken:
I tried a lot of settings in the StateFun manifest (increased parallelism, more memory, ...), but nothing helped. I also tried some additional settings for my Undertow server, but that did not help either. For debugging purposes I tried deploying only one function, and then everything worked well. But with all my functions deployed, the problem comes back. I also checked the Web UI: the ingress shows no backpressure and the "union -> functions" operator is only 30% busy. Shortly before the crash, the "union -> functions" operator has received all messages sent from the ingress.
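Because more memory did not help, it may be worth checking whether heap usage actually grows with the event rate. The Flink master and worker already expose JVM heap metrics through the Prometheus reporter configured in flink-conf.yaml; for the function process, a minimal sketch (a hypothetical helper using only the JDK's management API) could print heap usage periodically:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that periodically prints JVM heap usage to stdout. */
public final class HeapLogger {

    private static final long MIB = 1024L * 1024L;

    private HeapLogger() {}

    /** Returns a one-line summary of the current heap usage in MiB. */
    public static String snapshot() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heap.getMax(); // -1 if the JVM has no defined maximum
        String maxText = max < 0 ? "undefined" : (max / MIB) + "MiB";
        return "heap used=" + (heap.getUsed() / MIB) + "MiB"
                + " committed=" + (heap.getCommitted() / MIB) + "MiB"
                + " max=" + maxText;
    }

    /** Prints a snapshot every periodSeconds on a daemon thread; shut down the returned executor to stop. */
    public static ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "heap-logger");
            thread.setDaemon(true); // never keep the JVM alive just for logging
            return thread;
        });
        scheduler.scheduleAtFixedRate(
                () -> System.out.println(snapshot()), 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Calling `HeapLogger.start(10);` in main() before starting the server would log one line every 10 seconds, which can then be lined up with the time of the crash.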
 
 
Question:
My question is whether any of you has an idea what the problem could be.
 
 
I would be really grateful for your help and advice. Thank you very much.
 
 
Best regards,
Oliver
 
My StateFun manifest:
 
```
---
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: statefun
  name: flink-config
  labels:
    app: statefun
    release: prometheus
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: statefun-master
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: true
    parallelism.default: 1
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    state.checkpoints.dir: s3://checkpoints/subscriptions
    s3.endpoint: http://minio.statefun.svc.cluster.local:9000
    s3.path-style-access: true
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 2048m
    #Added
    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9999
  log4j-console.properties: |+
          monitorInterval=30
          rootLogger.level = DEBUG
          rootLogger.appenderRef.console.ref = ConsoleAppender
          logger.akka.name = akka
          logger.akka.level = INFO
          logger.kafka.name= org.apache.kafka
          logger.kafka.level = INFO
          logger.hadoop.name = org.apache.hadoop
          logger.hadoop.level = INFO
          logger.zookeeper.name = org.apache.zookeeper
          logger.zookeeper.level = INFO
          appender.console.name = ConsoleAppender
          appender.console.type = CONSOLE
          appender.console.layout.type = PatternLayout
          appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
          #Change from OFF to ON
          logger.netty.level = ON
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master-rest
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: NodePort
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: v1
kind: Service
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081
    - name: metrics
      port: 9999
  selector:
    app: statefun
    component: master
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-master
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: master
  template:
    metadata:
      labels:
        app: statefun
        component: master
    spec:
      containers:
        - name: master
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: master
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              #Changed from 0.5Gi to 1Gi to 2048m
              memory: "7Gi"
              cpu: "1"
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/pred
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statefun-worker
  namespace: statefun
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "9999"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statefun
      component: worker
  template:
    metadata:
      labels:
        app: statefun
        component: worker
    spec:
      containers:
        - name: worker
          image: apache/flink-statefun
          imagePullPolicy: IfNotPresent
          env:
            - name: ROLE
              value: worker
            - name: MASTER_HOST
              value: statefun-master
          resources:
            requests:
              #Changed from 0.5Gi to 1Gi to 2048m
              memory: "7Gi"
              cpu: "1"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            - containerPort: 9999
              name: metrics
              protocol: TCP
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: module-config-volume
              mountPath: /opt/statefun/modules/pred
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: module-config-volume
          configMap:
            name: module-config
            items:
              - key: module.yaml
                path: module.yaml
```
 
 
My Undertow Server:
 
```
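import io.undertow.Undertow;
import io.undertow.UndertowOptions;
import java.io.IOException;
import org.apache.flink.statefun.sdk.java.StatefulFunctions;
import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;

// SourceFn and UndertowHttpHandler are classes in this project (not shown), so no imports.
public final class FunctionServer { // class name assumed; the declaration was not part of the snippet

    public static void main(String[] args) throws IOException {
        final StatefulFunctions functions = new StatefulFunctions();
        functions.withStatefulFunction(SourceFn.SPEC);
        // .... (remaining functions registered the same way)

        final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
        final Undertow httpServer =
                Undertow.builder()
                        .addHttpListener(8000, "0.0.0.0")
                        .setHandler(new UndertowHttpHandler(requestReplyHandler))
                        .setWorkerThreads(100)
                        .setServerOption(UndertowOptions.NO_REQUEST_TIMEOUT, 100 * 1000) // 100 seconds (value is in ms)
                        .setServerOption(UndertowOptions.ENABLE_HTTP2, true)
                        .build();
        try {
            httpServer.start();
        } catch (Exception e) {
            System.out.println(e);
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new RuntimeException("Server start failed", e);
        }
    }
}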
 
```
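In the server code, NO_REQUEST_TIMEOUT is set to 100 * 1000. Undertow reads this option in milliseconds, so that is 100 seconds. To keep the number and its unit together, a small helper (hypothetical, not part of Undertow or the StateFun SDK) could derive the value from java.util.concurrent.TimeUnit:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for Undertow timeout options that are expressed in milliseconds. */
public final class ServerTimeouts {

    private ServerTimeouts() {}

    /** Converts seconds to the int milliseconds value that NO_REQUEST_TIMEOUT expects. */
    public static int noRequestTimeoutMillis(long seconds) {
        // toIntExact throws instead of silently overflowing for very large inputs.
        return Math.toIntExact(TimeUnit.SECONDS.toMillis(seconds));
    }
}
```

The builder line would then read `.setServerOption(UndertowOptions.NO_REQUEST_TIMEOUT, ServerTimeouts.noRequestTimeoutMillis(100))`, which makes the unit explicit at the call site.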
 
 

Reply via email to