Hi team,
Hope everyone is doing well.

We are running into an issue writing checkpoint metadata to S3 with the
PyFlink DataStream API (apache-flink==1.16.0). We can sink our stream to S3
without problems, but as soon as Flink tries to write the checkpoint data we
get the error below. We tried paths with the s3://, s3a://, and s3p://
prefixes, and it failed with all of them.
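
For reference, this is the line we changed between attempts (the bucket and
path below are placeholders, not our real names); only the URI scheme varied:

env.get_checkpoint_config().set_checkpoint_storage(
    FileSystemCheckpointStorage("s3://my-bucket/checkpoints/"))  # also tried s3a:// and s3p://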

(The full code is attached after the error message.)

Looking forward to hearing from you. Thank you.

Traceback (most recent call last):
  File "app.py", line 103, in <module>
    real_time_data_analytics()
  File "app.py", line 98, in real_time_data_analytics
    env.execute('bot_detection_app_local2')
  File "/test/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", line 764, in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/test/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/test/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/test/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o49.execute.
: org.apache.flink.util.FlinkException: Failed to execute job 'bot_detection_app_local2'.
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2075)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
        at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
        ... 3 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:337)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:245)
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:511)
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:317)
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
        at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 3 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:67)
        at org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage.createCheckpointStorage(FileSystemCheckpointStorage.java:323)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:330)
        ... 18 more
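
Reading that message, it sounds like add_jars() may not be enough for the S3
filesystem plugins, and that the alternative it mentions is the
fs.allowed-fallback-filesystems option. Below is a rough sketch of what we
think that would look like; the Configuration-based environment setup is our
assumption from the docs, not something we have verified to work:

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
# Let the Hadoop S3 filesystem serve the s3 scheme as a fallback
# (assumes the flink-s3-fs-hadoop classes are on the classpath).
config.set_string("fs.allowed-fallback-filesystems", "s3")
env = StreamExecutionEnvironment.get_execution_environment(config)

Is that the recommended approach, or should the flink-s3-fs-presto jar instead
be placed in its own subfolder under the plugins/ directory of the Flink
distribution, as the message suggests?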

import json
from datetime import datetime

from pyflink.common import Row
from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema, Encoder
from pyflink.common.typeinfo import Types, BasicType, TypeInformation, BasicTypeInfo
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.checkpoint_storage import FileSystemCheckpointStorage, CheckpointStorage, CustomCheckpointStorage
from pyflink.datastream.state_backend import FsStateBackend, RocksDBStateBackend, EmbeddedRocksDBStateBackend
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer, KinesisDeserializationSchema

# import redis
# from flatten_json import flatten_json


# This function reads records from the data stream one by one as JSON objects
# and returns a new object after adding a new field. It is used in
# real_time_data_analytics below.

# redis_host = "redis"
# redis_port = 6379


def map_events(obj):
    print('*********************************************')
    columns = obj.split('\t')
    print(columns[0])
    print(obj)
    return json.dumps(obj)


# json_obj = json.loads(obj)
# return json.dumps(json_obj)


# def get_value_from_redis(key):
#     r = redis.StrictRedis(host=redis_host, port=redis_port, decode_responses=True)
#     data = r.get(key)
#     json_obj = json.loads(data)
#     return json_obj


def real_time_data_analytics():
    # 1. Create a StreamExecutionEnvironment.
    # The SQL connector for Kafka is used here as it is a fat jar and avoids
    # dependency issues.
    # env.add_jars("file:///app/f2.jar")

    # 2. Create the source DataStream.
    deserialization_schema = SimpleStringSchema()

    # Initialize the Kinesis consumer so that we can read the data sent by the producer.
    consumer_config = {
        'aws.region': 'us-central-1',
        'flink.stream.initpos': 'LATEST'
    }

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///app/f3.jar")
    env.add_jars("file:///app/flink-s3-fs-hadoop-1.16.0.jar")
    env.add_jars("file:///app/flink-s3-fs-presto-1.16.0.jar")
    env.add_jars("file:///app/aws-java-sdk-core-1.12.347.jar")
    env.add_jars("file:///app/flink-statebackend-rocksdb-1.16.0.jar")
    env.add_jars("file:///app/hadoop-common-3.3.4.jar")
    print(env.__dict__)

    # Configure the RocksDB state backend and S3 checkpoint storage
    # (this is the part that fails with the error above).
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    env.get_checkpoint_config().set_checkpoint_storage(
        FileSystemCheckpointStorage("s3p://myBucket/bot_detection_test/checkpoints/"))
    env.enable_checkpointing(1000)

    kinesis = env.add_source(
        FlinkKinesisConsumer("mystream", SimpleStringSchema(), consumer_config))
    kinesis_filtered = kinesis.map(map_events, Types.STRING())

    # Sinking to S3 with the s3a:// scheme works fine.
    sink = FileSink \
        .for_row_format("s3a://myBucket/bot_detection_test/",
                        Encoder.simple_string_encoder("UTF-8")) \
        .with_rolling_policy(RollingPolicy.default_rolling_policy(
            part_size=1024 * 1024 * 1,
            rollover_interval=5 * 60 * 1000,
            inactivity_interval=5 * 60 * 1000)) \
        .build()

    kinesis_filtered.sink_to(sink)
    # kinesis_filtered.print()
    env.execute('bot_detection_app_local2')


# Run the above function in main.
if __name__ == '__main__':
    real_time_data_analytics()


--

Regards,

Mujahid Niaz

LinkedIn: https://www.linkedin.com/in/mujahidniaz/
Portfolio: http://mujahidniaz.github.io/
