Hi Sumeet,

The archive files will be uploaded to the blob server. This is the same no 
matter specifying the archives via command line option `—pyArchives` or via 
`add_python_archive`. 


> And when I try to programmatically do this by calling add_python_archive(), 
> the job gets submitted but fails because the target directory is not found on 
> the UDF node.

Could you share some code snippet, e.g. is this a Table API program or a 
DataStream API program? Besides, could you share the exception stack?

Regards,
Dian

> 2021年6月11日 下午7:25,Sumeet Malhotra <sumeet.malho...@gmail.com> 写道:
> 
> I'm using a standalone deployment on Kubernetes for this use case. Does the 
> archive get uploaded to the cluster via the :8081 REST/WebUI port or via some 
> other port like 6123/RPC or 6124/BLOB-SERVER? I'm wondering if not exposing 
> those ports on the local machine might prevent the archive from getting 
> loaded? Although I would have expected an explicit error in that case.
> 
> NAMESPACE     NAME               TYPE           PORTS     
> flink         flink-jobmanager   ClusterIP      rpc:6123►0 blob-server:6124►0 
> webui:8081►0
> 
> Thanks,
> Sumeet
> 
> 
> On Fri, Jun 11, 2021 at 2:48 PM Roman Khachatryan <ro...@apache.org 
> <mailto:ro...@apache.org>> wrote:
> Hi Sumeet,
> 
> Probably there is an issue with uploading the archive while submitting the 
> job.
> The commands and API usage look good to me.
> Dian could you please confirm that?
> 
> Regards,
> Roman
> 
> On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra
> <sumeet.malho...@gmail.com <mailto:sumeet.malho...@gmail.com>> wrote:
> >
> > Thank you Roman. Yes, that's what I am going to do.
> >
> > But I'm running into another issue... when I specify the --pyArchives 
> > option on the command line, the job never gets submitted and is stuck 
> > forever. And when I try to programmatically do this by calling 
> > add_python_archive(), the job gets submitted but fails because the target 
> > directory is not found on the UDF node. Flink is deployed on a K8S cluster 
> > in my case and the port 8081 is forwarded to the localhost.
> >
> > Here's the command line I use:
> >
> > ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python 
> > my_job.py  --pyArchives file:///path/to/schema.zip#schema
> >
> > And within the UDF I'm access the schema file as:
> >
> > read_schema('schema/my_schema.json')
> >
> > Or if I try using the API instead of the command-line, the call looks as:
> >
> > env = StreamExecutionEnvironment.get_execution_environment()
> > env.add_python_archive('schema.zip', 'schema')
> >
> > Initially, my_job.py itself had its own command line options, and I was 
> > thinking that might interfere with the overall Flink command line options, 
> > but even after removing that I'm not able to submit the job anymore. 
> > However, if I don't use the --pyArchives option and manually transfer the 
> > schema file to a location on the UDF node, the job gets submitted and works 
> > as expected.
> >
> > Any reason why this might happen?
> >
> > Thanks,
> > Sumeet
> >
> >
> > On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan <ro...@apache.org 
> > <mailto:ro...@apache.org>> wrote:
> >>
> >> Hi,
> >>
> >> I think the second option is what you need. The documentation says
> >> only zip format is supported.
> >> Alternatively, you could upload the files to S3 or other DFS and
> >> access from TMs and re-upload when needed.
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
> >>  
> >> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives>
> >>
> >> Regards,
> >> Roman
> >>
> >> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
> >> <sumeet.malho...@gmail.com <mailto:sumeet.malho...@gmail.com>> wrote:
> >> >
> >> > Hi,
> >> >
> >> > I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON 
> >> > schema files actually). The path of this file can be passed into the 
> >> > UDTF, but essentially this path needs to exist on the Task Manager node 
> >> > where the task executes. What's the best way to upload these resource 
> >> > files? As of now, my custom Flink image creates a fixed path with the 
> >> > required resource files, but I'd like it to be run time configurable.
> >> >
> >> > There are 2 APIs available to load files when submitting a PyFlink job...
> >> >
> >> > stream_execution_environment.add_python_file() - Recommended to upload 
> >> > files (.py etc) but doesn't let me configure the final path on the 
> >> > target node. The files are added to PYTHONPATH, but it needs the UDTF 
> >> > function to lookup for this file. I'd like to pass the file location 
> >> > into the UDTF instead.
> >> >
> >> > stream_execution_environment.add_python_archive() - Appears to be more 
> >> > generic, in the sense that it allows a target directory to be specified. 
> >> > The documentation doesn't say anything about the contents of the 
> >> > archive, so I'm guessing it could be any type of file. Is this what is 
> >> > needed for my use case?
> >> >
> >> > Or is there any other recommended way to upload non-Python 
> >> > dependencies/resources?
> >> >
> >> > Thanks in advance,
> >> > Sumeet

Reply via email to