Hi Kumar, you can try Zeppelin, which supports sharing UDFs across languages:
http://zeppelin.apache.org/ rahul kumar <rk20.stor...@gmail.com> 于2021年9月27日周一 上午4:20写道: > I'm trying to use a function defined in scala jar in pyspark ( spark > 3.0.2). > > --scala --- > > Object PythonUtil { > > def customedf(dataFrame: DataFrame, > keyCol: String, > table: String, > outputSchema: StructType, > database: String): DataFrame = { > > // some transformation of dataframe and convert as per the output schema > types and fields. > ... > resultDF > } > > //In jupyter notebook > schema creation: > alias = StructType([StructField("first_name", StringType(), > False),StructField("last_name", StringType(), False)]) > name = StructType([StructField("first_name", StringType(), > False),StructField("aliases", ArrayType(alias), False)]) > street_adress = StructType([StructField("street_name", StringType(), > False),StructField("apt_number", IntegerType(), False)]) > address = StructType([StructField("zip", LongType(), > False),StructField("street", street_adress, False),StructField("city", > StringType(), False)]) > workHistory = StructType([StructField("company_name", StringType(), > False),StructField("company_address", address, > False),StructField("worked_from", StringType(), False)]) > > //passing this to scala function. 
> outputschema= StructType([StructField("name", name, > False),StructField("SSN", StringType(), False),StructField("home_address", > ArrayType(address), False)]) > > ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"], > ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]] > customerIdsDF=spark.createDataFrame(ssns,["SSN"]) > > scala2_object= sc._jvm.com.mytest.spark.PythonUtil > pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', > 'table', outputschema, 'test'), spark._wrapped) > > Then I get an error that AttributeError: 'StructField' object has no > attribute '_get_object_id' > > full stacktrace > --------------------------------------------------------------------------- > AttributeError Traceback (most recent call last) > <ipython-input-25-74a3b3e652e6> in <module> > 4 > 5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil > ----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, > 'SSN', 'table',smallSchema, 'test'), spark._wrapped) > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > in __call__(self, *args) > 1294 > 1295 def __call__(self, *args): > -> 1296 args_command, temp_args = self._build_args(*args) > 1297 > 1298 command = proto.CALL_COMMAND_NAME +\ > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > in _build_args(self, *args) > 1258 def _build_args(self, *args): > 1259 if self.converters is not None and len(self.converters) > > 0: > -> 1260 (new_args, temp_args) = self._get_args(args) > 1261 else: > 1262 new_args = args > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > in _get_args(self, args) > 1245 for converter in self.gateway_client.converters: > 1246 if converter.can_convert(arg): > -> 1247 temp_arg = converter.convert(arg, > self.gateway_client) > 1248 temp_args.append(temp_arg) > 1249 new_args.append(temp_arg) > > 
~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py > in convert(self, object, gateway_client) > 509 java_list = ArrayList() > 510 for element in object: > --> 511 java_list.add(element) > 512 return java_list > 513 > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > in __call__(self, *args) > 1294 > 1295 def __call__(self, *args): > -> 1296 args_command, temp_args = self._build_args(*args) > 1297 > 1298 command = proto.CALL_COMMAND_NAME +\ > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > in _build_args(self, *args) > 1264 > 1265 args_command = "".join( > -> 1266 [get_command_part(arg, self.pool) for arg in new_args]) > 1267 > 1268 return args_command, temp_args > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > in <listcomp>(.0) > 1264 > 1265 args_command = "".join( > -> 1266 [get_command_part(arg, self.pool) for arg in new_args]) > 1267 > 1268 return args_command, temp_args > > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py > in get_command_part(parameter, python_proxy_pool) > 296 command_part += ";" + interface > 297 else: > --> 298 command_part = REFERENCE_TYPE + parameter._get_object_id() > 299 > 300 command_part += "\n" > > > > > > > --------------------------------------------------------------------- > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards Jeff Zhang