Unsubscribe

2018-04-17 Thread Anu B Nair



"not in" sql spend a lot of time

2018-04-17 Thread 崔苗
Hi,
When I execute SQL like this:
"select * from onlineDevice where deviceId not in (select deviceId from
historyDevice)"
the task spends a lot of time (over 40 minutes). I stopped the task, but I
couldn't find the reason in the Spark history UI.
historyDevice and onlineDevice each contain only about 3 million records.

spark-submit :
  --master yarn --deploy-mode client --driver-memory 8G --num-executors 2 
--executor-memory 9G --executor-cores 6
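
A hedged aside, not from the original thread: a NOT IN subquery over a nullable
column is null-aware, and Spark typically plans it as a broadcast nested loop
join, which can easily take this long on a few million rows. If deviceId is
guaranteed to be non-null on both sides, a LEFT ANTI JOIN expresses the same
filter and usually gets an ordinary hash or sort-merge join. A sketch, reusing
the table names from the question:

// Sketch only: equivalent to the NOT IN query *only if* deviceId is never NULL.
val result = spark.sql("""
  SELECT o.*
  FROM onlineDevice o
  LEFT ANTI JOIN historyDevice h
    ON o.deviceId = h.deviceId
""")
result.show()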








Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
Refined further: I got rid of wrapping the input fields into a struct, but the
result type of the UDAF is still a struct. I extract the fields one by one,
since I haven't found a function that does this in a single step (see the
sketch after the code below).

I wrote this code without an IDE and ran it from spark-shell, so there are
probably many spots that could be shortened or cleaned up.

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.sql.Date

class MaxRow extends UserDefinedAggregateFunction {
  // These are the input fields for your aggregate function.
  override def inputSchema: org.apache.spark.sql.types.StructType =
StructType(Seq(
StructField("AMOUNT", IntegerType, true),
StructField("MY_TIMESTAMP", DateType, true))
)

  // These are the internal fields you keep for computing your aggregate.
  override def bufferSchema: StructType =
StructType(Seq(
StructField("AMOUNT", IntegerType, true),
StructField("MY_TIMESTAMP", DateType, true))
)

  // This is the output type of your aggregation function.
  override def dataType: DataType =
  new StructType().add("st", StructType(Seq(
StructField("AMOUNT", IntegerType, true),
StructField("MY_TIMESTAMP", DateType, true))
)
  )

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
  }

  // This is how to update your buffer schema given an input.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
{
if (buffer.getAs[Any](0) == null || buffer.getInt(0) < input.getInt(0))
{
  buffer(0) = input(0)
  buffer(1) = input(1)
}
  }

  // This is how to merge two objects with the bufferSchema type.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
= {
if (buffer1.getAs[Any](0) == null || (buffer2.getAs[Any](0) != null &&
buffer1.getInt(0) < buffer2.getInt(0))) {
  buffer1(0) = buffer2(0)
  buffer1(1) = buffer2(1)
}
  }

  // This is where you output the final value, given the final value of
  // your bufferSchema.
  override def evaluate(buffer: Row): Any = {
Row(Row(buffer(0), buffer(1)))
  }
}

val maxrow = new MaxRow
spark.udf.register("maxrow", maxrow)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
import spark.implicits._

val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("AMOUNT", IntegerType, true),
  StructField("MY_TIMESTAMP", DateType, true)
))

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.ID", $"data.AMOUNT", $"data.MY_TIMESTAMP")
  .groupBy($"ID")
  .agg(maxrow(col("AMOUNT"), col("MY_TIMESTAMP")).as("maxrow"))
  .selectExpr("ID", "maxrow.st.AMOUNT", "maxrow.st.MY_TIMESTAMP")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()
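
A hedged aside, not part of the original mail: if the remaining annoyance is
only extracting the struct fields one by one, aliasing the inner struct and
expanding it with ".*" should do the same thing in one step. A sketch, where
"aggregated" is a hypothetical name for the DataFrame right after the .agg(...)
call above:

import org.apache.spark.sql.functions.col

// Sketch only: pull the nested struct up one level, then expand all of its fields.
val flattened = aggregated
  .select(col("ID"), col("maxrow.st").as("st"))
  .select("ID", "st.*")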

- Jungtaek Lim (HeartSaVioR)


On Wed, Apr 18, 2018 at 7:41 AM, Jungtaek Lim wrote:

> I think I missed something: the self-join is not needed if we define a UDAF
> and use it in the aggregation. Since all fields need to be accessed, I can't
> find any approach other than wrapping the fields into a struct and unwrapping
> them afterwards. There doesn't seem to be a way to pass multiple fields to a
> UDAF, at least via RelationalGroupedDataset.
>
> Here's the working code which runs fine in console:
>
> 
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import java.sql.Date
>
> class MaxRow extends UserDefinedAggregateFunction {
>   // These are the input fields for your aggregate function.
>   override def inputSchema: org.apache.spark.sql.types.StructType =
> new StructType().add("st", StructType(Seq(
> StructField("AMOUNT", IntegerType, true),
> StructField("MY_TIMESTAMP", DateType, true))
> )
>   )
>
>   // These are the internal fields you keep for computing your aggregate.
>   override def bufferSchema: StructType =
>   new StructType().add("st", StructType(Seq(
> StructField("AMOUNT", IntegerType, true),
> StructField("MY_TIMESTAMP", DateType, true))
> )
>   )
>
>   // This is the output type of your aggregation function.
>   override def dataType: DataType =
>   new StructType().add("st", StructType(Seq(
> StructField("AMOUNT", IntegerType, true),
> StructField("MY_TIMESTAMP", DateType, true))
> )
>  

Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
I think I missed something: the self-join is not needed if we define a UDAF and
use it in the aggregation. Since all fields need to be accessed, I can't find
any approach other than wrapping the fields into a struct and unwrapping them
afterwards. There doesn't seem to be a way to pass multiple fields to a UDAF,
at least via RelationalGroupedDataset.

Here's the working code which runs fine in console:


import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import java.sql.Date

class MaxRow extends UserDefinedAggregateFunction {
  // These are the input fields for your aggregate function.
  override def inputSchema: org.apache.spark.sql.types.StructType =
new StructType().add("st", StructType(Seq(
StructField("AMOUNT", IntegerType, true),
StructField("MY_TIMESTAMP", DateType, true))
)
  )

  // These are the internal fields you keep for computing your aggregate.
  override def bufferSchema: StructType =
  new StructType().add("st", StructType(Seq(
StructField("AMOUNT", IntegerType, true),
StructField("MY_TIMESTAMP", DateType, true))
)
  )

  // This is the output type of your aggregation function.
  override def dataType: DataType =
  new StructType().add("st", StructType(Seq(
StructField("AMOUNT", IntegerType, true),
StructField("MY_TIMESTAMP", DateType, true))
)
  )

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
  }

  // This is how to update your buffer schema given an input.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
{
val inputRowStruct = input.getAs[Row](0)
if (buffer.getAs[Row](0) == null || buffer.getAs[Row](0).getInt(0) <
input.getAs[Row](0).getInt(0)) {
  buffer(0) = inputRowStruct
}
  }

  // This is how to merge two objects with the bufferSchema type.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
= {
if (buffer1.getAs[Row](0) == null || (buffer2.getAs[Row](0) != null &&
buffer1.getAs[Row](0).getInt(0) < buffer2.getAs[Row](0).getInt(0))) {
  buffer1(0) = buffer2(0)
}
  }

  // This is where you output the final value, given the final value of
  // your bufferSchema.
  override def evaluate(buffer: Row): Any = {
buffer
  }
}

spark.udf.register("maxrow", new MaxRow)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types._
import spark.implicits._

val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("AMOUNT", IntegerType, true),
  StructField("MY_TIMESTAMP", DateType, true)
))

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .selectExpr("data.ID as ID", "struct(data.AMOUNT, data.MY_TIMESTAMP) as
structure")
  .groupBy($"ID")
  .agg("structure" -> "maxrow")
  .selectExpr("ID", "`maxrow(structure)`.st.AMOUNT",
"`maxrow(structure)`.st.MY_TIMESTAMP")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()


You would still want to group records by event-time window and add a watermark:
even when all five records were pushed to the socket together (via nc), they
were handled across two micro-batches, producing two separate results. (A
sketch follows the sample output below.)

-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+------------+
| ID|AMOUNT|MY_TIMESTAMP|
+---+------+------------+
|  1|    10|  2018-04-01|
|  2|    30|  2018-04-01|
+---+------+------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------+------------+
| ID|AMOUNT|MY_TIMESTAMP|
+---+------+------------+
|  2|    40|  2018-04-01|
+---+------+------------+
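
A hedged sketch of the windowing/watermark point above, not from the original
mail. It reuses the socketDF and schema defined earlier; the window size and
watermark threshold are illustrative, and MY_TIMESTAMP is cast to a timestamp
so it can carry the watermark. The maxrow UDAF could be used in place of
max(AMOUNT) to keep the whole-row semantics.

import org.apache.spark.sql.functions.{from_json, window, max, col}

// Sketch only: group by an event-time window plus ID and add a watermark, so each
// logical window produces one result instead of one partial result per micro-batch.
val windowed = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .select(from_json(col("value"), schema).as("data"))
  .selectExpr("data.ID as ID", "data.AMOUNT as AMOUNT",
              "CAST(data.MY_TIMESTAMP AS TIMESTAMP) as EVENT_TIME")
  .withWatermark("EVENT_TIME", "10 minutes")
  .groupBy(window(col("EVENT_TIME"), "1 hour"), col("ID"))
  .agg(max(col("AMOUNT")).as("MAX_AMOUNT"))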

- Jungtaek Lim (HeartSaVioR)

On Wed, Apr 18, 2018 at 5:56 AM, Jungtaek Lim wrote:

> That might be simple if you want to get aggregated values for both amount
> and my_timestamp:
>
> val schema = StructType(Seq(
>   StructField("ID", IntegerType, true),
>   StructField("AMOUNT", IntegerType, true),
>   StructField("MY_TIMESTAMP", DateType, true)
> ))
>
> val query = socketDF
>   .selectExpr("CAST(value AS STRING) as value")
>   .as[String]
>   .select(from_json($"value", schema=schema).as("data"))
>   .select($"data.ID", $"data.AMOUNT", $"data.MY_TIMESTAMP")
>   .groupBy($"ID")
>   .agg("AMOUNT" -> "max", "MY_TIMESTAMP" -> "max")
>
> which requires you to set output mode as Update mode or Complete mode.
>
> But I guess you would like to select the max row and use MY_TIME

Re: Structured streaming: Tried to fetch $offset but the returned record offset was ${record.offset}"

2018-04-17 Thread Cody Koeninger
Is this possibly related to the recent post on
https://issues.apache.org/jira/browse/SPARK-18057 ?

On Mon, Apr 16, 2018 at 11:57 AM, ARAVIND SETHURATHNAM <
asethurath...@homeaway.com.invalid> wrote:

> Hi,
>
> We have several structured streaming jobs (Spark version 2.2.0) consuming
> from Kafka and writing to S3. They were running fine for a month; since
> yesterday a few jobs started failing, and I see the below exception in the
> failed jobs' logs:
>
>
>
> ```Tried to fetch 473151075 but the returned record offset was 473151072```
> ```DAGScheduler: ResultStage 0 (start at SparkStreamingTask.java:222) failed
> in 77.546 s due to Job aborted due to stage failure: Task 86 in stage 0.0
> failed 4 times, most recent failure: Lost task 86.3 in stage 0.0 (TID 96,
> ip-10-120-12-52.ec2.internal, executor 11): java.lang.IllegalStateException:
> Tried to fetch 473151075 but the returned record offset was 473151072
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:234)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:158)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:149)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
> ```
>
>
>
> Can someone provide some direction on what could be causing this all of a
> sudden when consuming from those topics?
>
>
>
> regards
>
>
> Aravind
>
>
>


Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread Jungtaek Lim
That might be simple if you want to get aggregated values for both amount
and my_timestamp:

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("AMOUNT", IntegerType, true),
  StructField("MY_TIMESTAMP", DateType, true)
))

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.ID", $"data.AMOUNT", $"data.MY_TIMESTAMP")
  .groupBy($"ID")
  .agg("AMOUNT" -> "max", "MY_TIMESTAMP" -> "max")

which requires you to set output mode as Update mode or Complete mode.

But I guess you would like to select the max row and use MY_TIMESTAMP from
that row; in that case I guess you need to do an inner self-join, like below:

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.ID", $"data.AMOUNT")
  .groupBy($"ID")
  .agg("AMOUNT" -> "max")

val query2 = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.ID".as("SELF_ID"), $"data.AMOUNT".as("SELF_AMOUNT"),
$"data.MY_TIMESTAMP")

val query3 = query.join(query2, expr("""
  ID = SELF_ID AND
  `MAX(AMOUNT)` = SELF_AMOUNT
"""))

which is NOT valid, at least for Spark 2.3, because aggregation requires
Update/Complete mode while join requires Append mode.
(The Structured Streaming guide clearly explains this limitation:
"Cannot use streaming aggregation before joins.")

If you can achieve this with mapGroupsWithState, you may want to stick with that.

Btw, when you deal with streaming, you may want to define a logical batch for
all aggregations and joins by defining a window and watermark. You wouldn't
want to get different results depending on the micro-batch, so you almost
always want to work with event-time windows.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Apr 18, 2018 at 3:42 AM, kant kodali wrote:

> Hi TD,
>
> Thanks for that. The only reason I ask is that I don't see any alternative
> solution to the problem below using raw SQL.
>
>
> How to select the max row for every group in Spark Structured Streaming
> 2.3.0 without using order by, since that requires Complete mode, or
> mapGroupsWithState?
>
> *Input:*
>
> id | amount | my_timestamp
> -------------------------------------
> 1  |  5     |  2018-04-01T01:00:00.000Z
> 1  | 10     |  2018-04-01T01:10:00.000Z
> 2  | 20     |  2018-04-01T01:20:00.000Z
> 2  | 30     |  2018-04-01T01:25:00.000Z
> 2  | 40     |  2018-04-01T01:30:00.000Z
>
> *Expected Output:*
>
> id | amount | my_timestamp
> -------------------------------------
> 1  | 10     |  2018-04-01T01:10:00.000Z
> 2  | 40     |  2018-04-01T01:30:00.000Z
>
> Looking for a streaming solution using either raw SQL like
> sparkSession.sql("sql query") or something similar to raw SQL, but not
> something like mapGroupsWithState.
>
> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Unfortunately no. Honestly it does not make sense as for type-aware
>> operations like map, mapGroups, etc., you have to provide an actual JVM
>> function. That does not fit in with the SQL language structure.
>>
>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> can we use mapGroupsWithState in raw SQL? or is it in the roadmap?
>>>
>>> Thanks!
>>>
>>>
>>>
>>
>


spark hbase connector

2018-04-17 Thread Lian Jiang
Hi,

My spark jobs need to talk to hbase and I am not sure which spark hbase
connector is recommended:

https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/

https://phoenix.apache.org/phoenix_spark.html

Or is there any other better solution? I'd appreciate any guidance.


Re: can we use mapGroupsWithState in raw sql?

2018-04-17 Thread kant kodali
Hi TD,

Thanks for that. The only reason I ask is that I don't see any alternative
solution to the problem below using raw SQL.


How to select the max row for every group in Spark Structured Streaming
2.3.0 without using order by, since that requires Complete mode, or
mapGroupsWithState?

*Input:*

id | amount | my_timestamp
-------------------------------------
1  |  5     |  2018-04-01T01:00:00.000Z
1  | 10     |  2018-04-01T01:10:00.000Z
2  | 20     |  2018-04-01T01:20:00.000Z
2  | 30     |  2018-04-01T01:25:00.000Z
2  | 40     |  2018-04-01T01:30:00.000Z

*Expected Output:*

id | amount | my_timestamp
-------------------------------------
1  | 10     |  2018-04-01T01:10:00.000Z
2  | 40     |  2018-04-01T01:30:00.000Z

Looking for a streaming solution using either raw SQL like
sparkSession.sql("sql query") or something similar to raw SQL, but not
something like mapGroupsWithState.

On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das 
wrote:

> Unfortunately no. Honestly it does not make sense as for type-aware
> operations like map, mapGroups, etc., you have to provide an actual JVM
> function. That does not fit in with the SQL language structure.
>
> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> can we use mapGroupsWithState in raw SQL? or is it in the roadmap?
>>
>> Thanks!
>>
>>
>>
>


An exception makes different phenomena

2018-04-17 Thread big data

> Hi all,
>
> We have two environments running a Spark Streaming job, which consumes a
> Kafka topic to do a calculation.
>
> Now in one environment, the Spark Streaming job consumed a non-standard
> record from Kafka and threw an exception (not caught in the code), and then
> the streaming job went down.
>
> But in the other environment, the Spark Streaming job also threw an
> exception (the same exception message appears in the log file), yet the
> streaming job is still running and keeps consuming other data.
>
> Are there some parameters or configuration settings related to this? Why is
> one job down while the other is still running?
>
>



Re: Warning from user@spark.apache.org

2018-04-17 Thread Ahmed B.S.B Seye
Hi,
I confirm.



On Tue, Apr 17, 2018 at 01:29, Prasad Velagaleti wrote:

> Hello,
>    I got a message saying that messages sent to me (my gmail id) from the
> mailing list got bounced. I wonder why?
>
> thanks,
> Prasad.
>
> On Mon, Apr 16, 2018 at 6:16 PM,  wrote:
>
>> Hi! This is the ezmlm program. I'm managing the
>> user@spark.apache.org mailing list.
>>
>>
>> Messages to you from the user mailing list seem to
>> have been bouncing. I've attached a copy of the first bounce
>> message I received.
>>
>> If this message bounces too, I will send you a probe. If the probe
>> bounces,
>> I will remove your address from the user mailing list,
>> without further notice.
>>
>>
>> I've kept a list of which messages from the user mailing list have
>> bounced from your address.
>>
>> Copies of these messages may be in the archive.
>> To retrieve a set of messages 123-145 (a maximum of 100 per request),
>> send a short message to:
>>
>>
>> To receive a subject and author list for the last 100 or so messages,
>> send a short message to:
>>
>>
>> Here are the message numbers:
>>
>>74336
>>
>> --- Enclosed is a copy of the bounce message I received.
>>
>> Return-Path: <>
>>
> Received: (qmail 55901 invoked for bounce); 6 Apr 2018 23:03:41 -
>> Date: 6 Apr 2018 23:03:41 -
>> From: mailer-dae...@apache.org
>> To: user-return-743...@spark.apache.org
>> Subject: failure notice
>>
>>
>


Re: pyspark execution

2018-04-17 Thread hemant singh
If the file contains only SQL, then you can use a function like the one below:

import subprocess

def run_sql(sql_file_path, your_db_name, location):
    # Note: the hivevar names (DB_NAME, LOCATION) are placeholders; use whatever
    # variable names your SQL file actually references.
    subprocess.call(["spark-sql", "-S",
                     "--hivevar", "DB_NAME=" + your_db_name,
                     "--hivevar", "LOCATION=" + location,
                     "-f", sql_file_path])

If you have other pieces like Spark code, and not only SQL, in that file:

Write a parse function which parses your SQL, replaces the placeholders
(like the DB name) in it, and then executes the newly formed SQL.

Maintaining your SQL in a separate file decouples the code and the SQL and
makes it easier from a maintenance perspective.

On Tue, Apr 17, 2018 at 8:11 AM, anudeep  wrote:

> Hi All,
>
> I have a python file which I am executing directly with the spark-submit
> command.
>
> Inside the python file, I have SQL written using the Hive context. I created
> a generic variable for the database name inside the SQL.
>
> The problem is: how can I pass the value for this variable dynamically,
> just as we do in Hive with the --hivevar parameter?
>
> Thanks!
> Anudeep
>
>
>
>
>
>
>