jaketf opened a new issue #11246:
URL: https://github.com/apache/airflow/issues/11246


   **Description**
   
   It appears that [`SparkSubmitHook`](https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/spark_submit_operator/index.html)'s [implementation](https://github.com/apache/airflow/blob/master/airflow/providers/apache/spark/hooks/spark_submit.py) of Kerberos support is orthogonal to the `airflow kerberos` support: it requires specifying a keytab on the task, which will not work for us because our worker container intentionally does not mount a volume for the keytab. This means we would have to modify `SparkSubmitHook`/`SparkSubmitOperator` to [use a ticket cache](https://spark.apache.org/docs/latest/security.html#using-a-ticket-cache).
   
   **Use case / motivation**
   
   I want to use the spark submit operator natively with a Kerberos ticket cache (ccache).
   
   **Proposed Changes**
   
   
   I think this should be the addition of a `use_krb5ccache` argument to the hook and operator, defaulting to `False` for backwards compatibility.
   When it is `True`, we should add the following to the spark-submit command construction:
   ```python3
   if self.use_krb5ccache:
       if not os.getenv("KRB5CCNAME"):
           raise AirflowException(
               "KRB5CCNAME environment variable not set while trying to "
               "use a ticket from the ccache."
           )
       connection_cmd += [
           "--conf",
           "spark.kerberos.renewal.credentials=ccache",
       ]
   ```
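A minimal, self-contained sketch of that check, pulled out as a standalone function so it can be exercised outside the hook (names mirror the snippet above; a plain `RuntimeError` stands in for `AirflowException` so the sketch runs without Airflow installed):

```python
import os


def ccache_spark_args(use_krb5ccache: bool) -> list:
    """Return extra spark-submit args when delegating renewal to the ticket cache."""
    if not use_krb5ccache:
        return []
    if not os.getenv("KRB5CCNAME"):
        # Stand-in for AirflowException so this sketch runs without Airflow.
        raise RuntimeError(
            "KRB5CCNAME environment variable not set while trying to "
            "use a ticket from the ccache."
        )
    return ["--conf", "spark.kerberos.renewal.credentials=ccache"]
```

The early env-var check matters because spark-submit itself would otherwise fail later, and much less legibly, when it cannot find a credentials cache.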
   
   We should also fall back to the principal from the `[kerberos]` section of the Airflow config when one is not specified on the task:
   ```python3
   self._principal = principal if principal else conf.get('kerberos', 'principal')
   ```
   
   Note: I've tested something similar via a cluster policy as a workaround on my current project:
   ```python3
   def spark_tasks_use_ccache(task: BaseOperator):
       """Configure SparkSubmitOperator tasks to use the Kerberos ticket cache."""
       if isinstance(task, SparkSubmitOperator):
           # pylint: disable=protected-access
           if task._conf:  # noqa
               task._conf["spark.kerberos.renewal.credentials"] = "ccache"  # noqa
           task._principal = conf.get('kerberos', 'principal')
   ```
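For context, Airflow picks up cluster policies from `airflow_local_settings.py` (as a function named `policy` in Airflow 1.10, `task_policy` in later versions). A hedged, runnable sketch of that wiring, with a hypothetical stub standing in for `SparkSubmitOperator` and a hard-coded principal standing in for `conf.get('kerberos', 'principal')` so the example has no Airflow dependency:

```python
# Hypothetical stub: in a real deployment the isinstance check targets
# SparkSubmitOperator and this function lives in airflow_local_settings.py.
class FakeSparkSubmitOperator:
    def __init__(self, conf=None):
        self._conf = conf or {}
        self._principal = None


def spark_tasks_use_ccache(task):
    """Force Spark-submit-style tasks to renew credentials from the ccache."""
    if isinstance(task, FakeSparkSubmitOperator):
        if task._conf:
            task._conf["spark.kerberos.renewal.credentials"] = "ccache"
        # Stand-in value; the real policy reads conf.get('kerberos', 'principal').
        task._principal = "airflow@EXAMPLE.COM"
```

The policy runs against every task at DAG parse time, so only Spark tasks are mutated and everything else passes through untouched.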
   
   
   cc: @mik-laj @potiuk WDYT about contributing this sort of thing back to Airflow? Should we change this in the hook? Would it be interesting to contribute useful example cluster policies to Airflow?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

