Yes, the logs are from the TaskManager.  The command line options are:
```
--streaming
--runner=portableRunner
--environment_type=PROCESS
--environment_config={"command": "/opt/apache/beam/boot"}
```
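A hedged aside (not from the original thread): the value passed to `--environment_config` must still be valid JSON after the shell is done with it, so quoting matters. A quick stdlib sanity check of the exact config string used above:

```python
import json

# The PROCESS environment config from the options above, exactly as the
# runner should receive it after shell quoting is stripped:
env_config = '{"command": "/opt/apache/beam/boot"}'

# json.loads raises json.JSONDecodeError if the string is malformed,
# e.g. if the shell stripped the inner double quotes.
parsed = json.loads(env_config)
assert parsed["command"] == "/opt/apache/beam/boot"
```

If the shell eats the inner double quotes, the runner receives something like `{command: /opt/apache/beam/boot}`, which fails exactly this parse and tends to surface as a confusing environment error much later.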
Note that I actually copied both the Python and Java SDKs into the same Docker
image, used for both the job manager and task manager in Flink.

```
COPY --from=apache/beam_python3.7_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam_java/
RUN mv /opt/apache/beam_java/boot /opt/apache/beam/java_boot && cp -r /opt/apache/beam_java/* /opt/apache/beam/
```
Therefore, the actual command to start the expansion service is:
```
java -cp /opt/apache/beam/jars/* \
  org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/java_boot\"}
```

I am not familiar with Java. Would you mind advising me on how to pack all
the dependencies (or how to find out what the dependencies are) into a
single jar? Thanks!
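For what it's worth, one common way to do this is the Gradle Shadow plugin, which bundles a module and all its transitive dependencies into a single "fat" jar (so you don't have to enumerate the dependencies yourself). The sketch below is an assumption, not from this thread: the plugin version and the Beam artifact coordinates would need to be checked against Beam 2.41.0.

```groovy
// Hypothetical build.gradle for a shaded expansion-service jar (a sketch,
// assuming Beam 2.41.0 coordinates and a compatible Shadow plugin version).
plugins {
    id 'java'
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

repositories {
    mavenCentral()
}

dependencies {
    // assumption: the expansion service plus the Kafka IO it needs to expand
    implementation 'org.apache.beam:beam-sdks-java-expansion-service:2.41.0'
    implementation 'org.apache.beam:beam-sdks-java-io-kafka:2.41.0'
}

shadowJar {
    // merge META-INF/services entries so gRPC/Beam ServiceLoader lookups survive shading
    mergeServiceFiles()
    manifest {
        attributes 'Main-Class': 'org.apache.beam.sdk.expansion.service.ExpansionService'
    }
}
```

Running `./gradlew shadowJar` then produces a single `*-all.jar` under `build/libs/`, so the service could be started with `java -jar` on that one file instead of a `-cp` over a jars directory.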


Sincerely,
Lydian Lee



On Wed, Sep 21, 2022 at 1:11 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Lydian,
>
> I'm not sure about this one. Can you please clarify - the logs are logs of
> TaskManager, right? Can you please share the command line options you pass
> to the Pipeline?
>
> I'm not sure why the SDK harness would need the job server, that seems
> strange. Can you let the python x-lang transform start its own expansion
> service using the default expansion service? That would mean you have to:
>
>  a) pack the expansion service with dependencies into single jar (shadow
> jar)
>
>  b) create something like get_expansion_service function in [1]
>
>  c) passing this expansion service to the ReadFromKafka(...,
> expansion_service=get_expansion_service())
>
> That should start the expansion service locally where you run your python
> main method and then submit the job.
>
>   Jan
>
> [1]
> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py
> On 9/20/22 23:26, Lydian wrote:
>
> Thanks Jan! I am now building my taskmanager with
> ```
> COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
> ```
> and start the expansion service with:
> ```
> java -cp /opt/apache/beam/jars/* \
>   org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
>   --javaClassLookupAllowlistFile=* \
>   --defaultEnvironmentType=PROCESS \
>   --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/boot\"}
> ```
> But now I got another error which seems related to staging artifact:
> ```
> 2022/09/20 20:56:35 Initializing java harness: /opt/apache/beam/java_boot --id=1-1 --provision_endpoint=localhost:33487
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/3-external_1beam:env:proces-jaccess-oq6STrCYDJnGcSOIfKmQgoBfAcDfKf-zQ3-p2ZtEQsY.jar
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/2-external_1beam:env:proces-dnsns-FkpviqOgSjgfQr2mi062RMyZ-gSRyWRKVwOvxTXdcFA.jar
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/5-external_1beam:env:proces-nashorn-y9iNNWfaBYThUJK3bCSr0D1ntOzfXi13Zp8V-pzM2h0.jar
> 2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/1-external_1beam:env:proces-cldrdata-IHVqwlrHHtgWuTtrhzSTUFawFn1x1Qi97s3jYsygE0Y.jar
> 2022-09-20 20:56:35,923 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/4-external_1beam:env:proces-localedata-xQnGOSPHoBjAh1B5aVBszDCEI8BXc7-R_L7taFUKHg8.jar
> java.io.FileNotFoundException: /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy (No such file or directory)
>         at java.io.FileInputStream.open0(Native Method)
>         at java.io.FileInputStream.open(FileInputStream.java:195)
>         at java.io.FileInputStream.<init>(FileInputStream.java:138)
>         at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:128)
>         at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:84)
>         at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:256)
>         at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:124)
>         at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
>         at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:315)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>         at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> ```
>
> Does anyone know what else I would need to configure?  Thanks
>
>
> Sincerely,
> Lydian Lee
>
>
>
> On Tue, Sep 20, 2022 at 1:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Lydian,
>>
>> there are two parts involved.
>>
>>  a) expansion service (which you run on port 8097) - this service expands
>> the ReadFromKafka which is Java transform
>>
>>  b) Java SDK environment, which is not the expansion service, it must be
>> some environment that is able to run the Java ReadFromKafka transform. In
>> flink, you can use PROCESS environment type (e.g. [1]), but there might be
>> other options (e.g. DOCKER), see [2]
>>
>> Hope this helps,
>>
>>  Jan
>>
>> [1]
>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py
>>
>> [2] https://beam.apache.org/documentation/runtime/sdk-harness-config/
>> On 9/20/22 10:45, Lydian wrote:
>>
>> Hi,
>> I am using the portable runner (Flink) with the Python SDK. I am on the
>> latest version of Beam (2.41).
>> The job is running on Kubernetes. I launched the job manager with a sidecar
>> container (using image apache/beam_flink1.14_job_server:2.41.0) to start
>> the expansion service with the following command:
>> ```
>> java
>> -cp /opt/apache/beam/jars/
>> org.apache.beam.sdk.expansion.service.ExpansionService
>> 8097
>> --javaClassLookupAllowlistFile=*
>> --defaultEnvironmentType=EXTERNAL
>> --defaultEnvironmentConfig=localhost:8097
>> ```
>> In the code I am doing:
>> ```
>> ReadFromKafka(
>>     consumer_config={
>>         "bootstrap.servers": 'BROKER',
>>         "security.protocol": "SASL_SSL",
>>         "sasl.mechanism": "SCRAM-SHA-512",
>>         "sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{sasl_username}" password="{sasl_password}";',
>>     },
>>     topics=[self.options.topic],
>>     with_metadata=False,
>>     expansion_service="localhost:8097",
>> )
>> ```
>> But it shows with error:
>> ```
>> 2022-09-20 08:36:36,549 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
>> org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
>> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
>> at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
>> at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
>> at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
>> at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
>> at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
>> at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
>> ```
>> Does anyone know how I could fix this issue? Thanks!
>> Sincerely,
>> Lydian Lee
>>
>>
