Hi everyone,

I'm using PyFlink to communicate with HDFS and I'm trying to set the checkpoint storage to an HDFS path. However, I run into the following error:

```
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "hdfs"
```

I'm using Flink 1.17.2 and Hadoop 3.2.1.

Here's my Python code (the equivalent Java version works fine, see below):

```python
from pyflink.common import Configuration, SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import CheckpointingMode, StreamExecutionEnvironment, CheckpointStorage, \
    ExternalizedCheckpointCleanup
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

if __name__ == '__main__':
    # import os
    # os.environ["HADOOP_USER_NAME"] = "hadoop"

    conf = Configuration()
    conf.set_integer("rest.port", 10010)

    env = StreamExecutionEnvironment.get_execution_environment(conf)
    env.set_parallelism(1)
    env.enable_checkpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    env.add_jars(
        "file:///D:/codes/cpo-py/libs/flink-sql-connector-kafka-1.17.2.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-client-3.2.1.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-common-3.2.1.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-hdfs-3.2.1.jar",
    )

    checkpoint_config = env.get_checkpoint_config()
    # The hdfs:// scheme used here is what triggers the exception above
    checkpoint_config.set_checkpoint_storage(CheckpointStorage("hdfs://node1:8020/cpo/stream/cpo_dc"))
    checkpoint_config.set_max_concurrent_checkpoints(1)
    checkpoint_config.set_min_pause_between_checkpoints(5000)
    checkpoint_config.set_checkpoint_timeout(10000)
    checkpoint_config.set_externalized_checkpoint_cleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    kafka_source = KafkaSource.builder() \
        .set_bootstrap_servers("node1:9092") \
        .set_topics("cpo_dc") \
        .set_group_id("cpo_dc") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    source = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "kafka_source")
    source.print()

    env.execute()
```

The equivalent Java code runs without issues:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        //System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 10010);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf)
                .setParallelism(1)
                .enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);

        env.getCheckpointConfig().setCheckpointStorage("hdfs://node1:8020/cpo/stream/cpo_dc");
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092")
                .setGroupId("cpo_dc")
                .setTopics("cpo_dc")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> source = env.fromSource(kafkaSource,
                WatermarkStrategy.noWatermarks(), "kafka_source");
        source.print();

        env.execute();
    }
}
```

One difference I can think of: in the Java project the Hadoop dependencies are on the JVM classpath via Maven, whereas in PyFlink I'm only shipping them through `add_jars()`, which (as far as I understand) only adds them as user jars. Below is a sketch of what I was considering as a workaround.
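This is just a guess on my part, not something I've verified: setting `HADOOP_CLASSPATH` before PyFlink launches its local JVM, the way `export HADOOP_CLASSPATH=$(hadoop classpath)` is used when running Flink on a cluster. The `D:/hadoop-3.2.1` path is a placeholder for a local Hadoop install, not my actual setup:

```python
import os

# Guess: expose the Hadoop jars to the JVM that PyFlink launches locally,
# mirroring `export HADOOP_CLASSPATH=$(hadoop classpath)` on a cluster.
# D:/hadoop-3.2.1 is a placeholder path, not my actual install location.
hadoop_home = "D:/hadoop-3.2.1"
os.environ["HADOOP_CLASSPATH"] = ";".join([  # ";" = classpath separator on Windows
    f"{hadoop_home}/share/hadoop/common/*",
    f"{hadoop_home}/share/hadoop/common/lib/*",
    f"{hadoop_home}/share/hadoop/hdfs/*",
    f"{hadoop_home}/share/hadoop/hdfs/lib/*",
])

# If this is the right mechanism, it presumably has to run before
# get_execution_environment(), which is what starts the local JVM.
```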
Is there something I need to configure differently in PyFlink to enable HDFS checkpoint storage? Any help or guidance would be greatly appreciated!

Best regards!