Hi,
I am using the portable runner (Flink) with the Python SDK. I am on the latest
version of Beam (2.41.0).
The job is running on Kubernetes. I launched the job manager with a sidecar
container (using image: apache/beam_flink1.14_job_server:2.41.0) to start
the expansion service with the following command:
```
java
-cp /opt/apache/beam/jars/
org.apache.beam.sdk.expansion.service.ExpansionService
8097
--javaClassLookupAllowlistFile=*
--defaultEnvironmentType=EXTERNAL
--defaultEnvironmentConfig=localhost:8097
```
In the code I am doing:
```
from apache_beam.io.kafka import ReadFromKafka

ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "BROKER",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        # The JAAS config is one string; it is split here only for line length.
        "sasl.jaas.config": (
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{sasl_username}" password="{sasl_password}";'
        ),
    },
    topics=[self.options.topic],
    with_metadata=False,
    expansion_service="localhost:8097",
)
```
But it fails with the following error:
```
2022-09-20 08:36:36,549 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
```
Does anyone know how I could fix this issue? Thanks!
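
One thing I noticed while writing this up, though I am not sure it is the cause: with `--defaultEnvironmentType=EXTERNAL`, my understanding is that the runner calls `StartWorker` on the address given in `--defaultEnvironmentConfig`, and in my flags that address is the same `localhost:8097` the expansion service listens on. The sketch below is plain Python with no Beam dependency; the helper names (`parse_flags`, `environment_port`, `ports_collide`) are my own for illustration and are not Beam APIs. It only checks the flags from my command for that port overlap.

```python
# Flags copied from the expansion service command earlier in this message.
EXPANSION_SERVICE_ARGS = [
    "8097",
    "--javaClassLookupAllowlistFile=*",
    "--defaultEnvironmentType=EXTERNAL",
    "--defaultEnvironmentConfig=localhost:8097",
]


def parse_flags(args):
    """Split args into the positional port and a dict of --key=value flags."""
    port = None
    flags = {}
    for arg in args:
        if arg.startswith("--"):
            key, _, value = arg[2:].partition("=")
            flags[key] = value
        elif arg.isdigit():
            port = int(arg)
    return port, flags


def environment_port(flags):
    """Return the port from defaultEnvironmentConfig (host:port), or None."""
    config = flags.get("defaultEnvironmentConfig", "")
    _host, sep, port = config.rpartition(":")
    return int(port) if sep and port.isdigit() else None


def ports_collide(args):
    """True if the service port equals the EXTERNAL environment port."""
    port, flags = parse_flags(args)
    return port is not None and port == environment_port(flags)


if __name__ == "__main__":
    print(ports_collide(EXPANSION_SERVICE_ARGS))
```

For my flags this reports a collision, which at least seems consistent with the `UNIMPLEMENTED: Method not found: ...BeamFnExternalWorkerPool/StartWorker` error if the request is landing on the expansion service instead of a worker pool.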

Sincerely,
Lydian Lee
