[jira] [Commented] (SPARK-12669) Organize options for default values
[ https://issues.apache.org/jira/browse/SPARK-12669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15156430#comment-15156430 ]

Mohit Jaggi commented on SPARK-12669:
-------------------------------------

IMHO, a dozen is still a lot, and other APIs in Spark (though not other CSV parsers) do use namespaces, which I prefer. It is not a big deal, though.

> Organize options for default values
> ------------------------------------
>
> Key: SPARK-12669
> URL: https://issues.apache.org/jira/browse/SPARK-12669
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Hossein Falaki
>
> The CSV data source in Spark SQL should be able to differentiate empty string,
> null, NaN, and “N/A” (maybe data type dependent).
[jira] [Commented] (SPARK-12669) Organize options for default values
[ https://issues.apache.org/jira/browse/SPARK-12669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15107887#comment-15107887 ]

Mohit Jaggi commented on SPARK-12669:
-------------------------------------

Hmm... wouldn't it be good to have a typesafe API as well, in addition to this one? It could be a utility on top of this API. Maps are a bit hard to use: you don't get auto-completion from IDEs, there are no compile-time checks, etc.
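A rough sketch of what such a typed utility layered over the string-keyed option map might look like (the class and key names below are hypothetical, not an existing Spark API):

{code}
// Hypothetical typed layer over the untyped Map[String, String] options.
case class CsvParseOptions(
    nullValue: String = "null",
    nanValue: String = "NaN",
    delimiter: Char = ',') {

  // Flatten the typed fields into the map the untyped options API consumes.
  def toMap: Map[String, String] = Map(
    "nullValue" -> nullValue,
    "nanValue"  -> nanValue,
    "delimiter" -> delimiter.toString)
}

// The typed side gets IDE completion and compile-time checks; the untyped
// map appears only at the boundary, e.g.
//   sqlContext.read.format("csv").options(CsvParseOptions(nanValue = "N/A").toMap)
{code}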
[jira] [Commented] (SPARK-12669) Organize options for default values
[ https://issues.apache.org/jira/browse/SPARK-12669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15103903#comment-15103903 ]

Mohit Jaggi commented on SPARK-12669:
-------------------------------------

How about the names above for a start? I think we can use Typesafe Config (or an alternative). In the Scala API, something like:

  withFormattingOptions(...).withNumberParsingOptions(...).withLineExceptionOptions(...)

and in the SQL API, something like:

  csv.format.escapeCharacter = "\\"
  csv.realNumberParsing.nan = "NaN, Double.NaN"

I haven't seen the latest code, but I remember a flat namespace in spark-csv. Also, I don't remember a "filler value" for lines with fewer than expected fields; maybe it was added later. I am happy to write the code for this once we have agreement on the specifics of the API.
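One possible shape for that chained Scala API is sketched below; all names are illustrative (this is not an existing Spark or spark-csv API), with one case class per option namespace:

{code}
// Illustrative option groups, one per namespace.
case class FormattingOptions(escapeCharacter: Char = '\\', quoteCharacter: Char = '"')
case class NumberParsingOptions(nanValues: Seq[String] = Seq("NaN", "Double.NaN"))
case class LineExceptionOptions(badLinePolicy: String = "skip")

// Immutable builder: each with* call returns a copy with one group replaced.
case class CsvReaderConfig(
    formatting: FormattingOptions = FormattingOptions(),
    numberParsing: NumberParsingOptions = NumberParsingOptions(),
    lineExceptions: LineExceptionOptions = LineExceptionOptions()) {

  def withFormattingOptions(f: FormattingOptions) = copy(formatting = f)
  def withNumberParsingOptions(n: NumberParsingOptions) = copy(numberParsing = n)
  def withLineExceptionOptions(l: LineExceptionOptions) = copy(lineExceptions = l)
}

// Usage mirrors the chain suggested in the comment above.
val config = CsvReaderConfig()
  .withFormattingOptions(FormattingOptions(escapeCharacter = '\\'))
  .withNumberParsingOptions(NumberParsingOptions(Seq("NaN", "Double.NaN")))
  .withLineExceptionOptions(LineExceptionOptions("repair"))
{code}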
[jira] [Commented] (SPARK-12669) Organize options for default values
[ https://issues.apache.org/jira/browse/SPARK-12669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15103441#comment-15103441 ]

Mohit Jaggi commented on SPARK-12669:
-------------------------------------

Based on my experience working with CSV files, I think the following set of options makes sense. What do people think? Also, what is a good way to organize these options? I like https://github.com/typesafehub/config

Refer: https://github.com/databricks/spark-csv/pull/124/files

Options by category:

1. Line parsing options
   a. Bad line handling: skip the line, fail completely, or repair the line
   b. Line repairing methods: fill with a "filler value", which can be configured per data type

2. Real number parsing (defaults that can be overridden or augmented)
   a. NaN values: default "NaN", "Double.NaN"
   b. Infinity: default "Inf"
   c. -Infinity: default "-Inf"
   d. nulls: default "null"

3. Integer parsing
   a. nulls: default "null"

4. String parsing
   a. nulls: default "null"
   b. empty strings: default ""

5. Formatting
   a. field delimiter: default comma
   b. record delimiter: default newline (due to Hadoop InputFormat's behavior we probably can't allow arbitrary record delimiters)
   c. escape character: default backslash
   d. quote character: default double quote
   e. ignore leading whitespace: default true
   f. ignore trailing whitespace: default true
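For illustration, the categories above could map onto a Typesafe Config (HOCON) namespace like the sketch below; the key names are made up here to mirror the list, not taken from any released parser:

{code}
import com.typesafe.config.ConfigFactory

// Hypothetical HOCON defaults mirroring the option categories above.
val defaults = ConfigFactory.parseString("""
  csv {
    formatting {
      field-delimiter  = ","
      escape-character = "\\"
      quote-character  = "\""
      ignore-leading-whitespace  = true
      ignore-trailing-whitespace = true
    }
    real-number-parsing {
      nan               = ["NaN", "Double.NaN"]
      infinity          = "Inf"
      negative-infinity = "-Inf"
      null              = "null"
    }
    line-parsing {
      bad-line-policy = "skip"   # skip | fail | repair
    }
  }
""")

// Namespaced lookups replace a flat key space.
val delimiter = defaults.getString("csv.formatting.field-delimiter")
val nanValues = defaults.getStringList("csv.real-number-parsing.nan")
{code}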
[jira] [Created] (SPARK-9188) make open hash set/table APIs public
Mohit Jaggi created SPARK-9188:
-------------------------------

Summary: make open hash set/table APIs public
Key: SPARK-9188
URL: https://issues.apache.org/jira/browse/SPARK-9188
Project: Spark
Issue Type: Wish
Components: SQL
Reporter: Mohit Jaggi

These data structures would be useful for writing custom aggregations and other code on Spark.
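For context, the kind of custom aggregation the wish has in mind might look like the sketch below. It uses scala.collection.mutable.HashSet as a stand-in, since OpenHashSet lives in org.apache.spark.util.collection and is private[spark] at the time of writing; the function name is hypothetical:

{code}
import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.SparkContext._   // pair-RDD functions on Spark 1.x
import org.apache.spark.rdd.RDD

// Distinct values per key. With a public OpenHashSet, the mutable.HashSet
// below could be swapped for the memory-efficient open-addressing version.
def countDistinctValues[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): RDD[(K, Int)] =
  rdd.aggregateByKey(mutable.HashSet.empty[V])(
    (set, v) => set += v,    // fold one partition's values into a set
    (s1, s2) => s1 ++= s2    // merge per-partition sets
  ).mapValues(_.size)
{code}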
[jira] [Commented] (SPARK-9181) round bracket used in naming aggregations is not allowed by parquet writer
[ https://issues.apache.org/jira/browse/SPARK-9181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633080#comment-14633080 ]

Mohit Jaggi commented on SPARK-9181:
------------------------------------

Example:

scala> sdf.groupBy("col0_0").agg(avg("col2_2")).show()
+-----------+-----------+
|     col0_0|AVG(col2_2)|
+-----------+-----------+
|[B@4b9bb066|       23.2|
|[B@5a5a1f51|      900.7|
|[B@52be2d2e|      123.3|
|[B@43885172|      2.987|
|[B@2a9b3601|       21.9|
|[B@33f69181|      3.678|
|  [B@cac6ce|      1.897|
|  [B@c1feca|       90.2|
+-----------+-----------+

scala> sdf.groupBy("col0_0").agg(avg("col2_2")).write.parquet("/tmp/x1")
15/07/19 23:03:40 ERROR InsertIntoHadoopFsRelation: Aborting job.
java.lang.RuntimeException: Attribute name "AVG(col2_2)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

> round bracket used in naming aggregations is not allowed by parquet writer
> ---------------------------------------------------------------------------
>
> Key: SPARK-9181
> URL: https://issues.apache.org/jira/browse/SPARK-9181
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.0
> Reporter: Mohit Jaggi
> Priority: Minor
>
> The round bracket used in naming aggregations is not allowed by the parquet writer,
> so one has to rename the aggregate columns before writing to parquet.
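The error message itself points at the workaround: alias the aggregate column to a parquet-safe name before writing. A minimal sketch, reusing sdf from the session above:

{code}
import org.apache.spark.sql.functions.avg

// Renaming AVG(col2_2) to avg_col2_2 avoids the invalid-character check.
sdf.groupBy("col0_0")
  .agg(avg("col2_2").as("avg_col2_2"))
  .write.parquet("/tmp/x1")
{code}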
[jira] [Created] (SPARK-9181) round bracket used in naming aggregations is not allowed by parquet writer
Mohit Jaggi created SPARK-9181:
-------------------------------

Summary: round bracket used in naming aggregations is not allowed by parquet writer
Key: SPARK-9181
URL: https://issues.apache.org/jira/browse/SPARK-9181
Project: Spark
Issue Type: Bug
Affects Versions: 1.4.0
Reporter: Mohit Jaggi

The round bracket used in naming aggregations is not allowed by the parquet writer, so one has to rename the aggregate columns before writing to parquet.
[jira] [Commented] (SPARK-3489) support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params
[ https://issues.apache.org/jira/browse/SPARK-3489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14290967#comment-14290967 ]

Mohit Jaggi commented on SPARK-3489:
------------------------------------

A pull request does exist here: https://github.com/apache/spark/pull/2429

Use case example: https://github.com/AyasdiOpenSource/bigdf/blob/master/src/main/scala/com/ayasdi/bigdf/DFUtil.scala#L86

> support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params
> -----------------------------------------------------------------------
>
> Key: SPARK-3489
> URL: https://issues.apache.org/jira/browse/SPARK-3489
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.0.2
> Reporter: Mohit Jaggi
> Priority: Minor
[jira] [Commented] (SPARK-5097) Adding data frame APIs to SchemaRDD
[ https://issues.apache.org/jira/browse/SPARK-5097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14275960#comment-14275960 ]

Mohit Jaggi commented on SPARK-5097:
------------------------------------

Minor comment: to mutate an existing column, one can do df(x) = df(x).

> Adding data frame APIs to SchemaRDD
> ------------------------------------
>
> Key: SPARK-5097
> URL: https://issues.apache.org/jira/browse/SPARK-5097
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Priority: Critical
> Attachments: DesignDocAddingDataFrameAPIstoSchemaRDD.pdf
>
> SchemaRDD, through its DSL, already provides common data frame functionality. However, the DSL was originally created for constructing test cases without much end-user usability and API stability consideration. This design doc proposes a set of API changes for Scala and Python to make the SchemaRDD DSL API more usable and stable.
[jira] [Commented] (SPARK-5097) Adding data frame APIs to SchemaRDD
[ https://issues.apache.org/jira/browse/SPARK-5097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273994#comment-14273994 ]

Mohit Jaggi commented on SPARK-5097:
------------------------------------

Hi, this is Mohit Jaggi, author of https://github.com/AyasdiOpenSource/bigdf

Matei had suggested integrating bigdf with SchemaRDD, and I was planning on doing that soon. I would love to contribute to this item. Most of the constructs mentioned in the design document already exist in bigdf.

Mohit.
[jira] [Commented] (SPARK-1296) Make RDDs Covariant
[ https://issues.apache.org/jira/browse/SPARK-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209360#comment-14209360 ]

Mohit Jaggi commented on SPARK-1296:
------------------------------------

Why is this a WONTFIX? Making RDDs covariant seems like a good idea.

> Make RDDs Covariant
> --------------------
>
> Key: SPARK-1296
> URL: https://issues.apache.org/jira/browse/SPARK-1296
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: Michael Armbrust
> Assignee: Michael Armbrust
>
> First, what is the problem with RDDs not being covariant?
>
> {code}
> // Consider a function that takes a Seq of some trait.
> scala> trait A { val a = 1 }
> scala> def f(as: Seq[A]) = as.map(_.a)
>
> // A list of a concrete version of that trait can be used in this function.
> scala> class B extends A
> scala> f(new B :: Nil)
> res0: Seq[Int] = List(1)
>
> // Now let's try the same thing with RDDs.
> scala> def f(as: org.apache.spark.rdd.RDD[A]) = as.map(_.a)
> scala> val rdd = sc.parallelize(new B :: Nil)
> rdd: org.apache.spark.rdd.RDD[B] = ParallelCollectionRDD[2] at parallelize at <console>:42
>
> // :(
> scala> f(rdd)
> <console>:45: error: type mismatch;
>  found   : org.apache.spark.rdd.RDD[B]
>  required: org.apache.spark.rdd.RDD[A]
> Note: B <: A, but class RDD is invariant in type T.
> You may wish to define T as +T instead. (SLS 4.5)
>        f(rdd)
> {code}
>
> h2. Is it possible to make RDDs covariant?
>
> Probably? In terms of the public user interface, they are *mostly* covariant. (Internally we use the type parameter T in a lot of mutable state that breaks the covariance contract, but I think with casting we can 'promise' the compiler that we are behaving.) There are also a lot of complications with other types that we return which are invariant.
>
> h2. What will it take to make RDDs covariant?
>
> As I mention above, all of our mutable internal state is going to require casting to avoid using T. This seems to be okay; it makes our life only slightly harder. This extra work is required because we are basically promising the compiler that even if an RDD is implicitly upcast, internally we are keeping all the checkpointed data of the correct type. Since an RDD is immutable, we are okay!
>
> We also need to modify all the places where we use T in function parameters. So for example:
>
> {code}
> def ++[U >: T : ClassTag](other: RDD[U]): RDD[U] =
>   this.union(other).asInstanceOf[RDD[U]]
> {code}
>
> We are now allowing you to append an RDD of a less specific type, and then returning a less specific new RDD. This, I would argue, is a good change. We are strictly improving the power of the RDD interface, while maintaining reasonable type semantics.
>
> h2. So, why wouldn't we do it?
>
> There are a lot of places where we interact with invariant types. We return both Maps and Arrays from a lot of public functions. Arrays are invariant (but if we returned immutable sequences instead we would be good), and Maps are invariant in the key (once again, immutable sequences of tuples would be great here). I don't think this is a deal breaker, and we may even be able to get away with it without changing the return types of these functions. For example, I think that this should work, though once again it requires making promises to the compiler:
>
> {code}
> /**
>  * Return an array that contains all of the elements in this RDD.
>  */
> def collect[U >: T](): Array[U] = {
>   val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
>   Array.concat(results: _*).asInstanceOf[Array[U]]
> }
> {code}
>
> I started working on this [here|https://github.com/marmbrus/spark/tree/coveriantRDD]. Thoughts / suggestions are welcome!
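Since the issue was closed and RDD remains invariant, the usual workaround for the motivating example above is to make the consuming function generic with an upper type bound instead of demanding RDD[A] directly; a minimal sketch:

{code}
import org.apache.spark.rdd.RDD

trait A { val a = 1 }
class B extends A

// Accept any RDD whose element type is a subtype of A; an RDD[B]
// then type-checks without RDD itself being covariant.
def f[T <: A](as: RDD[T]): RDD[Int] = as.map(_.a)
{code}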
[jira] [Commented] (SPARK-3489) support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params
[ https://issues.apache.org/jira/browse/SPARK-3489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14136738#comment-14136738 ]

Mohit Jaggi commented on SPARK-3489:
------------------------------------

Proposed diff:

MohitMacBook:spark mohit$ git diff
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a9b905b..2c9f034 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -711,6 +711,21 @@ abstract class RDD[T: ClassTag](
       }
     }
   }
+
+  /**
+   * Zips this RDD with a sequence of other RDDs, returning key-value pairs with the
+   * first element in each RDD, second element in each RDD, etc. Assumes that the RDDs
+   * have the *same number of partitions* and the *same number of elements in each
+   * partition* (e.g. one was made through a map on the other).
+   */
+  def zip(others: Seq[RDD[_]]): RDD[Array[Any]] = {
+    zipPartitions(others, preservesPartitioning = false) { iterSeq: Seq[Iterator[Any]] =>
+      new Iterator[Array[Any]] {
+        def hasNext = !iterSeq.exists(!_.hasNext)
+        def next = iterSeq.map { iter => iter.next }.toArray
+      }
+    }
+  }

   /**
    * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
@@ -748,7 +763,11 @@ abstract class RDD[T: ClassTag](
     (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
     new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
-
+  def zipPartitions[V: ClassTag]
+      (others: Seq[RDD[_]], preservesPartitioning: Boolean)
+      (f: (Seq[Iterator[Any]]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDDn(sc, sc.clean(f), this +: others, false)
+
   // Actions (launch a job to return a value to the user program)

   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index f3d30f6..d22d7d3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -146,3 +146,22 @@ private[spark] class ZippedPartitionsRDD4
     rdd4 = null
   }
 }
+
+private[spark] class ZippedPartitionsRDDn[V: ClassTag](
+    sc: SparkContext,
+    f: (Seq[Iterator[_]] => Iterator[V]),
+    var rddSeq: Seq[RDD[_]],
+    preservesPartitioning: Boolean = false)
+  extends ZippedPartitionsBaseRDD[V](sc, rddSeq, preservesPartitioning) {
+
+  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
+    f(rdds.zipWithIndex.map(rdd => rdd._1.iterator(partitions(rdd._2), context)))
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdds = null
+  }
+}

> support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params
> -----------------------------------------------------------------------
>
> Key: SPARK-3489
> URL: https://issues.apache.org/jira/browse/SPARK-3489
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.0.2
> Reporter: Mohit Jaggi
> Fix For: 1.0.3
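If the diff above were applied, usage would look roughly like the following (a sketch against the proposed API, which was never merged upstream; assumes a spark-shell session with sc in scope):

{code}
val r1 = sc.parallelize(Seq(1, 2, 3))
val r2 = sc.parallelize(Seq("a", "b", "c"))
val r3 = sc.parallelize(Seq(1.0, 2.0, 3.0))

// Each element of the result is an Array[Any] of aligned elements:
// Array(1, "a", 1.0), Array(2, "b", 2.0), Array(3, "c", 3.0)
val zipped: org.apache.spark.rdd.RDD[Array[Any]] = r1.zip(Seq(r2, r3))
{code}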
[jira] [Created] (SPARK-3489) support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params
Mohit Jaggi created SPARK-3489:
-------------------------------

Summary: support rdd.zip(rdd1, rdd2,...) with variable number of rdds as params
Key: SPARK-3489
URL: https://issues.apache.org/jira/browse/SPARK-3489
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.0.2
Reporter: Mohit Jaggi
Fix For: 1.0.3