Hi everyone,
I'm using PyFlink to communicate with HDFS and trying to set the checkpoint 
storage to an HDFS path. However, I encountered the following error:
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem 
for scheme "hdfs"
I'm using Flink 1.17.2 and Hadoop 3.2.1.
Here's my Python code:
from pyflink.common import Configuration, SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import CheckpointingMode, StreamExecutionEnvironment, \
    CheckpointStorage, ExternalizedCheckpointCleanup
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

if __name__ == '__main__':
    # import os
    # os.environ["HADOOP_USER_NAME"] = "hadoop"

    conf = Configuration()
    conf.set_integer("rest.port", 10010)

    env = StreamExecutionEnvironment.get_execution_environment(conf)
    env.set_parallelism(1)
    env.enable_checkpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    env.add_jars(
        "file:///D:/codes/cpo-py/libs/flink-sql-connector-kafka-1.17.2.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-client-3.2.1.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-common-3.2.1.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-hdfs-3.2.1.jar",
    )

    checkpoint_config = env.get_checkpoint_config()
    checkpoint_config.set_checkpoint_storage(CheckpointStorage("hdfs://node1:8020/cpo/stream/cpo_dc"))
    checkpoint_config.set_max_concurrent_checkpoints(1)
    checkpoint_config.set_min_pause_between_checkpoints(5000)
    checkpoint_config.set_checkpoint_timeout(10000)
    checkpoint_config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    kafka_source = KafkaSource.builder() \
        .set_bootstrap_servers("node1:9092") \
        .set_topics("cpo_dc") \
        .set_group_id("cpo_dc") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    source = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "kafka_source")
    source.print()

    env.execute()
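I also wondered whether add_jars() is even the right mechanism here, since as far as I understand it only affects the user-code classloader, while the checkpoint FileSystem seems to be loaded elsewhere. Would add_classpaths() be the right direction instead? A rough sketch of what I mean, continuing the script above with the same local jar paths (I'm not sure this is the supported approach):

# Guess on my part: also expose the Hadoop jars on the JVM classpath itself,
# not just the user-code classloader. (Continues the script above, so `env`
# is the StreamExecutionEnvironment created there.)
env.add_classpaths(
    "file:///D:/codes/cpo-py/libs/hadoop-client-3.2.1.jar",
    "file:///D:/codes/cpo-py/libs/hadoop-common-3.2.1.jar",
    "file:///D:/codes/cpo-py/libs/hadoop-hdfs-3.2.1.jar",
)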
The equivalent Java code runs without issues:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        // System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 10010);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf)
                .setParallelism(1)
                .enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);

        env.getCheckpointConfig().setCheckpointStorage("hdfs://node1:8020/cpo/stream/cpo_dc");
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092")
                .setGroupId("cpo_dc")
                .setTopics("cpo_dc")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");
        source.print();
        env.execute();
    }
}
Is there something I need to configure differently in PyFlink to enable HDFS 
checkpoint storage?
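For example, do the Hadoop classes need to be visible to the JVM that PyFlink launches, rather than shipped via add_jars()? A sketch of what I have in mind, assuming the hadoop CLI is on the PATH; this is my attempt to mirror the export HADOOP_CLASSPATH=`hadoop classpath` advice from the Flink docs, and it would have to run before the execution environment is created:

import os
import subprocess

# Guess: export the full Hadoop classpath before the PyFlink JVM starts,
# mirroring `export HADOOP_CLASSPATH=$(hadoop classpath)` from the Flink docs.
os.environ["HADOOP_CLASSPATH"] = subprocess.check_output(
    ["hadoop", "classpath"], text=True
).strip()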
Any help or guidance would be greatly appreciated!
Best regards