Hi, I am running into a strange error. I am trying to write a transformer that takes in two columns and creates a LabeledPoint. I cannot figure out why I am getting:
AttributeError: 'DataFrame' object has no attribute '_get_object_id'

I am using spark-1.5.1-bin-hadoop2.6. Any idea what I am doing wrong? Is this a bug with data frames? Also, I suspect the next problem I will run into is that UDFs do not support LabeledPoint as a return type, which is why f() converts it to a string for now.

Comments and suggestions are greatly appreciated,

Andy

In [37]:
 1 from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
 2 from pyspark.ml.util import keyword_only
 3 
 4 from pyspark.sql.functions import udf
 5 from pyspark.ml.pipeline import Transformer
 6 
 7 from pyspark.sql.types import BinaryType, DataType, ByteType, StringType
 8 from pyspark.mllib.linalg import SparseVector
 9 from pyspark.mllib.regression import LabeledPoint
10 
11 
12 class LabledPointTransformer(Transformer, HasInputCol, HasOutputCol):
13     @keyword_only
14     def __init__(self, inputCol=None, outputCol=None, featureCol=None):
15         super(LabledPointTransformer, self).__init__()
16         self.featureCol = Param(self, "featureCol", "")
17         self._setDefault(featureCol="feature")
18         kwargs = self.__init__._input_kwargs
19         self.setParams(**kwargs)
20 
21     @keyword_only
22     def setParams(self, inputCol=None, outputCol=None, featureCol=None):
23         kwargs = self.setParams._input_kwargs
24         return self._set(**kwargs)
25 
26     def setFeatureCol(self, value):
27         self._paramMap[self.featureCol] = value
28         return self
29 
30     def getFeatureCol(self):
31         return self.getOrDefault(self.featureCol)
32 
33     def _transform(self, dataset):  # dataset is a data frame
34         out_col = self.getOutputCol()
35         labelCol = self.getInputCol()
36         featureCol = self.getFeatureCol()
37 
38         def f(lf):
39             return str(LabeledPoint(lf[labelCol], lf[featureCol]))
40 
41         t = StringType()
42         #data = dataset[labelCol, featureCol]
43         data = dataset.select(labelCol, featureCol)
44         return dataset.withColumn(out_col, udf(f, t)(data))
45 
46 lpData = sqlContext.createDataFrame([
47     (0, SparseVector(3, [0, 1], [1.0, 2.0])),
48     (1, SparseVector(3, [1, 2], [3.0, 1.0])),
49 ], ["label", "features"])
50 
51 lpData.show()
52 lpt = LabledPointTransformer(inputCol="label", outputCol="labeledPoint", featureCol="features",)
53 tmp = lpt.transform(lpData)
54 tmp.collect()

+-----+-------------------+
|label|           features|
+-----+-------------------+
|    0|(3,[0,1],[1.0,2.0])|
|    1|(3,[1,2],[3.0,1.0])|
+-----+-------------------+

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-37-0ed0d16e8db3> in <module>()
     51 lpData.show()
     52 lpt = LabledPointTransformer(inputCol="label", outputCol="labeledPoint", featureCol="features",)
---> 53 tmp = lpt.transform(lpData)
     54 tmp.collect()

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/ml/pipeline.py in transform(self, dataset, params)
    105                 return self.copy(params,)._transform(dataset)
    106             else:
--> 107                 return self._transform(dataset)
    108         else:
    109             raise ValueError("Params must be either a param map but got %s." % type(params))

<ipython-input-37-0ed0d16e8db3> in _transform(self, dataset)
     42         #data = dataset[labelCol, featureCol]
     43         data = dataset.select(labelCol, featureCol)
---> 44         return dataset.withColumn(out_col, udf(f, t)(data))
     45 
     46 lpData = sqlContext.createDataFrame([

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/functions.py in __call__(self, *cols)
   1436     def __call__(self, *cols):
   1437         sc = SparkContext._active_spark_context
-> 1438         jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
   1439         return Column(jc)
   1440 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.py in _to_seq(sc, cols, converter)
     58     """
     59     if converter:
---> 60         cols = [converter(c) for c in cols]
     61     return sc._jvm.PythonUtils.toSeq(cols)
     62 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.py in <listcomp>(.0)
     58     """
     59     if converter:
---> 60         cols = [converter(c) for c in cols]
     61     return sc._jvm.PythonUtils.toSeq(cols)
     62 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.py in _to_java_column(col)
     46         jcol = col._jc
     47     else:
---> 48         jcol = _create_column_from_name(col)
     49     return jcol
     50 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.py in _create_column_from_name(name)
     39 def _create_column_from_name(name):
     40     sc = SparkContext._active_spark_context
---> 41     return sc._jvm.functions.col(name)
     42 
     43 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    527 
    528         args_command = ''.join(
--> 529             [get_command_part(arg, self.pool) for arg in new_args])
    530 
    531         command = CALL_COMMAND_NAME +\

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in <listcomp>(.0)
    527 
    528         args_command = ''.join(
--> 529             [get_command_part(arg, self.pool) for arg in new_args])
    530 
    531         command = CALL_COMMAND_NAME +\

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
    263             command_part += ';' + interface
    264         else:
--> 265             command_part = REFERENCE_TYPE + parameter._get_object_id()
    266 
    267     command_part += '\n'

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py in __getattr__(self, name)
    747         if name not in self.columns:
    748             raise AttributeError(
--> 749                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
    750         jc = self._jdf.apply(name)
    751         return Column(jc)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'
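P.S. Staring at the traceback some more: _to_java_column() appears to expect each argument passed to udf(f, t)(...) to be a Column (or a column name), but line 44 hands it the whole two-column DataFrame, which is where '_get_object_id' blows up. Here is a minimal sketch of the _transform() I plan to try instead, passing the two Columns to the UDF separately (untested, so I may be off base):

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        labelCol = self.getInputCol()
        featureCol = self.getFeatureCol()

        # The wrapped UDF takes one Column per argument of f(), so hand it
        # the label and feature Columns directly instead of a DataFrame.
        def f(label, features):
            return str(LabeledPoint(label, features))

        t = StringType()
        return dataset.withColumn(out_col,
                                  udf(f, t)(dataset[labelCol], dataset[featureCol]))

If that works, f() should receive the label and the SparseVector as plain Python values, so converting the LabeledPoint to a string for the output column should behave the same as before.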