Hi team,

Hope everyone is doing well. We have an issue writing checkpoint metadata to S3 using the PyFlink DataStream API (apache-flink==1.16.0). We are able to sink our stream to S3, but when it comes to writing the checkpoint data we get the error below. We tried paths with the s3://, s3a://, and s3p:// prefixes, and it failed with all of them.
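For reference, the only line we vary between attempts is the checkpoint storage path (excerpted from the full code attached below; the bucket name is sanitized):

# also tried "s3://..." and "s3a://..." in place of "s3p://..."
env.get_checkpoint_config().set_checkpoint_storage(
    FileSystemCheckpointStorage("s3p://myBucket/bot_detection_test/checkpoints/"))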
(The code is attached after the error message.) Looking forward to hearing from you. Thank you!

Traceback (most recent call last):
  File "app.py", line 103, in <module>
    real_time_data_analytics()
  File "app.py", line 98, in real_time_data_analytics
    env.execute('bot_detection_app_local2')
  File "/test/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", line 764, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/test/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/test/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/test/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o49.execute.
: org.apache.flink.util.FlinkException: Failed to execute job 'bot_detection_app_local2'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2075)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    ... 3 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:337)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:245)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:511)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:317)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:67)
    at org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage.createCheckpointStorage(FileSystemCheckpointStorage.java:323)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:330)
    ... 18 more
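If we read the last "Caused by" correctly, Flink will not pick up the S3 file systems from add_jars() and instead expects each one in its own subfolder under the plugins directory of the Flink installation. Below is a minimal sketch of what we believe that would look like for our pip-installed PyFlink, run once before submitting the job; the plugin folder name and the assumption that the bundled distribution lives inside the pyflink package directory are ours, not taken from the docs:

import os
import shutil

import pyflink

# Assumption: pip-installed apache-flink ships its Flink distribution inside
# the pyflink package directory (lib/, plugins/, ...); adjust if FLINK_HOME
# points somewhere else in your setup.
flink_home = os.path.dirname(os.path.abspath(pyflink.__file__))

# Per the error message, each plugin needs its own subfolder under plugins/.
plugin_dir = os.path.join(flink_home, "plugins", "s3-fs-presto")
os.makedirs(plugin_dir, exist_ok=True)

# Reuse the jar we already bundle in our image under /app.
shutil.copy("/app/flink-s3-fs-presto-1.16.0.jar", plugin_dir)

Is this the intended setup, or is there a PyFlink-native way to register file system plugins? (The error also mentions fs.allowed-fallback-filesystems as a Hadoop fallback, but we would prefer to load the presto plugin properly.)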
Here is the code:

import json
from datetime import datetime

from pyflink.common import Row
from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema, Encoder
from pyflink.common.typeinfo import Types, BasicType, TypeInformation, BasicTypeInfo
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.checkpoint_storage import FileSystemCheckpointStorage, CheckpointStorage, CustomCheckpointStorage
from pyflink.datastream.state_backend import FsStateBackend, RocksDBStateBackend, EmbeddedRocksDBStateBackend
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer, KinesisDeserializationSchema
# import redis
# from flatten_json import flatten_json

# redis_host = "redis"
# redis_port = 6379


# Reads records from the data stream one by one as JSON objects and returns a
# new object (a new field will be added here); used in real_time_data_analytics.
def map_events(obj):
    print('*********************************************')
    columns = obj.split('\t')
    print(columns[0])
    print(obj)
    return json.dumps(obj)
    # json_obj = json.loads(obj)
    # return json.dumps(json_obj)


# def get_value_from_redis(key):
#     r = redis.StrictRedis(host=redis_host, port=redis_port, decode_responses=True)
#     data = r.get(key)
#     json_obj = json.loads(data)
#     return json_obj


def real_time_data_analytics():
    # 1. create a StreamExecutionEnvironment
    # (the sql connector jar below is a fat jar and could avoid dependency issues)
    # env.add_jars("file:///app/f2.jar")

    # 2. create the source DataStream
    deserialization_schema = SimpleStringSchema()

    # Kinesis consumer config so that we can read the data sent by the producer
    consumer_config = {
        'aws.region': 'us-central-1',
        'flink.stream.initpos': 'LATEST'
    }

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///app/f3.jar")
    env.add_jars("file:///app/flink-s3-fs-hadoop-1.16.0.jar")
    env.add_jars("file:///app/flink-s3-fs-presto-1.16.0.jar")
    env.add_jars("file:///app/aws-java-sdk-core-1.12.347.jar")
    env.add_jars("file:///app/flink-statebackend-rocksdb-1.16.0.jar")
    env.add_jars("file:///app/hadoop-common-3.3.4.jar")
    print(env.__dict__)

    env.set_state_backend(EmbeddedRocksDBStateBackend())
    env.get_checkpoint_config().set_checkpoint_storage(
        FileSystemCheckpointStorage("s3p://myBucket/bot_detection_test/checkpoints/"))
    env.enable_checkpointing(1000)

    kinesis = env.add_source(
        FlinkKinesisConsumer("mystream", SimpleStringSchema(), consumer_config))
    kinesis_filtered = kinesis.map(map_events, Types.STRING())

    # Sinking the mapped stream to S3 works; only checkpointing fails.
    sink = FileSink \
        .for_row_format("s3a://myBucket/bot_detection_test/",
                        Encoder.simple_string_encoder("UTF-8")) \
        .with_rolling_policy(RollingPolicy.default_rolling_policy(
            part_size=1024 * 1024 * 1,
            rollover_interval=5 * 60 * 1000,
            inactivity_interval=5 * 60 * 1000)) \
        .build()
    kinesis_filtered.sink_to(sink)
    # kinesis_filtered.print()

    env.execute('bot_detection_app_local2')


# running the above function in main
if __name__ == '__main__':
    real_time_data_analytics()

--
Regards,
Mujahid Niaz
LinkedIn: https://www.linkedin.com/in/mujahidniaz/
Portfolio: http://mujahidniaz.github.io/