Hi,

I am using the portable runner (Flink) with the Python SDK, on the latest version of Beam (2.41.0). The job is running on Kubernetes. I launched the job manager with a sidecar container (using image: apache/beam_flink1.14_job_server:2.41.0) to start the expansion service with the following command:

```
java -cp /opt/apache/beam/jars/ \
  org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=EXTERNAL \
  --defaultEnvironmentConfig=localhost:8097
```

In the code I am doing:

```
ReadFromKafka(
    consumer_config={
        "bootstrap.servers": 'BROKER',
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{sasl_username}" password="{sasl_password}";',
    },
    topics=[self.options.topic],
    with_metadata=False,
    expansion_service="localhost:8097"
)
```

But it fails with the following error:

```
2022-09-20 08:36:36,549 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
```

Does anyone know how I could fix this issue? Thanks!

Sincerely, Lydian Lee