Yes, the logs are from the TaskManager. The command line options are:

```
--streaming --runner=portableRunner --environment_type=PROCESS --environment_config={"command": "/opt/apache/beam/boot"}
```

Note that I actually copied both the Python and Java SDK harnesses into the same Docker image, used for both the job manager and task manager in Flink.
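Two details in these commands are easy to get wrong through the shell: `--environment_config` must arrive as a single valid-JSON argument, and `-cp dir/*` must reach the JVM as one literal argument (java expands the classpath wildcard itself; if the shell globs it first, java misparses the command line). A stdlib-only sketch with hypothetical helper names that builds both safely:

```python
# Sketch (helper names are hypothetical): build the portable-runner options
# and the expansion-service launch line with correct quoting.
import json
import shlex


def portable_runner_options(boot_cmd="/opt/apache/beam/boot"):
    """Options as above; environment_config must be one JSON argument,
    so serialize it with json.dumps instead of hand-writing the quotes."""
    return [
        "--streaming",
        "--runner=PortableRunner",
        "--environment_type=PROCESS",
        "--environment_config=" + json.dumps({"command": boot_cmd}),
    ]


def expansion_service_cmd(jar_dir="/opt/apache/beam/jars",
                          port=8097,
                          java_boot="/opt/apache/beam/java_boot"):
    """Launch line for the Java expansion service. `-cp dir/*` must reach
    the JVM as ONE argument; quoting prevents the shell from globbing it."""
    argv = [
        "java", "-cp", f"{jar_dir}/*",
        "org.apache.beam.sdk.expansion.service.ExpansionService",
        str(port),
        "--javaClassLookupAllowlistFile=*",
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig=" + json.dumps({"command": java_boot}),
    ]
    return shlex.join(argv)  # quoting keeps the * and the JSON intact


print(shlex.join(portable_runner_options()))
print(expansion_service_cmd())
```

`shlex.join` single-quotes every argument containing `*`, `{` or `"`, so the printed line can be pasted into a container entrypoint verbatim.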
```
COPY --from=apache/beam_python3.7_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam_java/
RUN mv /opt/apache/beam_java/boot /opt/apache/beam/java_boot && cp -r /opt/apache/beam_java/* /opt/apache/beam/
```

Therefore the actual line to start the expansion service is:

```
java -cp /opt/apache/beam/jars/* org.apache.beam.sdk.expansion.service.ExpansionService 8097 --javaClassLookupAllowlistFile=* --defaultEnvironmentType=PROCESS --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/java_boot\"}
```

I am not familiar with Java; would you mind advising me on how to pack all the dependencies into a single jar (or how to find out what the dependencies are)? Thanks!

Sincerely,
Lydian Lee


On Wed, Sep 21, 2022 at 1:11 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Lydian,
>
> I'm not sure about this one. Can you please clarify - the logs are logs of the TaskManager, right? Can you please share the command line options you pass to the Pipeline?
>
> I'm not sure why the SDK harness would need the job server, that seems strange. Can you let the python x-lang transform start its own expansion service using the default expansion service? That would mean you have to:
>
> a) pack the expansion service with dependencies into a single jar (shadow jar)
>
> b) create something like the get_expansion_service function in [1]
>
> c) pass this expansion service to ReadFromKafka(..., expansion_service=get_expansion_service())
>
> That should start the expansion service locally where you run your python main method and then submit the job.
>
> Jan
>
> [1] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py
>
> On 9/20/22 23:26, Lydian wrote:
>
> Thanks Jan!
> I am now building my taskmanager with
>
> ```
> COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
> ```
>
> and start the expansion service with:
>
> ```
> java -cp /opt/apache/beam/jars/* org.apache.beam.sdk.expansion.service.ExpansionService 8097 --javaClassLookupAllowlistFile=* --defaultEnvironmentType=PROCESS --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/boot\"}
> ```
>
> But now I got another error which seems related to artifact staging:
>
> ```
> 2022/09/20 20:56:35 Initializing java harness: /opt/apache/beam/java_boot --id=1-1 --provision_endpoint=localhost:33487
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/3-external_1beam:env:proces-jaccess-oq6STrCYDJnGcSOIfKmQgoBfAcDfKf-zQ3-p2ZtEQsY.jar
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/2-external_1beam:env:proces-dnsns-FkpviqOgSjgfQr2mi062RMyZ-gSRyWRKVwOvxTXdcFA.jar
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/5-external_1beam:env:proces-nashorn-y9iNNWfaBYThUJK3bCSr0D1ntOzfXi13Zp8V-pzM2h0.jar
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/1-external_1beam:env:proces-cldrdata-IHVqwlrHHtgWuTtrhzSTUFawFn1x1Qi97s3jYsygE0Y.jar
> 2022-09-20 20:56:35,923 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/4-external_1beam:env:proces-localedata-xQnGOSPHoBjAh1B5aVBszDCEI8BXc7-R_L7taFUKHg8.jar
> java.io.FileNotFoundException: /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:128)
>     at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:84)
>     at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:256)
>     at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:124)
>     at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
>     at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:315)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> ```
>
> Does anyone know which things I would also need to configure? Thanks
>
> Sincerely,
> Lydian Lee
>
>
> On Tue, Sep 20, 2022 at 1:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Lydian,
>>
>> there are two parts involved.
>>
>> a) expansion service (which you run on port 8097) - this service expands the ReadFromKafka, which is a Java transform
>>
>> b) Java SDK environment, which is not the expansion service; it must be some environment that is able to run the Java ReadFromKafka transform. In Flink, you can use the PROCESS environment type (e.g. [1]), but there might be other options (e.g. DOCKER), see [2]
>>
>> Hope this helps,
>>
>> Jan
>>
>> [1] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py
>>
>> [2] https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>
>> On 9/20/22 10:45, Lydian wrote:
>>
>> Hi,
>> I am using the portable runner (Flink) with the Python SDK. I am on the latest version of Beam (2.41).
>> The job is running on kubernetes.
>> I launched the job manager with a sidecar container (using image: apache/beam_flink1.14_job_server:2.41.0) to start the expansion service with the following command:
>>
>> ```
>> java -cp /opt/apache/beam/jars/ org.apache.beam.sdk.expansion.service.ExpansionService 8097 --javaClassLookupAllowlistFile=* --defaultEnvironmentType=EXTERNAL --defaultEnvironmentConfig=localhost:8097
>> ```
>>
>> In the code I am doing:
>>
>> ```
>> ReadFromKafka(
>>     consumer_config={
>>         "bootstrap.servers": 'BROKER',
>>         "security.protocol": "SASL_SSL",
>>         "sasl.mechanism": "SCRAM-SHA-512",
>>         "sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{sasl_username}" password="{sasl_password}";',
>>     },
>>     topics=[self.options.topic],
>>     with_metadata=False,
>>     expansion_service="localhost:8097"
>> )
>> ```
>>
>> But it fails with this error:
>>
>> ```
>> 2022-09-20 08:36:36,549 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
>> org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
>>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
>>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
>>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
>>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
>>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
>> ```
>>
>> Does anyone know how I could fix this issue? Thanks!
>>
>> Sincerely,
>> Lydian Lee
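
Jan's suggested approach (a)-(c) above, packing the expansion service into a single shaded jar and letting the pipeline start it, can be sketched as follows. The sketch is untested and the jar path is a placeholder; Beam also publishes a ready-made shaded jar for the Kafka expansion service (artifact `beam-sdks-java-io-expansion-service` on Maven Central), which may save building your own.

```python
# Untested sketch of Jan's step (b): a get_expansion_service helper that
# starts the expansion service from a single self-contained (shaded) jar.
# The default jar path below is a placeholder.

def get_expansion_service(
        jar="/opt/apache/beam/jars/beam-sdks-java-io-expansion-service-2.41.0.jar"):
    # JavaJarExpansionService launches the jar on demand during pipeline
    # construction, so every dependency must already be shaded into it.
    # Imported lazily so the sketch can be read without Beam installed.
    from apache_beam.transforms.external import JavaJarExpansionService
    return JavaJarExpansionService(jar)
```

Step (c) then replaces the hard-coded endpoint: `ReadFromKafka(..., expansion_service=get_expansion_service())`.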