Hi kumar,

You can try Zeppelin which support the udf sharing across languages

http://zeppelin.apache.org/




rahul kumar <rk20.stor...@gmail.com> 于2021年9月27日周一 上午4:20写道:

> I'm trying to use a function defined in scala jar in pyspark ( spark
> 3.0.2).
>
> --scala ---
>
> Object PythonUtil {
>
> def customedf(dataFrame: DataFrame,
>                  keyCol: String,
>                  table: String,
>                  outputSchema: StructType,
>                  database: String): DataFrame = {
>
> // some transformation of dataframe and convert as per the output schema
> types and fields.
> ...
> resultDF
> }
>
> //In jupyter notebook
> schema creation:
> alias = StructType([StructField("first_name", StringType(),
> False),StructField("last_name", StringType(), False)])
> name = StructType([StructField("first_name", StringType(),
> False),StructField("aliases", ArrayType(alias), False)])
> street_adress = StructType([StructField("street_name", StringType(),
> False),StructField("apt_number", IntegerType(), False)])
> address = StructType([StructField("zip", LongType(),
> False),StructField("street", street_adress, False),StructField("city",
> StringType(), False)])
> workHistory = StructType([StructField("company_name", StringType(),
> False),StructField("company_address", address,
> False),StructField("worked_from", StringType(), False)])
>
> //passing this to scala function.
> outputschema= StructType([StructField("name", name,
> False),StructField("SSN", StringType(), False),StructField("home_address",
> ArrayType(address), False)])
>
> ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
> ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
> customerIdsDF=spark.createDataFrame(ssns,["SSN"])
>
> scala2_object= sc._jvm.com.mytest.spark.PythonUtil
> pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
> 'table', outputschema, 'test'), spark._wrapped)
>
> Then I get an error that AttributeError: 'StructField' object has no
> attribute '_get_object_id'
>
> full stacktrace
> ---------------------------------------------------------------------------
> AttributeError                            Traceback (most recent call last)
> <ipython-input-25-74a3b3e652e6> in <module>
>       4
>       5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
> ----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf,
> 'SSN', 'table',smallSchema, 'test'), spark._wrapped)
>
> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> in __call__(self, *args)
>    1294
>    1295     def __call__(self, *args):
> -> 1296         args_command, temp_args = self._build_args(*args)
>    1297
>    1298         command = proto.CALL_COMMAND_NAME +\
>
> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> in _build_args(self, *args)
>    1258     def _build_args(self, *args):
>    1259         if self.converters is not None and len(self.converters) >
> 0:
> -> 1260             (new_args, temp_args) = self._get_args(args)
>    1261         else:
>    1262             new_args = args
>
> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> in _get_args(self, args)
>    1245                 for converter in self.gateway_client.converters:
>    1246                     if converter.can_convert(arg):
> -> 1247                         temp_arg = converter.convert(arg,
> self.gateway_client)
>    1248                         temp_args.append(temp_arg)
>    1249                         new_args.append(temp_arg)
>
> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py
> in convert(self, object, gateway_client)
>     509         java_list = ArrayList()
>     510         for element in object:
> --> 511             java_list.add(element)
>     512         return java_list
>     513
>
> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> in __call__(self, *args)
>    1294
>    1295     def __call__(self, *args):
> -> 1296         args_command, temp_args = self._build_args(*args)
>    1297
>    1298         command = proto.CALL_COMMAND_NAME +\
>
> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> in _build_args(self, *args)
>    1264
>    1265         args_command = "".join(
> -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
>    1267
>    1268         return args_command, temp_args
>
> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> in <listcomp>(.0)
>    1264
>    1265         args_command = "".join(
> -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
>    1267
>    1268         return args_command, temp_args
>
> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py
> in get_command_part(parameter, python_proxy_pool)
>     296             command_part += ";" + interface
>     297     else:
> --> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
>     299
>     300     command_part += "\n"
>
>
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Best Regards

Jeff Zhang

Reply via email to