Re: Flink SQL job submission process issue
Addendum: this problem is especially pronounced in HA setups, because the interaction with HDFS is serial; once the number of files reaches several hundred, submission becomes very slow.

Peihui He wrote on Sun, 15 Aug 2021 at 11:18:
> Hi all:
>
> When submitting SQL jobs through Zeppelin we keep running into timeouts. Investigation turned up the following issues:
>
> 1. The blob client talks to the blob server through a single client connection, so when there are many files (e.g. 100) the transfer takes quite a long time.
>
> 2. Although the blob server has blob.fetch.num-concurrent for concurrency control, it takes a write lock in the moveTempFileToStore method, which defeats the concurrency.
>
> After local testing we made a simple code change, illustrated below:
> BlobServer:
> [image: image.png]
>
> ClientUtils:
> [image: image.png]
>
> After the change, submitting a job through Zeppelin takes tens of seconds instead of several minutes, and the time no longer grows linearly with the number of dependency jars.
>
> This leaves two questions:
> 1. Is the lock granularity in the blob server too coarse? When several SQL jobs are submitted in parallel, they can effectively only execute one at a time.
> 2. Is the write lock in moveTempFileToStore really necessary?
>
> Best wishes.
Flink SQL job submission process issue
Hi all:

When submitting SQL jobs through Zeppelin we keep running into timeouts. Investigation turned up the following issues:

1. The blob client talks to the blob server through a single client connection, so when there are many files (e.g. 100) the transfer takes quite a long time.

2. Although the blob server has blob.fetch.num-concurrent for concurrency control, it takes a write lock in the moveTempFileToStore method, which defeats the concurrency.

After local testing we made a simple code change, illustrated below:
BlobServer:
[image: image.png]

ClientUtils:
[image: image.png]

After the change, submitting a job through Zeppelin takes tens of seconds instead of several minutes, and the time no longer grows linearly with the number of dependency jars.

This leaves two questions:
1. Is the lock granularity in the blob server too coarse? When several SQL jobs are submitted in parallel, they can effectively only execute one at a time.
2. Is the write lock in moveTempFileToStore really necessary?

Best wishes.
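The lock-narrowing idea described above can be sketched in plain Java. This is a self-contained illustration with hypothetical class and method names, not Flink's actual BlobServer/moveTempFileToStore code: the expensive file I/O happens outside any lock, and the write lock only guards the cheap rename into the store.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical blob store sketch: narrow critical section around the move.
class NarrowLockBlobStore {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Path storageDir;

    NarrowLockBlobStore(Path storageDir) {
        this.storageDir = storageDir;
    }

    // The slow part (writing the incoming bytes to a temp file) is unlocked,
    // so concurrent uploads can proceed in parallel; only the final move
    // into the store takes the write lock.
    Path put(String key, byte[] contents) throws IOException {
        Path temp = Files.createTempFile(storageDir, "incoming-", ".tmp");
        Files.write(temp, contents);            // slow I/O, no lock held

        Path target = storageDir.resolve(key);
        lock.writeLock().lock();                // short critical section
        try {
            Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING);
        } finally {
            lock.writeLock().unlock();
        }
        return target;
    }

    byte[] get(String key) throws IOException {
        lock.readLock().lock();
        try {
            return Files.readAllBytes(storageDir.resolve(key));
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Whether the write lock is needed at all depends on whether concurrent readers may observe a half-moved file; with a rename on the same filesystem the lock window is tiny either way, which is what makes the coarse per-upload lock look unnecessary.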
Re: Re: [ANNOUNCE] RocksDB Version Upgrade and Performance
Hi,

FYI, the performance regression after upgrading RocksDB was clearly visible in all of our RocksDB-related benchmarks, for example:
http://codespeed.dak8s.net:8000/timeline/?ben=stateBackends.ROCKS=2
http://codespeed.dak8s.net:8000/timeline/?ben=stateBackends.ROCKS_INC=2
(and many more in the State Backends executable). It's 6% to 12% across the board.

Best,
Piotrek

On Wed, 11 Aug 2021 at 13:42, 张蛟 wrote:
> Hi, Nico and Yun:
> Thanks for your great work and the detailed description of the RocksDB version upgrade and its performance. About 800 jobs are using the RocksDB state backend in our production environment, and we plan to upgrade more, aiming to solve the GC problems caused by large state. Because of the non-strict memory control in RocksDB, we have had to spend a lot of time dealing with memory usage exceeding physical memory. With support for a strict block cache, things will become much easier. The delete-range API is also useful for us, so we prefer to upgrade RocksDB to the new release version: +1 (non-binding).
>
> best,
> zlzhang0122
>
> At 2021-08-05 01:50:07, "Yun Tang" wrote:
> > Hi Yuval,
> >
> > Upgrading the RocksDB version is a long story going back to Flink-1.10.
> > When we first planned to introduce the write buffer manager to help control the memory usage of RocksDB, we actually wanted to bump from the then-current RocksDB-5.17 to RocksDB-5.18. However, we found a performance regression in our micro-benchmark on state operations [1] when bumping to RocksDB-5.18. We could not figure out the root cause at the time, so we decided to cherry-pick the write buffer manager commits into our own FRocksDB [2], and we released our own frocksdbjni-5.17.2-artisans-2.0.
> >
> > As time goes on, more and more bugs or missing features have been reported against the old RocksDB version, such as:
> >
> > 1. Cannot support the ARM platform [3]
> > 2. Does not have a stable deleteRange API, which is useful for Flink scale-out [4]
> > 3. Cannot support a strict block cache [5]
> > 4. Checkpoints might get stuck when using the UNIVERSAL compaction strategy [6]
> > 5. Uncontrolled log size made us disable the RocksDB internal LOG [7]
> > 6. RocksDB's optimizeForPointLookup option might cause data loss [8]
> > 7. The dummy entry used for memory control in RocksDB-5.17 is too large, leading to performance problems [9]
> > 8. Cannot support alpine-based images.
> > 9. ...
> >
> > Some of these bugs have been worked around, and some are still open.
> >
> > So we decided to make some changes starting from Flink-1.12. First of all, we reported the performance regression between RocksDB-5.17 and RocksDB-5.18 to the RocksDB community [10]. However, as the RocksDB-5.x versions are a bit old for the community, and RocksJava usage might not be a core concern for the Facebook folks, we did not get useful replies. Thus, we decided to figure out the root cause of the performance regression ourselves.
> > Fortunately, we found the cause via binary search over the commits between RocksDB-5.17 and RocksDB-5.18, and updated the original thread [10]. In short, the performance regression is due to the different implementations of `__thread` and `thread_local` in gcc, which has a larger impact under dynamic loading [11] — which is exactly how the current RocksJava jar package is loaded. With my patch [12], the performance regression disappears when comparing RocksDB-5.18 with RocksDB-5.17.
> >
> > Unfortunately, RocksDB-5.18 still has many bugs, so we wanted to bump to RocksDB-6.x. However, another performance regression appeared, even with my patch [12]. From our previous experience we knew we had to verify the built .so files with our Java-based benchmark instead of RocksDB's built-in db-bench. I started to search the 1340+ commits between RocksDB-5.18 and RocksDB-6.11 for the performance problem. This time, however, I did not find the root cause even after spending several weeks. The performance goes up and down across those commits, and I could not isolate the commit that introduced the regression. Take the commit integrating the block cache tracer into the block-based table reader [13], for example: I noticed that it caused a slight performance regression, possibly due to useless usage accounting in operations, but the problematic code was changed again in later commits. So, after several weeks of digging, I had to give up the endless search through thousands of commits, at least temporarily. As the RocksDB community does not seem to make its project management system public, unlike Apache's open JIRA systems, we do not know what benchmarks they actually run before releasing each version to guarantee performance.
> >
> > With my patch [10] on the latest RocksDB-6.20.3, we get the Nexmark results in the original thread sent by Stephan, and we can see that the performance is close in many real-world cases. And we also hope new features, such as direct buffer support [14] in RocksJava
Exploring Flink for an HTTP delivery service
Hi,

Aim: building an event delivery service.

Scale:
- Peak load 50k messages/sec, average load 5k messages/sec; expected to grow every passing month
- Unique customer endpoints: 10k+
- Unique events (Kafka topics): 500+
- Unique tenants: 30k+

Subscription level: events are generated for tenants. Customers may subscribe (a) entirely to an event, (b) at tenant level (5 tenants or 100 tenants), or (c) even at sub-tenant level (tenant 2, depts 100, 200, 300).

*Other requirements:*
1) Batch events on quantity or a minimum threshold time, whichever comes first, e.g. 1000 messages or 1 sec.
2) Message size < 1 kb.

*Possible solutions:*
1) Build an app using reactive programming, say Vert.x / Spring Reactive, etc.
2) Use Apache Flink.

*Flink solution*
RDS holds the subscription/connection details.
[image: Flink HTTP Publisher.png]

2a) Use the DB and a lookup cache to retrieve configurations:
(i) Stream the data from Kafka.
(ii) For every incoming message, query RDS (Postgres), get the connection/subscription details, and apply filters. [Use a lookup cache to improve performance.]
(iii a) If it's a streaming customer, form the message with the appropriate authentication details.
(iii b) If it's a batch customer, push the message to the state backend. Once the maximum message count or the minimum threshold batch time is reached, retrieve the messages and form a single batch message with the appropriate authentication details.
(iv) Send the message and endpoint info to an async sink, which delivers to customers. In case of failure, write to a dead-letter queue that customers can poll later.

2b) Load configurations into broadcast state and update it at a regular interval:
(i) Stream the data from Kafka.
(ii) Periodically query the proxy API (on top of RDS) to get the latest added/updated subscription/connection details.
(iii) For every message flowing in from Kafka, check against the broadcast configuration to find the customers subscribed to the event, their filter requirements, and their connection details.
(iv a) If it's a streaming customer, form the message with the appropriate authentication details.
(iv b) If it's a batch customer, push the message to the state backend. Once the maximum message count or the minimum threshold batch time is reached, retrieve the messages and form a single batch message with the appropriate authentication details.
(v) Send the message and endpoint info to an async sink, which delivers to customers. In case of failure, write to a dead-letter queue that customers can poll later.

*Questions:*
1) Batching is an aggregation operation, but the windowing examples I have seen compute count/max/min over a particular window. Could the batching be implemented via a windowing mechanism?
2) Is it good design to have both batched delivery and per-event delivery in the same job, or should they be separate jobs?
3) Does broadcast state perform better than a DB lookup with cache? (I have implemented broadcast state for another purpose, but I am not sure about the performance of querying the DB plus a lookup cache.)
4) I read this: "The working copy of broadcast state is always on the heap; not in RocksDB. So, it has to be small enough to fit in memory. Furthermore, each instance will copy all of the broadcast state into its checkpoints, so all checkpoints and savepoints will have *n* copies of the broadcast state (where *n* is the parallelism). If you are able to key partition this data, then you might not need to broadcast it. It sounds like it might be per-employee data that could be keyed by the employeeId. But if not, then you'll have to keep it small enough to fit into memory."
Using a keyed process function with broadcast looks better than a non-keyed one, as the same data is not replicated across all parallel operators. A caveat is that the load is not the same across all subscriptions, so if we key the stream we might end up with an unbalanced job. Thoughts on this?
5) Latency must be minimal, so the first thought is to store the messages to be batched in HashMapStateBackend. But keeping both the config state and the data on the heap might increase memory usage a lot, since there could be a huge spike in load. Are there any other things that need to be factored in?
6) Auto-scaling capability would save a lot of cost, given the consistent load patterns with occasional spikes. Though reactive scaling was introduced in Flink 1.13, we don't know whether it is battle-hardened.
7) After looking at the solutions, does Flink seem to be a natural fit for this use case in comparison to the Spring Reactor framework / Vert.x? One thing we see from the documentation is that Spring Reactive can auto-scale very well, but we would need to handle fault tolerance/stability ourselves on the dev side, which Flink is good at. Spring Reactor is new to us and we wanted to
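On the batching question (1000 messages or 1 second, whichever comes first): in Flink this is typically done with a custom window Trigger, or with a KeyedProcessFunction that registers a processing-time timer on the first buffered element and also fires on count. The core decision logic can be sketched in plain Java — a self-contained illustration with hypothetical names, not Flink API code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: emit a batch when either the count threshold or
// the time threshold is crossed, whichever comes first.
class CountOrTimeBatcher<T> {
    private final int maxCount;          // e.g. 1000 messages
    private final long maxDelayMillis;   // e.g. 1000 ms
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1;

    CountOrTimeBatcher(int maxCount, long maxDelayMillis) {
        this.maxCount = maxCount;
        this.maxDelayMillis = maxDelayMillis;
    }

    // Returns the completed batch if a threshold was crossed, else null.
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;   // batch clock starts here
        }
        buffer.add(element);
        if (buffer.size() >= maxCount
                || nowMillis - firstElementTime >= maxDelayMillis) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }
}
```

In a real Flink job the buffer would live in keyed state (e.g. ListState) so it is checkpointed, and the time path would be driven by a registered timer rather than being checked only on arrival, so a partial batch still flushes when the stream goes quiet.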
Running Beam on a native Kubernetes Flink cluster
Hi!

I need help implementing a native Kubernetes Flink cluster that needs to run batch jobs (run by TensorFlow Extended), but I am not sure I am configuring it right, as I have issues running jobs on more than one task manager, while jobs run fine if there is only one TM.

I use the following parameters for the job:

"--runner=FlinkRunner",
"--parallelism=4",
f"--flink_master={flink_url}:8081",
"--environment_type=EXTERNAL",
f"--environment_config={beam_sdk_url}:5",
"--flink_submit_uber_jar",
"--worker_harness_container_image=none",

I have configured the Beam workers to run as side-cars to the TM containers. I do this by pointing the task manager pod template option

kubernetes.pod-template-file.taskmanager

to a template file with these contents:

kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  #hostNetwork: true
  containers:
    - name: flink-main-container
      #image: apache/flink:scala_2.12
      env:
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: PYTHONPATH
          value: "/data/flink/src"
      args: ["taskmanager"]
      ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
      livenessProbe:
        tcpSocket:
          port: 6122 #22
        initialDelaySeconds: 30
        periodSeconds: 60
    - name: beam-worker-pool
      env:
        - name: PYTHONPATH
          value: "/data/flink/src"
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
      image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
      imagePullPolicy: Always
      args: ["--worker_pool"]
      ports:
        - containerPort: 5
          name: pool
      livenessProbe:
        tcpSocket:
          port: 5
        initialDelaySeconds: 30
        periodSeconds: 60

I have also created a Kubernetes load balancer for the task managers, so clients can connect on port 5.
So I use that address when configuring:

f"--environment_config={beam_sdk_url}:5",

The problem is that the Beam SDK harness on one task manager apparently wants to connect to the endpoint running on the other task manager, but looks for it on localhost.

Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial server at localhost:33705
caused by: context deadline exceeded

The provision endpoint on TM 1 is the one actually listening on port 33705, while this worker is looking for it on localhost, so it cannot connect. Showing how I test this:

TM 1:
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=5 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial server at localhost:40983
caused by: context deadline exceeded

TM 2:
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=5 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial server at localhost:32907
caused by: context deadline exceeded

Testing:
TM 1:
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...

TM 2:
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output

Not sure how to fix this.

Thanks,
Gorjan
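A guess based on the symptoms above, not a verified fix: since every TM pod already has its own beam-worker-pool side-car, the environment_config can point at localhost instead of the shared load balancer, so each SDK harness talks to the worker pool inside its own pod and the localhost endpoints handed to the booted workers then resolve to the right task manager:

```
"--environment_type=EXTERNAL",
"--environment_config=localhost:5",   # pod-local side-car instead of the shared LB
```

With this setup the load balancer on port 5 would no longer be needed at all.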
Problems with reading ORC files with S3 filesystem
Hi,

I want to use the Flink SQL filesystem connector to read ORC files via the S3 filesystem on Flink 1.13. My table definition looks like this:

create or replace table xxx (..., startdate string)
partitioned by (startdate)
with ('connector'='filesystem', 'format'='orc', 'path'='s3://xxx/orc/yyy')

I followed Flink's S3 guide and installed the S3 libs as a plugin. I have MinIO as the S3 provider and it works for Flink's checkpoints and HA files. The SQL connector also works when I use the CSV or Avro formats. The problems start with ORC.

1. If I just put flink-orc on the job's classpath, I get this error on the JobManager:

Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]

2. I managed to put the Hadoop common libs on the classpath with this Maven setup:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.orc</groupId>
      <artifactId>orc-core</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc-core</artifactId>
  <version>1.5.6</version>
</dependency>
<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc-shims</artifactId>
  <version>1.5.6</version>
</dependency>
<dependency>
  <groupId>net.java.dev.jets3t</groupId>
  <artifactId>jets3t</artifactId>
  <version>0.9.0</version>
</dependency>

Now the job is accepted by the JobManager, but execution fails for lack of AWS credentials:

Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy76.initialize(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
at org.apache.orc.impl.ReaderImpl.(ReaderImpl.java:368)
at org.apache.orc.OrcFile.createReader(OrcFile.java:343)

I guess the ORC reader tries to recreate the S3 filesystem in the job's classloader and cannot use the credentials from flink-conf.yaml. However, I can see in the logs that it earlier managed to list the files on MinIO:

2021-08-14 09:35:48,285 INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning remote split to requesting host '172': Optional[FileSourceSplit: s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc [0, 144607) hosts=[localhost] ID=02 position=null]

So I think the issue is in the ORC reader when it tries to read a specific file.
Any ideas how I can modify the setup or pass the credentials to JetS3t?

Regards,
Piotr
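The stack trace shows Hadoop's legacy JetS3t-based `s3://` filesystem being instantiated inside the job's classloader, and that code path reads Hadoop configuration rather than flink-conf.yaml. One thing worth trying (an assumption derived from the property names in the error message, not a verified fix) is to supply exactly those properties via a core-site.xml on the job's classpath:

```
<configuration>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```

For MinIO the endpoint also has to be redirected away from AWS, which JetS3t configures separately (via a jets3t.properties file); switching the path to the s3a filesystem and its fs.s3a.* properties is generally the better-supported route.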
Fwd: PyFlink performance and deployment issues
Hi all,

I'm still dealing with the PyFlink deployment issue described below. I see that I accidentally didn't forward it to the mailing list. Anyway, my job is stuck in `Initializing` and the logs don't really give me a clue what is going on. In my IDE it runs fine. The command I use to submit to the cluster:

export PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python
./flink run \
  --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  --pyExecutable venv.zip/venv/bin/python \
  --parallelism 1 \
  --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
  --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar

I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter

-- Forwarded message -
From: Wouter Zorgdrager
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang

Hi Xingbo, all,

Regarding point 2, I actually made a mistake there. I picked port 8081 (the WebUI port) rather than the job submission port (--target remote -m localhost:8081). For some reason this does not give an error or warning and just starts a local cluster. However, now I have another issue: my job is stuck at initialization. Here is an excerpt from the JM log:

2021-07-08 12:12:18,094 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0) with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt #0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,484 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,488 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - _stream_key_by_map_operator (1/1) (ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,489 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,490 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from DEPLOYING to INITIALIZING.

I run with parallelism 1 and these are the latest loglines from the TM (there is no obvious error):

2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:18,791 WARN org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Error while loading kafka-version.properties: inStream parameter is null
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1625739138789
2021-07-08 12:12:18,806 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic client_reply
2021-07-08 12:12:18,815 INFO org.apache.flink.streaming.api.operators.