Re: installation of spark

2019-06-04 Thread Jack Kolokasis

Hello,

    At first you will need to make sure that Java is installed, or 
install it if it is not. Then install Scala and a build tool (sbt or 
Maven). In my opinion, IntelliJ IDEA is a good option for creating 
your Spark applications. Finally, you will have to install a 
distributed file system, e.g. HDFS.
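
    A quick way to check that the basic installation works is a tiny 
local-mode script. This is only an illustrative sketch, assuming 
pyspark is available (e.g. installed via pip, or shipped with the 
downloaded Spark distribution):

    from pyspark.sql import SparkSession

    # Local mode: runs entirely on this machine, no Hadoop/HDFS or
    # cluster manager needed.
    spark = (SparkSession.builder
             .master("local[*]")
             .appName("install-smoke-test")
             .getOrCreate())

    df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])
    print(df.count())   # should print 3
    spark.stop()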


    I don't think there is an all-in-one configuration, but there are 
examples of how to configure your Spark cluster (e.g. 
https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-standalone-example-2-workers-on-1-node-cluster.adoc).
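
    As a rough illustration of running against such a standalone 
cluster (the host name, port and memory value below are placeholders, 
not taken from that example), an application would then connect to 
the master URL like this:

    from pyspark.sql import SparkSession

    # spark://spark-master:7077 is a placeholder; 7077 is the default
    # port of a standalone master started with sbin/start-master.sh.
    spark = (SparkSession.builder
             .master("spark://spark-master:7077")
             .appName("standalone-example")
             .config("spark.executor.memory", "2g")   # illustrative value
             .getOrCreate())

    print(spark.range(1000).count())   # work runs on the cluster's workers
    spark.stop()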


Best,
--Iacovos
On 5/6/19 5:50 a.m., ya wrote:

Dear list,

I am very new to Spark, and I am having trouble installing it on my 
Mac. I have the following questions; please give me some guidance. 
Thank you very much.


1. How many pieces of software, and which ones, should I install 
before installing Spark? I have been searching online, and people 
discussing their experiences give different opinions: some say there 
is no need to install Hadoop before installing Spark, while others 
say Hadoop has to be installed before Spark. Some people say Scala 
has to be installed, whereas others say Scala is included in Spark 
and is installed automatically once Spark is installed. So I am 
confused about what to install to get started.


2. Is there a simple way to configure this software, for instance an 
all-in-one configuration file? It takes forever for me to configure 
things before I can really use Spark for data analysis.


I hope my questions make sense. Thank you very much.

Best regards,

YA


installation of spark

2019-06-04 Thread ya
Dear list,


I am very new to Spark, and I am having trouble installing it on my Mac. I have 
the following questions; please give me some guidance. Thank you very much.


1. How many pieces of software, and which ones, should I install before installing 
Spark? I have been searching online, and people discussing their experiences give 
different opinions: some say there is no need to install Hadoop before installing 
Spark, while others say Hadoop has to be installed before Spark. Some people say 
Scala has to be installed, whereas others say Scala is included in Spark and is 
installed automatically once Spark is installed. So I am confused about what to 
install to get started.


2. Is there a simple way to configure this software, for instance an 
all-in-one configuration file? It takes forever for me to configure things 
before I can really use Spark for data analysis.


I hope my questions make sense. Thank you very much.


Best regards,


YA

Spark Streaming: Task not distributed

2019-06-04 Thread Pipster Neko
Hi,

I am curious how records are assigned to tasks since, as you can see in
the screenshot below, there is one specific executor that receives more
tasks than the others.
The setup is this:

   - Spark version 2.3.1
   - Spark streaming job runs on Spark Standalone with the following
   configuration (a rough sketch of this setup follows the list):
  - spark.max.cores: 105
  - executor-memory: 4G
  - driver-memory: 2G
  - memory.storageFraction: 0.1
  - spark.streaming.kafka.maxRatePerPartition: 15000
  - batch duration: 20 seconds
   - Each batch of the Spark streaming job finishes in ~9 seconds and
   consumes ~800k records
   - Spark standalone contains:
  - workers: 15 (8 cores, 30G memory per worker)
  - cores: 120
  - memory: 455.6G
   - Consumes on kafka topic with 60 partitions
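
A rough PySpark sketch of the configuration listed above (the job itself
uses the Scala kafka010 API; the standard property names are
spark.cores.max and spark.memory.storageFraction, and the master URL is a
placeholder):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setMaster("spark://spark-master:7077")   # placeholder master URL
        .set("spark.cores.max", "105")
        .set("spark.executor.memory", "4g")
        .set("spark.memory.storageFraction", "0.1")
        .set("spark.streaming.kafka.maxRatePerPartition", "15000"))
# driver memory (2G) normally has to be given at submit time,
# e.g. spark-submit --driver-memory 2G, not via SparkConf.

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, batchDuration=20)   # 20-second batches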

The Spark streaming job consumes records from Kafka
using org.apache.spark.streaming.kafka010.KafkaUtils; the record format is
JSON. The job applies map and filter transformations (the data type being
transformed is a class with 50 fields), does no repartitioning, and in the
end sinks to another topic with 60 partitions, and also maps to pairs
(timestamp as key, class as value) -> countByValue -> sortByValue and
prints the top 10 records.

I would like to do some tuning and enhancements, and I hope someone can
explain this behavior and point me to where I should look.

[image: screencapture-aratupstream201-prod-hnd2-bdd-local-4040-executors-2019-06-04-18_28_59.png (Spark UI Executors page screenshot)]

Thanks in advance!

A


Re: Upsert for hive tables

2019-06-04 Thread tkrol
Hi Magnus,

Yes, I was also thinking about the partitioning approach, and I think it
is the best solution in this type of scenario.

My scenario also matches your last paragraph: the dates that come in are
very random; I can get updates from 2012 and from 2019. Therefore, this
strategy might not be the best, because when I join on, let's say,
month = month AND year = year, I think I might not get much of a
performance gain. But I will try this approach.

If that doesn't work, I will try to play with different partitioning
schemes.
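
As a rough sketch of that partition-based upsert idea (all table and
column names below are hypothetical; dynamic partition overwrite needs
Spark 2.3+):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = (SparkSession.builder
         .enableHiveSupport()
         # overwrite only the partitions present in the written data
         .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
         .getOrCreate())

updates = spark.table("staging.events_updates")   # incoming changed rows
target = spark.table("warehouse.events")          # partitioned by (year, month)

# Only read back the partitions that actually receive updates.
touched = updates.select("year", "month").distinct()
existing = target.join(touched, ["year", "month"], "left_semi")

# Keep the newest version of each key, then overwrite just those partitions.
w = Window.partitionBy("id").orderBy(F.col("last_mod").desc())
merged = (existing.unionByName(updates)
          .withColumn("rn", F.row_number().over(w))
          .filter("rn = 1")
          .drop("rn"))

# insertInto expects the DataFrame columns in the table's column order,
# with the partition columns (year, month) last.
merged.write.mode("overwrite").insertInto("warehouse.events")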

Thanks






Spark structured streaming leftOuter join not working as I expect

2019-06-04 Thread Joe Ammann
Hi all

sorry, tl;dr

I'm on my first Python Spark structured streaming app, in the end joining 
messages from ~10 different Kafka topics. I've recently upgraded to Spark 
2.4.3, which has resolved all my issues with the time handling (watermarks, 
join windows) I had before with Spark 2.3.2.

My current problem happens during a leftOuter join, where messages from 3 
topics are joined, the results are then aggregated with a groupBy and finally 
put onto a result Kafka topic. On the 3 input topics involved, all messages 
have ID and LAST_MOD fields. I use the ID for joining, and the LAST_MOD as 
event timestamp on all incoming streams. Since the fields on the incoming 
messages are all named the same (ID and LAST_MOD), I rename them on all 
incoming streams with

 aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])

For those data frames, I then set the watermark with the A/B/C_LAST_MOD as 
the event time, before joining. I know that the LAST_MOD timestamps are equal 
on the messages that I want to join together.
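
For illustration, that step looks roughly like this (the 30-second 
threshold is a placeholder, not the value actually used):

 aDf = aDf.withWatermark("A_LAST_MOD", "30 seconds")
 bDf = bDf.withWatermark("B_LAST_MOD", "30 seconds")
 cDf = cDf.withWatermark("C_LAST_MOD", "30 seconds")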

The first join is an inner join, where a field on stream A links with the ID of 
stream B. So I have

 (aDf
     .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
     .agg(
         collect_list(struct("*")).alias("RESULTS"),
         count("A_ID").alias("NUM_RESULTS"),
         # just add a timestamp to watermark on, they are all the same
         min("A_LAST_MOD").alias("RESULT_LAST_MOD")
     )
     .withWatermark("RESULT_LAST_MOD", "30 seconds")
 )

This works perfectly and generates (on my current data set) some 10'000 
records. This is the expected result.

When I add the leftOuter join of the third topic as follows

 (aDf
     .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
     # here the additional left join
     .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
     # C_FK is the field in stream B
     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
     .agg(
         collect_list(struct("*")).alias("RESULTS"),
         count("A_ID").alias("NUM_RESULTS"),
         # just add a timestamp to watermark on, they are all the same
         min("A_LAST_MOD").alias("RESULT_LAST_MOD")
     )
     .withWatermark("RESULT_LAST_MOD", "30 seconds")
 )

then what I would expect is that I get the same number of output records 
(~10'000), with some of them having the additional fields from the C stream.

But what happens is that my output is reduced to ~1'500 records, exactly those 
which have a successful join with records on topic C. The others are not shown 
in the output.

I already tried

   * make sure that the optional FK on topic B is never null, by using an 
NVL2(C_FK, C_FK, '')
   * widen the time window join on the leftOuter to "B_LAST_MOD < 
C_LAST_MOD - interval 5 seconds ..."
   * use various combinations of joinWindows and watermarkLateThreshold

The result is always the same: I'm "losing" the ~8'500 records for which the 
optional join FK is NULL on topic B.
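
For comparison, the stream-stream left outer join pattern in the Structured 
Streaming programming guide combines watermarks on both inputs with a 
time-range join condition, roughly like this (column names adapted to this 
thread, thresholds illustrative):

 joined = (bDf.withWatermark("B_LAST_MOD", "30 seconds")
     .join(cDf.withWatermark("C_LAST_MOD", "30 seconds"),
           expr("""
               C_FK = C_ID AND
               C_LAST_MOD >= B_LAST_MOD - interval 5 seconds AND
               C_LAST_MOD <= B_LAST_MOD + interval 5 seconds
           """),
           "leftOuter"))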

Did I totally misunderstand the concept of stream-stream left outer joins? Or 
what else could be wrong?

-- 
CU, Joe




Re: Spark Thriftserver on yarn, sql submit take long time.

2019-06-04 Thread Jun Zhu
In the case without explain, it also takes a long time to submit:


19/06/04 05:56:37 DEBUG SparkSQLOperationManager: Created Operation for
> select count(*) from perf_as_reportads with
> session=org.apache.hive.service.cli.session.HiveSessionImpl@1f30fc84,
> runInBackground=true
> 19/06/04 05:56:37 INFO SparkExecuteStatementOperation: Running query
> 'select count(*) from perf_as_reportads' with
> d3ef425f-ab52-40c5-ab1c-fc1a2e2ac8bf
> 19/06/04 05:56:53 INFO ContextCleaner: Cleaned accumulator 5
> 19/06/04 05:56:53 INFO ContextCleaner: Cleaned accumulator 4
> 19/06/04 05:57:06 INFO ThriftCLIService: Client protocol version:
> HIVE_CLI_SERVICE_PROTOCOL_V8
> 19/06/04 05:57:06 INFO SessionState: Created local directory:
> /tmp/5db11ec6-4c93-4152-a4d2-ce1307005c94_resources
> 19/06/04 05:57:06 INFO SessionState: Created HDFS directory:
> /tmp/spark/spark/5db11ec6-4c93-4152-a4d2-ce1307005c94
> 19/06/04 05:57:06 INFO SessionState: Created local directory:
> /tmp/spark/5db11ec6-4c93-4152-a4d2-ce1307005c94
> 19/06/04 05:57:06 INFO SessionState: Created HDFS directory:
> /tmp/spark/spark/5db11ec6-4c93-4152-a4d2-ce1307005c94/_tmp_space.db
> 19/06/04 05:57:06 INFO HiveSessionImpl: Operation log session directory is
> created: /tmp/spark/operation_logs/5db11ec6-4c93-4152-a4d2-ce1307005c94
> 19/06/04 05:58:06 INFO ThriftCLIService: Client protocol version:
> HIVE_CLI_SERVICE_PROTOCOL_V8
> 19/06/04 05:58:06 INFO SessionState: Created local directory:
> /tmp/23027006-7f2d-4f54-af99-608264a82f51_resources
> 19/06/04 05:58:06 INFO SessionState: Created HDFS directory:
> /tmp/spark/spark/23027006-7f2d-4f54-af99-608264a82f51
> 19/06/04 05:58:06 INFO SessionState: Created local directory:
> /tmp/spark/23027006-7f2d-4f54-af99-608264a82f51
> 19/06/04 05:58:06 INFO SessionState: Created HDFS directory:
> /tmp/spark/spark/23027006-7f2d-4f54-af99-608264a82f51/_tmp_space.db
> 19/06/04 05:58:06 INFO HiveSessionImpl: Operation log session directory is
> created: /tmp/spark/operation_logs/23027006-7f2d-4f54-af99-608264a82f51
> 19/06/04 05:58:17 DEBUG SparkExecuteStatementOperation: == Parsed Logical
> Plan ==
> 'Project [unresolvedalias('count(1), None)]
> +- 'UnresolvedRelation `perf_as_reportads`
> == Analyzed Logical Plan ==
> count(1): bigint
> Aggregate [count(1) AS count(1)#227L]
> +- SubqueryAlias perf_as_reportads
>+- HiveTableRelation `default`.`perf_as_reportads`,
> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,
> [ad_app_id#177, ad_app_object_id#178, ad_clicked#179, ad_duration#180L,
> ad_start_time#181L, app_version#182, campaign_id#183,
> campaign_override#184L, campaign_rate#185, campaign_rate_type#186,
> city#187, completed_view#188, connection#189, country#190, creative_id#191,
> device_height#192L, device_id#193, device_language#194, device_make#195,
> device_model#196, device_user_agent#197, device_width#198L,
> do_not_track#199, event_id#200, ... 24 more fields], [dt#225, hr#226]
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count(1)#227L]
> +- Project
>+- HiveTableRelation `default`.`perf_as_reportads`,
> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,
> [ad_app_id#177, ad_app_object_id#178, ad_clicked#179, ad_duration#180L,
> ad_start_time#181L, app_version#182, campaign_id#183,
> campaign_override#184L, campaign_rate#185, campaign_rate_type#186,
> city#187, completed_view#188, connection#189, country#190, creative_id#191,
> device_height#192L, device_id#193, device_language#194, device_make#195,
> device_model#196, device_user_agent#197, device_width#198L,
> do_not_track#199, event_id#200, ... 24 more fields], [dt#225, hr#226]
> == Physical Plan ==
> *(2) HashAggregate(keys=[], functions=[count(1)], output=[count(1)#227L])
> +- Exchange SinglePartition
>+- *(1) HashAggregate(keys=[], functions=[partial_count(1)],
> output=[count#230L])
>   +- HiveTableScan HiveTableRelation `default`.`perf_as_reportads`,
> org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,
> [ad_app_id#177, ad_app_object_id#178, ad_clicked#179, ad_duration#180L,
> ad_start_time#181L, app_version#182, campaign_id#183,
> campaign_override#184L, campaign_rate#185, campaign_rate_type#186,
> city#187, completed_view#188, connection#189, country#190, creative_id#191,
> device_height#192L, device_id#193, device_language#194, device_make#195,
> device_model#196, device_user_agent#197, device_width#198L,
> do_not_track#199, event_id#200, ... 24 more fields], [dt#225, hr#226]
> 19/06/04 05:58:17 INFO CodeGenerator: Code generated in 9.873723 ms
> 19/06/04 05:58:17 INFO CodeGenerator: Code generated in 16.34127 ms
> 19/06/04 05:58:17 INFO CodeGenerator: Code generated in 8.78 ms
> 19/06/04 05:58:17 INFO MemoryStore: Block broadcast_0 stored as values in
> memory (estimated size 389.5 KB, free 911.9 MB)
> 19/06/04 05:58:17 INFO MemoryStore: Block broadcast_0_piece0 stored as
> bytes in memory (estimated size 33.4 KB, free 911.9 MB)
> 19/06/04 05:58:17 INFO BlockManagerInfo: Added broadcast_0_piece0 in
> 

Spark Thriftserver on yarn, sql submit take long time.

2019-06-04 Thread Jun Zhu
Hi,
I am running the Thrift Server on YARN.
It's fast when the beeline client sends a query to the Thrift Server, but it
takes a while (about 90s) to submit to the YARN cluster.
From the Thrift Server log:

> *19/06/04 05:48:27* DEBUG SparkSQLOperationManager: Created Operation for
> explain select count(*) from perf_as_reportads with
> session=org.apache.hive.service.cli.session.HiveSessionImpl@1f30fc84,
> runInBackground=true
> 19/06/04 05:48:27 INFO SparkExecuteStatementOperation: Running query '*explain
> select count(*) from perf_as_reportads*' with
> c63b765f-050a-412d-a817-45d5a990b59d
> 19/06/04 05:49:06 INFO ThriftCLIService: Client protocol version:
> HIVE_CLI_SERVICE_PROTOCOL_V8
> 19/06/04 05:49:06 INFO SessionState: Created local directory:
> /tmp/384b2bd2-53fd-4300-a0dd-3be6590a6029_resources
> 19/06/04 05:49:06 INFO SessionState: Created HDFS directory:
> /tmp/spark/spark/384b2bd2-53fd-4300-a0dd-3be6590a6029
> 19/06/04 05:49:06 INFO SessionState: Created local directory:
> /tmp/spark/384b2bd2-53fd-4300-a0dd-3be6590a6029
> 19/06/04 05:49:06 INFO SessionState: Created HDFS directory:
> /tmp/spark/spark/384b2bd2-53fd-4300-a0dd-3be6590a6029/_tmp_space.db
> 19/06/04 05:49:06 INFO HiveSessionImpl: Operation log session directory is
> created: /tmp/spark/operation_logs/384b2bd2-53fd-4300-a0dd-3be6590a6029
> 19/06/04 05:50:06 INFO ThriftCLIService: Client protocol version:
> HIVE_CLI_SERVICE_PROTOCOL_V8
> 19/06/04 05:50:06 INFO SessionState: Created local directory:
> /tmp/714c6377-5151-4574-969b-2c1cb2ed0d02_resources
> 19/06/04 05:50:06 INFO SessionState: Created HDFS directory:
> /tmp/spark/spark/714c6377-5151-4574-969b-2c1cb2ed0d02
> 19/06/04 05:50:06 INFO SessionState: Created local directory:
> /tmp/spark/714c6377-5151-4574-969b-2c1cb2ed0d02
> 19/06/04 05:50:06 INFO SessionState: Created HDFS directory:
> /tmp/spark/spark/714c6377-5151-4574-969b-2c1cb2ed0d02/_tmp_space.db
> 19/06/04 05:50:06 INFO HiveSessionImpl: Operation log session directory is
> created: /tmp/spark/operation_logs/714c6377-5151-4574-969b-2c1cb2ed0d02
> *19/06/04 05:50:15* DEBUG SparkExecuteStatementOperation: == Parsed
> Logical Plan ==
> ExplainCommand 'Project [unresolvedalias('count(1), None)], false, false,
> false
> == Analyzed Logical Plan ==
> plan: string
> ExplainCommand 'Project [unresolvedalias('count(1), None)], false, false,
> false
> == Optimized Logical Plan ==
> ExplainCommand 'Project [unresolvedalias('count(1), None)], false, false,
> false
> == Physical Plan ==
> Execute ExplainCommand
>+- ExplainCommand 'Project [unresolvedalias('count(1), None)], false,
> false, false
> *19/06/04 05:50:15* INFO SparkExecuteStatementOperation: Result Schema:
> StructType(StructField(plan,StringType,true))


I had set the Thrift Server min resources (10 instances) and init resources
(10) on YARN.
Any thoughts? Could any config issue be related?
-- 
*Jun Zhu*
Sr. Engineer I, Data
+86 18565739171
Units 3801, 3804, 38F, C Block, Beijing Yintai Center, Beijing, China