Repository: spark
Updated Branches:
  refs/heads/master ece00566e -> ffa05c84f

[SPARK-9828] [PYSPARK] Mutable values should not be default arguments

Author: MechCoder <manojkumarsivaraj...@gmail.com>

Closes #8110 from MechCoder/spark-9828.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffa05c84
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffa05c84
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffa05c84

Branch: refs/heads/master
Commit: ffa05c84fe75663fc33f3d954d1cb1e084ab3280
Parents: ece0056
Author: MechCoder <manojkumarsivaraj...@gmail.com>
Authored: Fri Aug 14 12:46:05 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Fri Aug 14 12:46:05 2015 -0700

----------------------------------------------------------------------
 python/pyspark/ml/evaluation.py     |  4 +++-
 python/pyspark/ml/param/__init__.py | 26 +++++++++++++++++---------
 python/pyspark/ml/pipeline.py       |  4 ++--
 python/pyspark/ml/tuning.py         |  8 ++++++--
 python/pyspark/rdd.py               |  5 ++++-
 python/pyspark/sql/readwriter.py    |  8 ++++++--
 python/pyspark/statcounter.py       |  4 +++-
 python/pyspark/streaming/kafka.py   | 12 +++++++++---
 8 files changed, 50 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ffa05c84/python/pyspark/ml/evaluation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py
index 2734092..e23ce05 100644
--- a/python/pyspark/ml/evaluation.py
+++ b/python/pyspark/ml/evaluation.py
@@ -46,7 +46,7 @@ class Evaluator(Params):
         """
         raise NotImplementedError()
 
-    def evaluate(self, dataset, params={}):
+    def evaluate(self, dataset, params=None):
         """
         Evaluates the output with optional parameters.
 
@@ -56,6 +56,8 @@ class Evaluator(Params):
                        params
         :return: metric
         """
+        if params is None:
+            params = dict()
         if isinstance(params, dict):
             if params:
                 return self.copy(params)._evaluate(dataset)

http://git-wip-us.apache.org/repos/asf/spark/blob/ffa05c84/python/pyspark/ml/param/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 7845536..eeeac49 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -60,14 +60,16 @@ class Params(Identifiable):
 
     __metaclass__ = ABCMeta
 
-    #: internal param map for user-supplied values param map
-    _paramMap = {}
+    def __init__(self):
+        super(Params, self).__init__()
+        #: internal param map for user-supplied values param map
+        self._paramMap = {}
 
-    #: internal param map for default values
-    _defaultParamMap = {}
+        #: internal param map for default values
+        self._defaultParamMap = {}
 
-    #: value returned by :py:func:`params`
-    _params = None
+        #: value returned by :py:func:`params`
+        self._params = None
 
     @property
     def params(self):
@@ -155,7 +157,7 @@ class Params(Identifiable):
         else:
             return self._defaultParamMap[param]
 
-    def extractParamMap(self, extra={}):
+    def extractParamMap(self, extra=None):
         """
         Extracts the embedded default param values and user-supplied
         values, and then merges them with extra values from input into
@@ -165,12 +167,14 @@ class Params(Identifiable):
         :param extra: extra param values
         :return: merged param map
         """
+        if extra is None:
+            extra = dict()
         paramMap = self._defaultParamMap.copy()
         paramMap.update(self._paramMap)
         paramMap.update(extra)
         return paramMap
 
-    def copy(self, extra={}):
+    def copy(self, extra=None):
         """
         Creates a copy of this instance with the same uid and some
         extra params. The default implementation creates a
@@ -181,6 +185,8 @@ class Params(Identifiable):
         :param extra: Extra parameters to copy to the new instance
         :return: Copy of this instance
         """
+        if extra is None:
+            extra = dict()
         that = copy.copy(self)
         that._paramMap = self.extractParamMap(extra)
         return that
@@ -233,7 +239,7 @@ class Params(Identifiable):
             self._defaultParamMap[getattr(self, param)] = value
         return self
 
-    def _copyValues(self, to, extra={}):
+    def _copyValues(self, to, extra=None):
         """
         Copies param values from this instance to another instance for
         params shared by them.
@@ -241,6 +247,8 @@ class Params(Identifiable):
         :param extra: extra params to be copied
         :return: the target instance with param values copied
         """
+        if extra is None:
+            extra = dict()
         paramMap = self.extractParamMap(extra)
         for p in self.params:
             if p in paramMap and to.hasParam(p.name):

http://git-wip-us.apache.org/repos/asf/spark/blob/ffa05c84/python/pyspark/ml/pipeline.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 9889f56..13cf2b0 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -141,7 +141,7 @@ class Pipeline(Estimator):
     @keyword_only
     def __init__(self, stages=None):
         """
-        __init__(self, stages=[])
+        __init__(self, stages=None)
         """
         if stages is None:
             stages = []
@@ -170,7 +170,7 @@ class Pipeline(Estimator):
     @keyword_only
     def setParams(self, stages=None):
         """
-        setParams(self, stages=[])
+        setParams(self, stages=None)
         Sets params for Pipeline.
         """
         if stages is None:

http://git-wip-us.apache.org/repos/asf/spark/blob/ffa05c84/python/pyspark/ml/tuning.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 0bf988f..dcfee6a 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -227,7 +227,9 @@ class CrossValidator(Estimator):
         bestModel = est.fit(dataset, epm[bestIndex])
         return CrossValidatorModel(bestModel)
 
-    def copy(self, extra={}):
+    def copy(self, extra=None):
+        if extra is None:
+            extra = dict()
         newCV = Params.copy(self, extra)
         if self.isSet(self.estimator):
             newCV.setEstimator(self.getEstimator().copy(extra))
@@ -250,7 +252,7 @@ class CrossValidatorModel(Model):
     def _transform(self, dataset):
         return self.bestModel.transform(dataset)
 
-    def copy(self, extra={}):
+    def copy(self, extra=None):
         """
         Creates a copy of this instance with a randomly generated uid
         and some extra params. This copies the underlying bestModel,
@@ -259,6 +261,8 @@ class CrossValidatorModel(Model):
         :param extra: Extra parameters to copy to the new instance
         :return: Copy of this instance
         """
+        if extra is None:
+            extra = dict()
         return CrossValidatorModel(self.bestModel.copy(extra))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ffa05c84/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fa8e0a0..9ef60a7 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -700,7 +700,7 @@ class RDD(object):
         return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
 
     @ignore_unicode_prefix
-    def pipe(self, command, env={}, checkCode=False):
+    def pipe(self, command, env=None, checkCode=False):
         """
         Return an RDD created by piping elements to a forked external process.
 
@@ -709,6 +709,9 @@ class RDD(object):
         :param checkCode: whether or not to check the return value
                           of the shell command.
         """
+        if env is None:
+            env = dict()
+
         def func(iterator):
             pipe = Popen(
                 shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

http://git-wip-us.apache.org/repos/asf/spark/blob/ffa05c84/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bf6ac08..78247c8 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -182,7 +182,7 @@ class DataFrameReader(object):
 
     @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
-             predicates=None, properties={}):
+             predicates=None, properties=None):
         """
         Construct a :class:`DataFrame` representing the database table accessible
         via JDBC URL `url` named `table` and connection `properties`.
@@ -208,6 +208,8 @@ class DataFrameReader(object):
                            should be included.
         :return: a DataFrame
         """
+        if properties is None:
+            properties = dict()
         jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
         for k in properties:
             jprop.setProperty(k, properties[k])
@@ -427,7 +429,7 @@ class DataFrameWriter(object):
         self._jwrite.orc(path)
 
     @since(1.4)
-    def jdbc(self, url, table, mode=None, properties={}):
+    def jdbc(self, url, table, mode=None, properties=None):
         """Saves the content of the :class:`DataFrame` to a external database table via JDBC.
 
         .. note:: Don't create too many partitions in parallel on a large cluster;\
@@ -445,6 +447,8 @@ class DataFrameWriter(object):
                            arbitrary string tag/value.
                            Normally at least a "user" and "password" property should be included.
         """
+        if properties is None:
+            properties = dict()
         jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
         for k in properties:
             jprop.setProperty(k, properties[k])

http://git-wip-us.apache.org/repos/asf/spark/blob/ffa05c84/python/pyspark/statcounter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index 944fa41..0fee3b2 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -30,7 +30,9 @@ except ImportError:
 
 class StatCounter(object):
 
-    def __init__(self, values=[]):
+    def __init__(self, values=None):
+        if values is None:
+            values = list()
         self.n = 0    # Running count of our values
         self.mu = 0.0  # Running mean of our values
         self.m2 = 0.0  # Running variance numerator (sum of (x - mean)^2)

http://git-wip-us.apache.org/repos/asf/spark/blob/ffa05c84/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 33dd596..dc5b7fd 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -35,7 +35,7 @@ def utf8_decoder(s):
 class KafkaUtils(object):
 
     @staticmethod
-    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
+    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
                      storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
                      keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
         """
@@ -52,6 +52,8 @@ class KafkaUtils(object):
         :param valueDecoder: A function used to decode value (default is utf8_decoder)
         :return: A DStream object
         """
+        if kafkaParams is None:
+            kafkaParams = dict()
         kafkaParams.update({
             "zookeeper.connect": zkQuorum,
             "group.id": groupId,
@@ -77,7 +79,7 @@ class KafkaUtils(object):
         return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
 
     @staticmethod
-    def createDirectStream(ssc, topics, kafkaParams, fromOffsets={},
+    def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
                            keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
         """
         .. note:: Experimental
@@ -105,6 +107,8 @@ class KafkaUtils(object):
         :param valueDecoder: A function used to decode value (default is utf8_decoder).
         :return: A DStream object
         """
+        if fromOffsets is None:
+            fromOffsets = dict()
         if not isinstance(topics, list):
             raise TypeError("topics should be list")
         if not isinstance(kafkaParams, dict):
@@ -129,7 +133,7 @@ class KafkaUtils(object):
         return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer)
 
     @staticmethod
-    def createRDD(sc, kafkaParams, offsetRanges, leaders={},
+    def createRDD(sc, kafkaParams, offsetRanges, leaders=None,
                   keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
         """
         .. note:: Experimental
@@ -145,6 +149,8 @@ class KafkaUtils(object):
         :param valueDecoder: A function used to decode value (default is utf8_decoder)
         :return: A RDD object
         """
+        if leaders is None:
+            leaders = dict()
         if not isinstance(kafkaParams, dict):
             raise TypeError("kafkaParams should be dict")
         if not isinstance(offsetRanges, list):
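
----------------------------------------------------------------------

The change applied across all of these files is the standard Python idiom for
avoiding shared mutable default arguments: a default such as params={} is
evaluated once, when the def statement runs, so every call that omits the
argument sees (and can mutate) the same dict or list. The sketch below is not
code from the patch; it is a minimal, hypothetical illustration of the bug and
of the None-sentinel fix the commit applies (the names add_param_buggy and
add_param_fixed are invented for the example).

    # Hypothetical, self-contained demo of the mutable-default-argument bug
    # and the None-sentinel idiom used throughout this commit.

    def add_param_buggy(name, value, params={}):
        # `params` is created once at function-definition time, so every call
        # that relies on the default mutates the same shared dict.
        params[name] = value
        return params

    def add_param_fixed(name, value, params=None):
        # A fresh dict is created on each call when the argument is omitted.
        if params is None:
            params = dict()
        params[name] = value
        return params

    if __name__ == "__main__":
        print(add_param_buggy("a", 1))  # {'a': 1}
        print(add_param_buggy("b", 2))  # {'a': 1, 'b': 2}  state leaks across calls
        print(add_param_fixed("a", 1))  # {'a': 1}
        print(add_param_fixed("b", 2))  # {'b': 2}  each call is independent

The same reasoning motivates the Params.__init__ change above: _paramMap,
_defaultParamMap and _params move from class attributes, which are shared by
every Params instance, to instance attributes initialised per object.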