You can first try it via Docker:
http://zeppelin.apache.org/download.html#using-the-official-docker-image
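
A minimal sketch of that route, assuming Docker is installed (the image tag below is only an example; pin whichever Zeppelin release you want):

# Run the official Zeppelin image, exposing the web UI on port 8080
docker run -p 8080:8080 --rm --name zeppelin apache/zeppelin:0.10.0

The notebook UI is then reachable at http://localhost:8080.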


Jeff Zhang <zjf...@gmail.com> wrote on Mon, Sep 27, 2021 at 6:49 AM:

> Hi Kumar,
>
> You can try Zeppelin, which supports sharing UDFs across languages:
>
> http://zeppelin.apache.org/
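>
> As a minimal sketch of what that sharing looks like (assuming the default
> Spark interpreter group, where all paragraphs share one SparkSession), a
> UDF registered in one paragraph resolves in paragraphs written in another
> language:
>
> %spark.pyspark
> from pyspark.sql.types import IntegerType
> # registered against the shared SparkSession
> spark.udf.register("plus_one", lambda x: x + 1, IntegerType())
>
> %spark.sql
> -- same session, so the Python UDF resolves here too
> SELECT plus_one(41)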
>
> rahul kumar <rk20.stor...@gmail.com> wrote on Mon, Sep 27, 2021 at 4:20 AM:
>
>> I'm trying to use a function defined in a Scala JAR from PySpark (Spark
>> 3.0.2).
>>
>> --- scala ---
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.types.StructType
>>
>> object PythonUtil {
>>
>>   def customdf(dataFrame: DataFrame,
>>                keyCol: String,
>>                table: String,
>>                outputSchema: StructType,
>>                database: String): DataFrame = {
>>     // some transformation of the dataframe, converted to match the
>>     // output schema's types and fields
>>     ...
>>     resultDF
>>   }
>> }
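>>
>> (Note: for sc._jvm to resolve the class at all, the JAR has to be on the
>> driver classpath. A minimal sketch, with a hypothetical path:
>>
>> pyspark --jars /path/to/python-util.jar
>>
>> or the equivalent spark.jars config set before the SparkSession is
>> created.)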
>>
>> # In a Jupyter notebook -- schema creation:
>> import pyspark.sql
>> from pyspark.sql.types import (StructType, StructField, StringType,
>>                                IntegerType, LongType, ArrayType)
>>
>> alias = StructType([StructField("first_name", StringType(), False),
>>                     StructField("last_name", StringType(), False)])
>> name = StructType([StructField("first_name", StringType(), False),
>>                    StructField("aliases", ArrayType(alias), False)])
>> street_address = StructType([StructField("street_name", StringType(), False),
>>                              StructField("apt_number", IntegerType(), False)])
>> address = StructType([StructField("zip", LongType(), False),
>>                       StructField("street", street_address, False),
>>                       StructField("city", StringType(), False)])
>> workHistory = StructType([StructField("company_name", StringType(), False),
>>                           StructField("company_address", address, False),
>>                           StructField("worked_from", StringType(), False)])
>>
>> # Passing this to the Scala function.
>> outputschema = StructType([StructField("name", name, False),
>>                            StructField("SSN", StringType(), False),
>>                            StructField("home_address", ArrayType(address), False)])
>>
>> ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
>>         ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
>> customerIdsDF = spark.createDataFrame(ssns, ["SSN"])
>>
>> scala2_object = sc._jvm.com.mytest.spark.PythonUtil
>> pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
>>                                              'table', outputschema, 'test'),
>>                       spark._wrapped)
>>
>> Then I get this error: AttributeError: 'StructField' object has no
>> attribute '_get_object_id'.
>>
>> Full stack trace:
>>
>> ---------------------------------------------------------------------------
>> AttributeError                            Traceback (most recent call last)
>> <ipython-input-25-74a3b3e652e6> in <module>
>>       4
>>       5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
>> ----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table',smallSchema, 'test'), spark._wrapped)
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>    1294
>>    1295     def __call__(self, *args):
>> -> 1296         args_command, temp_args = self._build_args(*args)
>>    1297
>>    1298         command = proto.CALL_COMMAND_NAME +\
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
>>    1258     def _build_args(self, *args):
>>    1259         if self.converters is not None and len(self.converters) > 0:
>> -> 1260             (new_args, temp_args) = self._get_args(args)
>>    1261         else:
>>    1262             new_args = args
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _get_args(self, args)
>>    1245                 for converter in self.gateway_client.converters:
>>    1246                     if converter.can_convert(arg):
>> -> 1247                         temp_arg = converter.convert(arg, self.gateway_client)
>>    1248                         temp_args.append(temp_arg)
>>    1249                         new_args.append(temp_arg)
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py in convert(self, object, gateway_client)
>>     509         java_list = ArrayList()
>>     510         for element in object:
>> --> 511             java_list.add(element)
>>     512         return java_list
>>     513
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>    1294
>>    1295     def __call__(self, *args):
>> -> 1296         args_command, temp_args = self._build_args(*args)
>>    1297
>>    1298         command = proto.CALL_COMMAND_NAME +\
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
>>    1264
>>    1265         args_command = "".join(
>> -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
>>    1267
>>    1268         return args_command, temp_args
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in <listcomp>(.0)
>>    1264
>>    1265         args_command = "".join(
>> -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
>>    1267
>>    1268         return args_command, temp_args
>>
>> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
>>     296             command_part += ";" + interface
>>     297     else:
>> --> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
>>     299
>>     300     command_part += "\n"
>>
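>> A note on the failure mode, with a sketch of a commonly used workaround:
>> py4j auto-converts only primitives and plain Python collections, and a
>> pyspark StructType has no JVM counterpart, so the collection converter
>> walks its StructField elements and fails when it looks up
>> _get_object_id. Assuming the Scala signature can be changed, the schema
>> can be passed as a JSON string instead:
>>
>> # Python side: serialize the schema rather than passing the object
>> pyspark.sql.DataFrame(
>>     scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table',
>>                            outputschema.json(), 'test'),
>>     spark._wrapped)
>>
>> with the Scala side rebuilding it via
>> DataType.fromJson(json).asInstanceOf[StructType].
>>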
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Best Regards

Jeff Zhang
