How to add spark structured streaming kafka source receiver

2019-08-09 Thread zenglong chen
I have set "maxOffsetsPerTrigger", but it still receives only one partition per
trigger in micro-batch mode. Where do I set it to receive from 10 partitions in
parallel, like Spark Streaming does?


Using StreamingQueryListener.OnTerminate for Kafka Offset restore

2019-08-09 Thread Sandish Kumar HN
Hey Everyone,

I'm using Spark StreamingQueryListener in Structured Streaming App

Whenever I see OffsetOutOfRangeException's in a Spark job, I update the
offsets in the Spark checkpoint directory from inside the
StreamingQueryListener.onQueryTerminated method.

I was able to catch and parse all OffsetOutOfRangeException's that occurred in
the job, extract the partition and offset, connect to Kafka using the Kafka
API to get the right offsets, and update the Spark checkpointlocation/offsets
folder.

Everything works fine even if there are multiple partitions with
OffsetOutOfRangeException's.

I was able to recover from the OffsetOutOfRangeException's on the next run.
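
A minimal sketch of the listener wiring I am describing (the repair function
is a placeholder for my own offset-fixing code):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// fixCheckpointOffsets is a placeholder: it parses the failed partitions and
// offsets from the error message, fetches valid offsets from Kafka, and
// rewrites the checkpointLocation/offsets files.
class OffsetRestoreListener(checkpointLocation: String,
                            fixCheckpointOffsets: (String, String) => Unit)
    extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    // event.exception is an Option[String] with the failure message, if any
    event.exception
      .filter(_.contains("OffsetOutOfRangeException"))
      .foreach(msg => fixCheckpointOffsets(checkpointLocation, msg))
  }
}

// Registered on the driver, e.g.:
// spark.streams.addListener(new OffsetRestoreListener("/path/to/checkpoint", repairFn))
```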

Does that make sense?

My question is:
is there any locking on the checkpointlocation/offsets folder? What happens if
multiple executors try to update the checkpointlocation/offsets folder?

I also see that the StreamingQueryListener API is asynchronous. Does it run
across executors, or just within a single executor?

-- 

Thanks,
Regards,
SandishKumar HN


Re: Dataset -- Schema for type scala.collection.Set[scala.Int] is not supported

2019-08-09 Thread Mohit Jaggi
I switched to immutable.Set and it works. This is weird, as the code in
ScalaReflection.scala seems to support scala.collection.Set.

cc: dev list, in case this is a bug
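
For completeness, this is the variant that works for me (a spark-shell style
sketch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("set-encoder").getOrCreate()
import spark.implicits._

// With the default (immutable) Set an encoder is derived without error;
// scala.Predef.Set is scala.collection.immutable.Set.
case class A(ps: Set[Int], x: Int)

Seq(A(Set(1, 2), 1), A(Set(2), 2)).toDS().show()
```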

On Thu, Aug 8, 2019 at 8:41 PM Mohit Jaggi  wrote:

> Is this not supported? I found this diff
>  and wonder whether this is
> a bug or I am doing something wrong.
>
> ===== see below =====
>
> import scala.collection.Set
> case class A(ps: Set[Int], x: Int)
>
> val az = Seq(A(Set(1, 2), 1), A(Set(2), 2))
>
> az.toDS
> java.lang.UnsupportedOperationException: Schema for type
> scala.collection.Set[scala.Int] is not supported
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
> at
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:388)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:387)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:387)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:157)
> at
> scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:157)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:145)
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
> at org.apache.spark.sql.Encoders$.product(Encoders.scala:276)
> at
> org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
> at
> org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
> at
> line9f6df40864bf4b14acca9f5c334e0286112.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(command-661233094182065:6)
> ... (remaining REPL wrapper frames truncated)

Re: Unable to write data from Spark into a Hive Managed table

2019-08-09 Thread Mich Talebzadeh
Check your permissions.

Can you do an INSERT ... SELECT from the external table into a Hive managed
table created by Spark?

//
// Need to create and populate the target ORC table transactioncodes_ll in the
// accounts database in Hive
//
spark.sql("use accounts")
//
// Drop and create table transactioncodes_ll
//
spark.sql("DROP TABLE IF EXISTS accounts.transactioncodes_ll")
var sqltext = ""
sqltext = """
CREATE TABLE accounts.transactioncodes_ll (
  id               Int
, transactiontype  String
, description      String
)
COMMENT 'from Hive external table'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
spark.sql(sqltext)
//
// Put data in the Hive table. Clean-up is already done
//
sqltext = """
INSERT INTO TABLE accounts.transactioncodes_ll
SELECT
  id
, TRIM(transactiontype)
, TRIM(description)
FROM 
"""
spark.sql(sqltext)

HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 9 Aug 2019 at 12:04, Debabrata Ghosh  wrote:

> Hi,
>   I am using Hortonworks Data Platform 3.1. I am unable to
> write data from Spark into a Hive managed table, but I am able to do so with
> a Hive external table.
>
>   Would you please help me find a resolution?
>
> Thanks,
> Debu
>


Unable to write data from Spark into a Hive Managed table

2019-08-09 Thread Debabrata Ghosh
Hi,
  I am using Hortonworks Data Platform 3.1. I am unable to
write data from Spark into a Hive managed table, but I am able to do so with a
Hive external table.

  Would you please help me find a resolution?

Thanks,
Debu


Re: Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-09 Thread Hao Ren
Hi Mich,

Thank you for your reply.
I need to be clearer about the environment: I am using spark-shell to run the
query.
Actually, the query works even without core-site.xml and hdfs-site.xml under
$SPARK_HOME/conf.
My problem is efficiency: all of the partitions are scanned, instead of only
the one in question, during the execution of the Spark SQL query, which is why
this simple query takes so long.
I would like to know how to improve this by reading just the specific
partition in question.
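
For reference, the workaround I mentioned below (reading the Parquet files
directly) looks roughly like this; it is only a sketch, reusing the base path
shown in the stage description and assuming the spark-shell session:

```scala
import org.apache.spark.sql.functions.sum

// Point spark.read.parquet at the partition directory so only that subtree is
// listed; basePath keeps the partition columns in the schema.
val base = "viewfs://root/user/bilogs/logs/fact_request"
val onePartition = spark.read
  .option("basePath", base)
  .parquet(s"$base/day=2019-08-01/hour=00/platform=US/request_status=3/is_sampled=1")

onePartition.groupBy("rtb_platform_id").agg(sum("e_cpm")).show()
```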

Feel free to ask more questions if I am not clear.

Best regards,
Hao

On Thu, Aug 8, 2019 at 9:05 PM Mich Talebzadeh 
wrote:

> You also need the others as well, via soft links (check with ls -l):
>
> cd $SPARK_HOME/conf
>
> hive-site.xml -> ${HIVE_HOME}/conf/hive-site.xml
> core-site.xml -> ${HADOOP_HOME}/etc/hadoop/core-site.xml
> hdfs-site.xml -> ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 8 Aug 2019 at 15:16, Hao Ren  wrote:
>
>>
>>
>> -- Forwarded message -
>> From: Hao Ren 
>> Date: Thu, Aug 8, 2019 at 4:15 PM
>> Subject: Re: Spark SQL reads all leaf directories on a partitioned Hive
>> table
>> To: Gourav Sengupta 
>>
>>
>> Hi Gourva,
>>
>> I am using enableHiveSupport.
>> The table was not created by Spark; it already exists in Hive. All I did was
>> read it with a SQL query in Spark.
>> FYI, I put hive-site.xml in the spark/conf/ directory to make sure that Spark
>> can access Hive.
>>
>> Hao
>>
>> On Thu, Aug 8, 2019 at 1:24 PM Gourav Sengupta 
>> wrote:
>>
>>> Hi,
>>>
>>> Just out of curiosity, did you start the Spark session using
>>> enableHiveSupport()?
>>>
>>> Or are you creating the table using Spark?
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Wed, Aug 7, 2019 at 3:28 PM Hao Ren  wrote:
>>>
 Hi,
 I am using Spark SQL 2.3.3 to read a Hive table which is partitioned by
 day, hour, platform, request_status and is_sampled. The underlying data is
 in Parquet format on HDFS.
 Here is the SQL query to read just *one partition*.

 ```
 spark.sql("""
 SELECT rtb_platform_id, SUM(e_cpm)
 FROM raw_logs.fact_request
 WHERE day = '2019-08-01'
 AND hour = '00'
 AND platform = 'US'
 AND request_status = '3'
 AND is_sampled = 1
 GROUP BY rtb_platform_id
 """).show
 ```

 However, from the Spark web UI, the stage description shows:

 ```
 Listing leaf files and directories for 201616 paths:
 viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0,
 ...
 ```

 It seems the job is reading all of the partitions of the table, and the
 job takes too long for just one partition. One workaround is to use the
 `spark.read.parquet` API to read the Parquet files directly, since Spark has
 partition awareness for partitioned directories.

 But still, I would like to know whether there is a way to leverage this
 partition awareness via Hive when using the `spark.sql` API.

 Any help is highly appreciated!

 Thank you.

 --
 Hao Ren

>>>
>>
>> --
>> Hao Ren
>>
>> Software Engineer in Machine Learning @ Criteo
>>
>> Paris, France
>>
>

-- 
Hao Ren

Software Engineer in Machine Learning @ Criteo

Paris, France