Re: Incorrect CAST to TIMESTAMP in Hive compatibility
Hi,

I also noticed this issue. It has already been mentioned several times, and there is an existing JIRA (SPARK-17914). I am going to submit a PR to fix this in a few days.

Best,
Anton

On Jun 5, 2017 21:42, "verbamour" wrote:
> Greetings,
>
> I am using Hive compatibility in Spark 2.1.1 and it appears that the CAST
> string to TIMESTAMP improperly trims the sub-second value. In particular,
> leading zeros in the decimal portion appear to be dropped.
>
> Steps to reproduce:
> 1. From `spark-shell` issue: `spark.sql("SELECT CAST('2017-04-05 16:00:48.0297580' AS TIMESTAMP)").show(100, false)`
>
> 2. Note erroneous result (i.e. ".0297580" becomes ".29758")
> ```
> +----------------------------------------------+
> |CAST(2017-04-05 16:00:48.0297580 AS TIMESTAMP)|
> +----------------------------------------------+
> |2017-04-05 16:00:48.29758                     |
> +----------------------------------------------+
> ```
>
> I am not currently plugged into the JIRA system for Spark, so if this is
> truly a bug please bring it to the attention of the appropriate
> authorities.
>
> Cheers,
> -tom
>
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
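As a sanity check outside Spark, the expected value can be reproduced with the JDK alone: .0297580 s is 29,758,000 ns, so a correct cast should display 16:00:48.029758 rather than 16:00:48.29758. The sketch below is plain Scala using only `java.sql.Timestamp`; `FractionalSeconds` and `fractionToNanos` are hypothetical names that illustrate why the leading zero matters, not Spark's actual parsing code.

```scala
import java.sql.Timestamp

object FractionalSeconds {
  // Hypothetical helper (not Spark code): converts the digits after the decimal
  // point into nanoseconds by right-padding them to 9 places. Parsing the digits
  // as an integer first ("0297580".toInt == 297580) throws away the leading zero
  // that determines the magnitude of the fraction.
  def fractionToNanos(fraction: String): Int = {
    require(fraction.nonEmpty && fraction.length <= 9 && fraction.forall(_.isDigit),
      s"expected 1 to 9 digits, got '$fraction'")
    (fraction + "0" * (9 - fraction.length)).toInt
  }

  def main(args: Array[String]): Unit = {
    val raw = "2017-04-05 16:00:48.0297580"

    // The JDK parser keeps the leading zero: .0297580 s == 29,758,000 ns.
    val ts = Timestamp.valueOf(raw)
    println(ts.getNanos) // 29758000
    println(ts)          // 2017-04-05 16:00:48.029758

    // The hand-written helper agrees with the JDK.
    println(fractionToNanos(raw.split('.')(1))) // 29758000
  }
}
```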
Re: Aggregator mutate b1 in place in merge
Hi,

I recently extended the Spark SQL programming guide to cover user-defined aggregations. In those examples, the existing buffer is modified and returned in both reduce and merge. This approach works, and it was approved by reviewers familiar with that part of the code.

Hope that helps.

2017-01-29 17:17 GMT+01:00 Koert Kuipers:
> anyone?
> it not i will follow the trail and try to deduce it myself
>
> On Mon, Jan 23, 2017 at 2:31 PM, Koert Kuipers wrote:
>
>> looking at the docs for org.apache.spark.sql.expressions.Aggregator it
>> says for reduce method: "For performance, the function may modify `b` and
>> return it instead of constructing new object for b.".
>>
>> it makes no such comment for the merge method.
>>
>> this is surprising to me because i know that for
>> PairRDDFunctions.aggregateByKey mutation is allowed in both seqOp and
>> combOp (which are the equivalents of reduce and merge in Aggregator).
>>
>> is it safe to mutate b1 and return it in Aggregator.merge?
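A minimal sketch of the pattern described above, modeled on the typed-aggregation example in the Spark SQL programming guide (Spark 2.x API assumed; `Employee`, `Average`, `MyAverage` and the salary values are illustrative). Both reduce and merge update the buffer in place and return it. Note that the Aggregator scaladoc quoted above only mentions this for reduce; the merge behavior here follows the guide's example rather than an explicit documented guarantee.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Input rows, and a buffer whose `var` fields let reduce/merge update it in place.
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A fresh buffer; must satisfy merge(zero, b) == b.
  def zero: Average = Average(0L, 0L)

  // Mutate the buffer and return it instead of allocating a new object.
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }

  // Same pattern for merge: fold b2 into b1 and return b1.
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object MyAverageExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("my-average").getOrCreate()
    import spark.implicits._

    val ds = Seq(Employee("a", 3000L), Employee("b", 4500L),
      Employee("c", 3500L), Employee("d", 4000L)).toDS()
    ds.select(MyAverage.toColumn.name("average_salary")).show() // 3750.0
    spark.stop()
  }
}
```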
Re: Spark Aggregator for array of doubles
Hi,

Take a look at this pull request, which is not merged yet: https://github.com/apache/spark/pull/16329. It contains examples in Java and Scala that can be helpful.

Best regards,
Anton Okolnychyi

On Jan 4, 2017 23:23, "Anil Langote" <anillangote0...@gmail.com> wrote:
> Hi All,
>
> I have been working on a use case where I have a DF which has 25 columns,
> 24 columns are of type string and last column is array of doubles. For a
> given set of columns I have to apply group by and add the array of doubles,
> I have implemented UDAF which works fine but it's expensive in order to
> tune the solution I came across Aggregators which can be implemented and
> used with agg function, my question is how can we implement a aggregator
> which takes array of doubles as input and returns the array of double.
>
> I learned that it's not possible to implement the aggregator in Java can
> be done in scala only how can define the aggregator which takes array of
> doubles as input, note that I have parquet file as my input.
>
> Any pointers are highly appreciated, I read that spark UDAF is slow and
> aggregators are the way to go.
>
> Best Regards,
>
> Anil Langote
>
> +1-425-633-9747
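One way to sketch such an aggregator, assuming the Spark 2.x typed API and arrays of equal length within a group. `Record`, `ElementwiseSum` and the sample data are hypothetical. The buffer encoder uses `ExpressionEncoder` from Spark's internal catalyst package (the same class used elsewhere in this archive); `Encoders.kryo[Array[Double]]` would be a public but likely slower alternative. Following the "Aggregator mutate b1 in place in merge" thread, the buffer is updated in place in both reduce and merge.

```scala
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical row shape: a grouping key plus an array<double> column.
case class Record(key: String, values: Seq[Double])

// Element-wise sum of double arrays; the buffer is a plain Array[Double].
object ElementwiseSum extends Aggregator[Record, Array[Double], Array[Double]] {
  // An empty array stands for "no values seen yet".
  def zero: Array[Double] = Array.empty[Double]

  def reduce(acc: Array[Double], r: Record): Array[Double] = addInto(acc, r.values.toArray)

  def merge(b1: Array[Double], b2: Array[Double]): Array[Double] = addInto(b1, b2)

  def finish(acc: Array[Double]): Array[Double] = acc

  // Internal (catalyst) API; see the lead-in for the public Kryo alternative.
  def bufferEncoder: Encoder[Array[Double]] = ExpressionEncoder[Array[Double]]()
  def outputEncoder: Encoder[Array[Double]] = ExpressionEncoder[Array[Double]]()

  // Adds xs into acc in place and returns acc (or a copy of xs if acc is empty).
  private def addInto(acc: Array[Double], xs: Array[Double]): Array[Double] =
    if (xs.isEmpty) acc
    else if (acc.isEmpty) xs.clone()
    else {
      require(acc.length == xs.length, s"length mismatch: ${acc.length} vs ${xs.length}")
      var i = 0
      while (i < acc.length) { acc(i) += xs(i); i += 1 }
      acc
    }
}

object ElementwiseSumExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("elementwise-sum").getOrCreate()
    import spark.implicits._

    val ds = Seq(Record("a", Seq(1.0, 2.0)), Record("a", Seq(3.0, 4.0)),
      Record("b", Seq(5.0, 6.0))).toDS()
    ds.groupByKey(_.key)
      .agg(ElementwiseSum.toColumn)
      .collect()
      .foreach { case (k, v) => println(s"$k -> ${v.mkString("[", ", ", "]")}") }
    spark.stop()
  }
}
```

With Parquet input, a DataFrame whose columns match the case class can be turned into a typed Dataset with `.as[Record]` before grouping; with 24 grouping columns, the key would be a tuple or a case class instead of a single String.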
Re: Spark Streaming with Kafka
Thanks for all your replies; now I have a complete picture.

2016-12-12 16:49 GMT+01:00 Cody Koeninger <c...@koeninger.org>:
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream
>
> Use a separate group id for each stream, like the docs say.
>
> If you're doing multiple output operations, and aren't caching, spark
> is going to read from kafka again each time, and if some of those
> reads are happening for the same group and same topicpartition, it's
> not going to work.
>
> On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
> <oleksii.duk...@gmail.com> wrote:
> > Hi Anton,
> >
> > What is the command you run your spark app with? Why not working with data
> > instead of stream on your second stage operation? Can you provide logs with
> > the issue?
> >
> > ConcurrentModificationException is not a spark issue, it means that you use
> > the same Kafka consumer instance from more than one thread.
> >
> > Additionally,
> >
> > 1) As I understand new kafka consumer is created every time when you call
> > KafkaUtils.createDirectStream.
> > 2) If you assign the same group id to several consumer instances then all
> > the consumers will get different set of messages on the same topic. This is
> > a kind of load balancing which kafka provides with its Consumer API.
> >
> > Oleksii
> >
> > On 11 December 2016 at 18:46, Anton Okolnychyi <anton.okolnyc...@gmail.com>
> > wrote:
> >>
> >> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
> >> nothing custom.
> >>
> >> I will try restate the initial question. Let's consider an example.
> >>
> >> 1. I create a stream and subscribe to a certain topic.
> >>
> >> val stream = KafkaUtils.createDirectStream(...)
> >>
> >> 2. I extract the actual data from the stream. For instance, word counts.
> >>
> >> val wordCounts = stream.map(record => (record.value(), 1))
> >>
> >> 3. Then I compute something and output the result to console.
> >>
> >> val firstResult = stream.reduceByWindow(...)
> >> firstResult.print()
> >>
> >> Once that is done, I would like to perform another computation on top of
> >> wordCounts and output that result again to console. In my current
> >> understanding, I cannot just reuse wordCounts from Step 2 and should create
> >> a new stream with another group id and then define the second computation.
> >> Am I correct that if add the next part, then I can get
> >> "ConcurrentModificationException: KafkaConsumer is not safe for
> >> multi-threaded access"?
> >>
> >> // another computation on wordCounts
> >> val secondResult = wordCounts.reduceByKeyAndWindow(...)
> >> secondResult.output()
> >>
> >> Thanks,
> >> Anton
> >>
> >> 2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
> >>>
> >>> Hi,
> >>> Usual general questions are:
> >>> -- what is your Spark version?
> >>> -- what is your Kafka version?
> >>> -- do you use "standard" Kafka consumer or try to implement something
> >>> custom (your own multi-threaded consumer)?
> >>>
> >>> The freshest docs
> >>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> >>>
> >>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
> >>> !!!)
> >>>>
> >>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
> >>>
> >>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
> >>> <anton.okolnyc...@gmail.com> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
> >>>> someone can say whether the following assumption is correct.
> >>>>
> >>>> If I have multiple computations (each with its own output) on one stream
> >>>> (created as KafkaUtils.createDirectStream), then there is a chance to have
> >>>> ConcurrentModificationException: KafkaConsumer is not safe for
> >>>> multi-threaded access. To solve this problem, I should create a new stream
> >>>> with different "group.id" for each computation.
> >>>>
> >>>> Am I right?
> >>>>
> >>>> Best regards,
> >>>> Anton
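Cody's first suggestion (one stream per computation, each with its own group id) can be sketched as follows, assuming the spark-streaming-kafka-0-10 integration from the linked guide. `SeparateGroupIds`, the broker address, the topic and the group ids are placeholders. The other option Cody mentions, caching the derived DStream so that multiple output operations do not read from Kafka again, is not shown here.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SeparateGroupIds {
  // Consumer settings in the style of the 0-10 integration guide;
  // "localhost:9092" is a placeholder broker address.
  def kafkaParams(groupId: String): Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // One direct stream per computation, each with its own group.id, so two
  // computations never read the same group/topicpartition through one consumer.
  def streamFor(ssc: StreamingContext, topic: String, groupId: String)
      : InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq(topic), kafkaParams(groupId)))
}

// Usage (hypothetical topic and group names):
//   val first  = SeparateGroupIds.streamFor(ssc, "words", "word-count-window").map(r => (r.value, 1))
//   val second = SeparateGroupIds.streamFor(ssc, "words", "word-count-by-key").map(r => (r.value, 1))
```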
Re: Spark Streaming with Kafka
Sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and nothing custom.

I will try to restate the initial question with an example.

1. I create a stream and subscribe to a certain topic.

val stream = KafkaUtils.createDirectStream(...)

2. I extract the actual data from the stream. For instance, word counts.

val wordCounts = stream.map(record => (record.value(), 1))

3. Then I compute something and output the result to the console.

val firstResult = stream.reduceByWindow(...)
firstResult.print()

Once that is done, I would like to perform another computation on top of wordCounts and output that result to the console as well. In my current understanding, I cannot just reuse wordCounts from step 2; instead, I should create a new stream with another group id and then define the second computation on it. Am I correct that if I add the next part, I can get "ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access"?

// another computation on wordCounts
val secondResult = wordCounts.reduceByKeyAndWindow(...)
secondResult.print()

Thanks,
Anton

2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
> Hi,
> Usual general questions are:
> -- what is your Spark version?
> -- what is your Kafka version?
> -- do you use "standard" Kafka consumer or try to implement something
> custom (your own multi-threaded consumer)?
>
> The freshest docs https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10 !!!)
>
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <anton.okolnyc...@gmail.com> wrote:
>
>> Hi,
>>
>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>> someone can say whether the following assumption is correct.
>>
>> If I have multiple computations (each with its own output) on one stream
>> (created as KafkaUtils.createDirectStream), then there is a chance to
>> have ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access. To solve this problem, I should create a new stream
>> with different "group.id" for each computation.
>>
>> Am I right?
>>
>> Best regards,
>> Anton
Spark Streaming with Kafka
Hi,

I am experimenting with Spark Streaming and Kafka. I would appreciate it if someone could tell me whether the following assumption is correct.

If I have multiple computations (each with its own output) on one stream (created with KafkaUtils.createDirectStream), then there is a chance of getting "ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access". To solve this problem, I should create a new stream with a different "group.id" for each computation.

Am I right?

Best regards,
Anton
Re: Dataframe broadcast join hint not working
Hi guys,

I also experienced a situation where Spark 1.6.2 ignored my hint to do a broadcast join (i.e. broadcast(df)) with a small dataset. However, this happened in only 1 of 3 cases. Setting the "spark.sql.autoBroadcastJoinThreshold" property did not have any impact either. All 3 cases work fine in Spark 2.0.

Is there any chance that Spark can ignore a manually specified broadcast? In other words, is Spark forced to perform a broadcast join if one specifies "df1.join(broadcast(df2), ...)"?

Best regards,
Anton

2016-11-26 21:04 GMT+01:00 Swapnil Shinde:
> I am using Spark 1.6.3 and below is the real plan (a,b,c in above were
> just for illustration purpose)
>
> == Physical Plan ==
> Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
> +- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801], [mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
>    :- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
>    :  +- TungstenExchange hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200), None
>    :     +- Project [_1#3797 AS ltt#3800,_2#3798 AS mr_demo_id#3801,_3#3799 AS mr_demoname#3802]
>    :        +- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
>    +- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
>       +- TungstenExchange hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200), None
>          +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
>             +- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804 AS mr_demo_id#3811,demoname#3805 AS mr_demo_value#3812,demovalue_etv_map#3806 AS etv_demo_id#3813]
>                +- Filter ((map_type#3809 = master_roster_to_etv) && NOT (demogroup#3803 = gender_age_id))
>                   +- Scan ExistingRDD[demogroup#3803,demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,demovalue_old_map#3808,map_type#3809]
>
> Thanks
> Swapnil
>
> On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang wrote:
>
>> Could you post the result of explain `c.explain`? If it is broadcast
>> join, you will see it in explain.
>>
>> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde <
>> swapnilushi...@gmail.com> wrote:
>>
>>> Hello
>>> I am trying a broadcast join on dataframes but it is still doing
>>> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
>>> higher but still no luck.
>>>
>>> Related piece of code-
>>> val c = a.join(braodcast(b), "id")
>>>
>>> On a side note, if I do SizeEstimator.estimate(b) and it is really
>>> high(460956584 bytes) compared to data it contains. b has just 85 rows and
>>> around 4964 bytes.
>>> Help is very much appreciated!!
>>>
>>> Thanks
>>> Swapnil
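Following Benyi's suggestion, the quickest check is whether the physical plan contains BroadcastHashJoin. Below is a minimal sketch, assuming Spark 2.x or later; `BroadcastHintCheck`, the table shapes and the sizes are made up for illustration. It disables size-based automatic broadcasting so that only the explicit hint can produce a broadcast join. It does not explain the 1.6 behavior discussed above; it only shows how to verify the hint in a given version.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object BroadcastHintCheck {
  // Hypothetical data: a "large" side and an 85-row "small" side joined on id,
  // with an explicit broadcast hint on the small side.
  def hintedJoin(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val large = (1 to 1000).map(i => (i % 85, s"row$i")).toDF("id", "payload")
    val small = (0 until 85).map(i => (i, s"dim$i")).toDF("id", "label")
    large.join(broadcast(small), "id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("broadcast-hint-check")
      // -1 disables size-based auto-broadcasting; only the hint is left.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    val joined = hintedJoin(spark)
    joined.explain()
    // Expect BroadcastHashJoin rather than SortMergeJoin in the plan.
    println(joined.queryExecution.executedPlan.toString.contains("BroadcastHashJoin"))
    spark.stop()
  }
}
```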
Fwd:
Hi,

I have run into a problem using the Datasets API in Spark 1.6, while almost identical code works fine in Spark 2.0. The problem is related to encoders and custom aggregators.

*Spark 1.6 (the aggregation produces an empty map):*

implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
// implicit val intStringMapEncoder: Encoder[Map[Int, String]] = org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

val sparkConf = new SparkConf()
  .setAppName("IVU DS Spark 1.6 Test")
  .setMaster("local[4]")
val sparkContext = new SparkContext(sparkConf)
val sparkSqlContext = new SQLContext(sparkContext)

import sparkSqlContext.implicits._

val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
  override def zero = Map[Int, String]()
  override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
    map.updated(stopPoint.sequenceNumber, stopPoint.id)
  }
  override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
    map ++ anotherMap
  }
  override def finish(reduction: Map[Int, String]) = reduction
}.toColumn

val resultMap = stopPointDS
  .groupBy(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap

In spark.sql.execution.TypedAggregateExpression.scala, I see that the reduce step is done correctly, but Spark cannot read the reduced values in the merge phase. If I replace the ExpressionEncoder with the Kryo-based one (commented out in the code above), then it works fine.
*Spark 2.0 (works correctly):*

val spark = SparkSession
  .builder()
  .appName("IVU DS Spark 2.0 Test")
  .config("spark.sql.warehouse.dir", "file:///D://sparkSql")
  .master("local[4]")
  .getOrCreate()

import spark.implicits._

val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
  override def zero = Map[Int, String]()
  override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
    map.updated(stopPoint.sequenceNumber, stopPoint.id)
  }
  override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
    map ++ anotherMap
  }
  override def finish(reduction: Map[Int, String]) = reduction
  override def bufferEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
}.toColumn

val resultMap = stopPointDS
  .groupByKey(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap

I know that Spark 1.6 has only a preview of the Datasets concept and that a lot changed in 2.0. However, I would like to know if I am doing anything wrong in my 1.6 code.

Thanks in advance,
Anton
Expression Encoder for Map[Int, String] in a custom Aggregator on a Dataset
Hi all,

I am trying to use my custom Aggregator on a GroupedDataset of case classes to create a hash map using Spark SQL 1.6.2. My Encoder[Map[Int, String]] is not able to reconstruct the reduced values if I define it via ExpressionEncoder(). However, everything works fine if I define it as Encoders.kryo[Map[Int, String]]. I would like to know if I am doing anything wrong.

I have the following use case:

implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()

val sparkContext = ...
val sparkSqlContext = new SQLContext(sparkContext)

import sparkSqlContext.implicits._

case class StopPoint(line: String, sequenceNumber: Int, id: String)

val stopPointDS = Seq(StopPoint("33", 1, "1"), StopPoint("33", 2, "2")).toDS()

val stopPointSequenceMap = new Aggregator[StopPoint, Map[Int, String], Map[Int, String]] {
  override def zero = Map[Int, String]()
  override def reduce(map: Map[Int, String], stopPoint: StopPoint) = {
    map.updated(stopPoint.sequenceNumber, stopPoint.id)
  }
  override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
    map ++ anotherMap
  }
  override def finish(reduction: Map[Int, String]) = reduction
}.toColumn

val resultMap = stopPointDS
  .groupBy(_.line)
  .agg(stopPointSequenceMap)
  .collect()
  .toMap

In spark.sql.execution.TypedAggregateExpression.scala, I see that each entry is inserted into the initial map correctly (i.e. the reduce() method works properly). However, my encoder cannot reconstruct the map from the reduce phase in the merge phase, and I get an empty Map as the result of the merge method. If I replace my expression-based encoder with org.apache.spark.sql.Encoders.kryo[Map[Int, String]], I get the correct result:

(33, Map(1 -> 1, 2 -> 2))

Any ideas/suggestions are more than welcome.

Sincerely,
Anton Okolnychyi
Re: Re: how to select first 50 value of each group after group by?
Hi,

I can try to guess what is wrong, but I might be incorrect.

You should be careful with window frames (you define them via the rowsBetween() method). In my understanding, all window functions can be divided into 2 groups:
- functions defined by the org.apache.spark.sql.catalyst.expressions.WindowFunction trait ("true" window functions);
- all other supported functions that are marked as window functions by providing a window specification.

The main distinction is that functions from the first group might have a predefined internal frame. That's exactly your case: both row_number() and rank() belong to the first group (i.e. they have predefined internal frames). To make your case work, you have 2 options:
- remove your own frame specification (i.e. rowsBetween(0, 49)) and use only Window.partitionBy(hivetable.col("location"));
- explicitly state a correct window frame, for instance rowsBetween(Long.MinValue, 0) for rank() and row_number().

By the way, there is not much documentation on how Spark resolves window frames. For that reason, I created a small pull request that can help: https://github.com/apache/spark/pull/14050

It would be nice if someone experienced could take a look at it, since it is based only on my own analysis.

2016-07-07 13:26 GMT+02:00 <luohui20...@sina.com>:
> hi Anton:
> I check the docs you mentioned, and have code accordingly, however
> met an exception like "org.apache.spark.sql.AnalysisException: Window
> function row_number does not take a frame specification.;"
> It Seems that the row_number API is giving a global row numbers of
> every row across all frames, by my understanding. If wrong,please correct
> me.
> I checked all the window function API of
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$,
> and just found that maybe row_number() seems matches. I am not quit sure
> about it.
>
> here is my code:
> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
> val hivetable = hc.sql("select * from house_sale_pv_location")
> val overLocation = Window.partitionBy(hivetable.col("location")).rowsBetween(0, 49)
> val sortedDF = hivetable.withColumn("rowNumber", row_number().over(overLocation))
> sortedDF.registerTempTable("sortedDF")
> val top50 = hc.sql("select id,location from sortedDF where rowNumber<=50")
> top50.registerTempTable("top50")
> hc.sql("select * from top50 where location=30").collect.foreach(println)
>
> here, hivetable is a DF that I mentioned with 3 columns "id , pv,
> location", which is already sorted by pv in desc. So I didn't call orderby
> in the 3rd line of my code. I just want the first 50 rows, based on
> physical location, of each frame.
>
> To Tal:
> I tried rank API, however this is not the API I want , because there
> are some values have same pv are ranked as same values. And first 50 rows
> of each frame is what I'm expecting. the attached file shows what I got by
> using rank.
> Thank you anyway, I learnt what rank could provide from your advice.
>
> Thanks&Best regards!
> San.Luo
>
> ----- Original Message -----
> From: Anton Okolnychyi <anton.okolnyc...@gmail.com>
> To: user <user@spark.apache.org>
> Cc: luohui20...@sina.com
> Subject: Re: how to select first 50 value of each group after group by?
> Date: 2016-07-06 23:22
>
> The following resources should be useful:
>
> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
>
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html
>
> The last link should have the exact solution
>
> 2016-07-06 16:55 GMT+02:00 Tal Grynbaum <tal.grynb...@gmail.com>:
>
> You can use rank window function to rank each row in the group, and then
> filter the rowz with rank < 50
>
> On Wed, Jul 6, 2016, 14:07 <luohui20...@sina.com> wrote:
>
> hi there
> I have a DF with 3 columns: id , pv, location.(the rows are already
> grouped by location and sort by pv in des) I wanna get the first 50 id
> values grouped by location. I checked the API of
> dataframe,groupeddata,pairRDD, and found no match.
> is there a way to do this naturally?
> any info will be appreciated.
>
> Thanks&Best regards!
> San.Luo
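A minimal sketch of the first option (no custom frame), assuming Spark 2.x or later and the column names from the question; `TopNPerGroup` and the sample rows are illustrative. It orders explicitly by pv instead of relying on the pre-sorted input, since row order is not guaranteed to survive the partitioning shuffle, and recent Spark versions reject row_number() over an unordered window.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object TopNPerGroup {
  // Keeps the n rows with the highest pv for each location.
  def topN(df: DataFrame, n: Int): DataFrame = {
    // No rowsBetween(...): row_number() uses its own predefined frame.
    val byLocation = Window.partitionBy(col("location")).orderBy(col("pv").desc)
    df.withColumn("rowNumber", row_number().over(byLocation))
      .where(col("rowNumber") <= n)
      .drop("rowNumber")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("top-n").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10, 30), (2, 40, 30), (3, 20, 30), (4, 5, 31), (5, 7, 31))
      .toDF("id", "pv", "location")
    // Location 30 keeps ids 2 and 3; location 31 keeps ids 5 and 4.
    topN(df, 2).orderBy("location", "id").show()
    spark.stop()
  }
}
```

For the original question, the call would be `topN(hivetable, 50)`.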
Re: how to select first 50 value of each group after group by?
The following resources should be useful:

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html

The last link should have the exact solution.

2016-07-06 16:55 GMT+02:00 Tal Grynbaum:
> You can use rank window function to rank each row in the group, and then
> filter the rowz with rank < 50
>
> On Wed, Jul 6, 2016, 14:07 wrote:
>
>> hi there
>> I have a DF with 3 columns: id , pv, location.(the rows are already
>> grouped by location and sort by pv in des) I wanna get the first 50 id
>> values grouped by location. I checked the API of
>> dataframe,groupeddata,pairRDD, and found no match.
>> is there a way to do this naturally?
>> any info will be appreciated.
>>
>> Thanks&Best regards!
>> San.Luo
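The difference between Tal's rank() suggestion and row_number(), which San.Luo ran into in the later reply, shows up as soon as two rows tie on pv. A small sketch, assuming Spark 2.x or later; `RankVsRowNumber` and the data are illustrative. rank() gives tied rows the same value and leaves a gap, so filtering on rank can return more than N rows per group, while row_number() always numbers 1, 2, 3, ... (the order among tied rows is not guaranteed).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank, row_number}

object RankVsRowNumber {
  // Adds both rank() and row_number() over the same ordered window.
  def ranked(df: DataFrame): DataFrame = {
    val w = Window.partitionBy(col("location")).orderBy(col("pv").desc)
    df.withColumn("rank", rank().over(w))
      .withColumn("rowNumber", row_number().over(w))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("rank-vs-row-number").getOrCreate()
    import spark.implicits._

    // Two rows share pv = 40 in location 30.
    val df = Seq((1, 40, 30), (2, 40, 30), (3, 10, 30)).toDF("id", "pv", "location")
    // rank: 1, 1, 3 (ties share a rank, then a gap); rowNumber: 1, 2, 3.
    ranked(df).orderBy(col("pv").desc, col("id")).show()
    spark.stop()
  }
}
```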
Last() Window Function
Hi all!

I am learning Spark SQL and window functions. The behavior of the last() window function was unexpected for me in one case (at least for a person without any previous experience with window functions).

I define my window specification as follows:

Window.partitionBy('transportType, 'route).orderBy('eventTime)

So, I have neither rowsBetween nor rangeBetween boundaries. In this scenario, I expected to get the latest event (by time) in a group if I apply the last('eventTime) window function over this window specification. However, this does not happen.

Looking at the code, I was able to figure out that if there are no range/rows boundaries, the UnspecifiedFrame is assigned. Later, in ResolveWindowFrame, Spark assigns a default window frame to the last() function. The default frame depends on the presence of an order specification: if there is one, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. That's why the last() window function does not work as I expected in my case.

There is a very helpful comment in SpecifiedWindowFrame. I wish I could have found it in the documentation. That's why I have 2 questions:
- Did I miss the place in the documentation where this behavior is described? If not, would it be appropriate for me to try to find where it can be documented?
- Would it be appropriate/useful to add some window function examples to spark/examples? There are none so far.

Sincerely,
Anton Okolnychyi
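A sketch of the two frames, assuming Spark 2.1+ (for `Window.unboundedPreceding` and `Window.unboundedFollowing`) and the column names from the message; `LastWithFrames` and the sample events are hypothetical. With only ORDER BY, the default RANGE frame ends at the current row, so last() returns the current row's eventTime (or the last of its peers on ties). With an explicit frame spanning the whole partition, last() returns the latest eventTime of the group.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

object LastWithFrames {
  def withLastColumns(events: DataFrame): DataFrame = {
    val ordered = Window.partitionBy(col("transportType"), col("route")).orderBy(col("eventTime"))

    // Default frame with ORDER BY: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
    val lastDefault = last(col("eventTime")).over(ordered)

    // Explicit frame covering the whole partition.
    val wholePartition = ordered.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    val lastFull = last(col("eventTime")).over(wholePartition)

    events.withColumn("lastDefault", lastDefault).withColumn("lastFull", lastFull)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("last-frames").getOrCreate()
    import spark.implicits._

    val events = Seq(("bus", "33", 1L), ("bus", "33", 2L), ("bus", "33", 3L), ("tram", "5", 10L))
      .toDF("transportType", "route", "eventTime")
    // For the bus rows: lastDefault is 1, 2, 3 while lastFull is 3, 3, 3.
    withLastColumns(events).orderBy("eventTime").show()
    spark.stop()
  }
}
```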