Accessing Scala RDD from pyspark

2018-03-15 Thread Shahab Yunus
Hi there.

I am calling custom Scala code from pyspark (the interpreter). The custom
Scala code is simple: it just reads a text file using sparkContext.textFile
and returns an RDD[String].

In pyspark, I am using sc._jvm to make the call to the Scala code:


s_rdd = sc._jvm.package_name.class_name.method()

It returns a py4j.JavaObject. Now I want to use this in pyspark, so I do
the following wrapping:
py_rdd = RDD(s_rdd, sparkSession)

No error yet. But when I call any RDD method on py_rdd (e.g.
py_rdd.count()), I get the following error:
py4j.protocol.Py4JError: An error occurred while calling o50.rdd. Trace:
py4j.Py4JException: Method rdd([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)

Why is that? What am I doing wrong?

Using:
Scala version 2.11.8
(OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Spark 2.0.2
Hadoop 2.7.3-amzn-0
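
For reference, a sketch of one workaround that is sometimes suggested (it
assumes the Scala side can be changed to return a JavaRDD[String], e.g. via
rdd.toJavaRDD(), and package_name/class_name are placeholders):

from pyspark.rdd import RDD
from pyspark.serializers import UTF8Deserializer

# Assumes the Scala method now returns JavaRDD[String]; a plain Scala RDD has no
# rdd() method for py4j to call, which matches the "Method rdd([]) does not exist" error.
j_rdd = sc._jvm.package_name.class_name.method()

# Wrap with the SparkContext (not the SparkSession) and a string deserializer,
# mirroring what sc.textFile() itself does on the Python side.
py_rdd = RDD(j_rdd, sc, UTF8Deserializer())
print(py_rdd.count())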


Thanks & Regards,
Shahab


Re: [PySpark SQL] sql function to_date and to_timestamp return the same data type

2018-03-15 Thread Nicholas Sharkey
unsubscribe

On Thu, Mar 15, 2018 at 8:00 PM, Alan Featherston Lago 
wrote:

> I'm a pretty new user of spark and I've run into this issue with the
> pyspark docs:
>
> The functions pyspark.sql.functions.to_date && 
> pyspark.sql.functions.to_timestamp
> behave in the same way. As in both functions convert a Column of
> pyspark.sql.types.StringType or pyspark.sql.types.TimestampType into
> pyspark.sql.types.DateType.
>
> Shouldn't the function `to_timestmap` return pyspark.sql.types.
> TimestampType?
> Also the to_timestamp docs say that "By default, it follows casting rules
> to pyspark.sql.types.TimestampType if the format is omitted (equivalent
> to col.cast("timestamp")). ", which doesn't seem to be right ie:
>
> to_timestamp(current_timestamp()) <> current_timestamp().cast("timestamp")
>
>
> This is wrong right? or am I missing something? (is this due to the
> underlying jvm data types?)
>
>
> Cheers,
> alan
>


[PySpark SQL] sql function to_date and to_timestamp return the same data type

2018-03-15 Thread Alan Featherston Lago
I'm a pretty new user of spark and I've run into this issue with the
pyspark docs:

The functions pyspark.sql.functions.to_date and
pyspark.sql.functions.to_timestamp behave in the same way. That is, both
functions convert a Column of pyspark.sql.types.StringType or
pyspark.sql.types.TimestampType into pyspark.sql.types.DateType.

Shouldn't the function `to_timestamp` return
pyspark.sql.types.TimestampType?
Also, the to_timestamp docs say that "By default, it follows casting rules
to pyspark.sql.types.TimestampType if the format is omitted (equivalent to
col.cast("timestamp"))", which doesn't seem to be right, i.e.:

to_timestamp(current_timestamp()) <> current_timestamp().cast("timestamp")


This is wrong, right? Or am I missing something? (Is this due to the
underlying JVM data types?)
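
For reference, a self-contained way to check what the functions actually
return on a given Spark version (the sample value and column name are
arbitrary):

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, to_timestamp

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2018-03-15 10:30:00",)], ["s"])

# printSchema() shows the data type each function actually produces
df.select(to_date(df.s).alias("d"), to_timestamp(df.s).alias("ts")).printSchema()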


Cheers,
alan


How can I launch a thread in the background on all worker nodes before the data processing actually starts?

2018-03-15 Thread ravidspark
Environment:
Spark 2.2.0
Kafka: 0.10.0
Language: Java

Use case: Streaming data from Kafka using JavaDStreams and storing it into a
downstream database.

Issue:

I have a use case wherein I have to launch a background thread that
connects to a DB and caches the retrieved result set every 30 minutes.
This component works fine on its own. When integrated with Spark
Streaming, my tasks use the data from this MySQL thread, which is why I
have to run the thread on all the worker nodes, rather than on the driver,
before the actual data processing starts. Something like the setup() below
should be run on all the worker nodes:

public static void setup() {
    try {
        // Util implements the background thread that refreshes the cached result set
        new Util(FSFactory.getFSHandle(null));
    } catch (IOException e) {
        logger.error("IOException: error to create Util");
    }
}


What did I try: I tried passing the above method as a broadcast variable
in Spark. But, from my understanding, a broadcast variable is only a read-only
value. So, since I am running a threaded program in the background behind the
broadcast variable, I didn't see anything related to my code in the logs,
and the thread did not run.

I have some knowledge of other streaming frameworks, where I can set up any
dependencies in setup() and close them in terminate() for every container.
Is there something like that in Spark?

Am I missing a concept here? I googled around and looked on SO but couldn't
find anything useful. Any help would be appreciated.
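
One pattern that is sometimes used for per-executor initialization (a sketch
only, not a Spark API; as far as I know Spark has no per-executor
setup()/terminate() hook) is a lazily initialized singleton that each executor
creates the first time a task touches it. It is sketched below in PySpark for
brevity; in Java the same idea is a holder class with a static initializer,
and all names here are placeholders for the Util/FSFactory classes above:

# Lazy per-executor initialization: the module-level singleton is created once
# per executor process, the first time a task on that executor calls get_cache().
_cache = None

def get_cache():
    global _cache
    if _cache is None:
        _cache = start_db_cache_refresher()  # placeholder for new Util(FSFactory.getFSHandle(null))
    return _cache

def process_partition(records):
    cache = get_cache()              # triggers the setup on first use per executor
    for record in records:
        yield enrich(record, cache)  # enrich() is a placeholder transform

# dstream.mapPartitions(process_partition) then runs the setup wherever tasks execute.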

Thanks,
Ravi







Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Awesome, thanks for detailing!

Was thinking the same: we have to split by comma for CSV while casting inside.

Cool! Shall try it and get back tomorrow.

Thanks a ton!
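
Just to make sure I got it, a rough sketch of what I understood (kafka_df here
stands in for the spark.readStream.format("kafka")...load() source, and the
column names are from Chris's example below):

# Chris's SQL expressed directly on a Kafka source DataFrame
parsed = kafka_df.selectExpr(
    "split(cast(value as string), ',')[0] as id",
    "split(cast(value as string), ',')[1] as name",
)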

On 15-Mar-2018 11:50 PM, "Bowden, Chris" 
wrote:

> To remain generic, the KafkaSource can only offer the lowest common
> denominator for a schema (topic, partition, offset, key, value, timestamp,
> timestampType). As such, you can't just feed it a StructType. When you are
> using a producer or consumer directly with Kafka, serialization and
> deserialization is often an orthogonal and implicit transform. However, in
> Spark, serialization and deserialization is an explicit transform (e.g.,
> you define it in your query plan).
>
>
> To make this more granular, if we imagine your source is registered as a
> temp view named "foo":
>
> SELECT
>
>   split(cast(value as string), ',')[0] as id,
>
>   split(cast(value as string), ',')[1] as name
>
> FROM foo;
>
>
> Assuming you were providing the following messages to Kafka:
>
> 1,aakash
>
> 2,tathagata
>
> 3,chris
>
>
> You could make the query plan less repetitive. I don't believe Spark
> offers from_csv out of the box as an expression (although CSV is well
> supported as a data source). You could implement an expression by reusing a
> lot of the supporting CSV classes which may result in a better user
> experience vs. explicitly using split and array indices, etc. In this
> simple example, casting the binary to a string just works because there is
> a common understanding of strings encoded as bytes between Spark and Kafka
> by default.
>
>
> -Chris
> --
> *From:* Aakash Basu 
> *Sent:* Thursday, March 15, 2018 10:48:45 AM
> *To:* Bowden, Chris
> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hey Chris,
>
> You got it right. I'm reading a *csv *file from local as mentioned above,
> with a console producer on Kafka side.
>
> So, as it is a csv data with headers, shall I then use from_csv on the
> spark side and provide a StructType to shape it up with a schema and then
> cast it to string as TD suggested?
>
> I'm getting all of your points at a very high level. A little more
> granularity would help.
>
> *In the slide TD just shared*, PFA, I'm confused at the point where he is
> casting the value as string. Logically, the value shall consist of all the
> entire data set, so, suppose, I've a table with many columns, *how can I
> provide a single alias as he did in the groupBy. I missed it there itself.
> Another question is, do I have to cast in groupBy itself? Can't I do it
> directly in a select query? The last one, if the steps are followed, can I
> then run a SQL query on top of the columns separately?*
>
> Thanks,
> Aakash.
>
>
> On 15-Mar-2018 9:07 PM, "Bowden, Chris" 
> wrote:
>
> You need to tell Spark about the structure of the data, it doesn't know
> ahead of time if you put avro, json, protobuf, etc. in kafka for the
> message format. If the messages are in json, Spark provides from_json out
> of the box. For a very simple POC you can happily cast the value to a
> string, etc. if you are prototyping and pushing messages by hand with a
> console producer on the kafka side.
>
> 
> From: Aakash Basu 
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
> "localhost:9092").option("subscribe", "test1").load())
>
> table2_stream = (
> spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
>
> "localhost:9092").option("subscribe",
>
>  "test2").load())
>
> joined_Stream = table1_stream.join(table2_stream, "Id")
> #
> # joined_Stream.show()
>
> # query =
> table1_stream.writeStream.format("console").start().awaitTermination()
> # .queryName("table_A").format("memory")
> # spark.sql("select * from table_A").show()
> time.sleep(10)  # sleep 20 seconds
> # query.stop()
> # query
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
> Stream_Stream_Join.py
>
>
>
>
> I get the below error (in Spark 2.3.0) -
>
> Traceback (most recent call last):
>   File 
> 

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Hey Chris,

You got it right. I'm reading a CSV file from local disk as mentioned above,
with a console producer on the Kafka side.

So, as it is CSV data with headers, shall I then use from_csv on the
Spark side, provide a StructType to shape it with a schema, and then
cast it to string as TD suggested?

I'm getting all of your points at a very high level. A little more
granularity would help.

In the slide TD just shared (PFA), I'm confused at the point where he is
casting the value as a string. Logically, the value should consist of the
entire data set, so suppose I have a table with many columns: how can I
provide a single alias as he did in the groupBy? I missed that part.
Another question: do I have to cast in the groupBy itself, or can I do it
directly in a select query? Lastly, if the steps are followed, can I then
run a SQL query on top of the columns separately?

Thanks,
Aakash.


On 15-Mar-2018 9:07 PM, "Bowden, Chris"  wrote:

You need to tell Spark about the structure of the data, it doesn't know
ahead of time if you put avro, json, protobuf, etc. in kafka for the
message format. If the messages are in json, Spark provides from_json out
of the box. For a very simple POC you can happily cast the value to a
string, etc. if you are prototyping and pushing messages by hand with a
console producer on the kafka side.


From: Aakash Basu 
Sent: Thursday, March 15, 2018 7:52:28 AM
To: Tathagata Das
Cc: Dylan Guedes; Georg Heiler; user
Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query

Hi,

And if I run this below piece of code -


from pyspark.sql import SparkSession
import time

class test:


spark = SparkSession.builder \
.appName("DirectKafka_Spark_Stream_Stream_Join") \
.getOrCreate()
# ssc = StreamingContext(spark, 20)

table1_stream = (spark.readStream.format("kafka").option("startingOffsets",
"earliest").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe",
"test1").load())

table2_stream = (
spark.readStream.format("kafka").option("startingOffsets",
"earliest").option("kafka.bootstrap.servers",

  "localhost:9092").option("subscribe",

   "test2").load())

joined_Stream = table1_stream.join(table2_stream, "Id")
#
# joined_Stream.show()

# query =
table1_stream.writeStream.format("console").start().awaitTermination()
# .queryName("table_A").format("memory")
# spark.sql("select * from table_A").show()
time.sleep(10)  # sleep 20 seconds
# query.stop()
# query


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
Stream_Stream_Join.py




I get the below error (in Spark 2.3.0) -

Traceback (most recent call last):
  File 
"/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py",
line 4, in 
class test:
  File 
"/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py",
line 19, in test
joined_Stream = table1_stream.join(table2_stream, "Id")
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be resolved
on the left side of the join. The left-side columns: [key, value, topic,
partition, offset, timestamp, timestampType];'

Seems, as per the documentation, they key and value are deserialized as
byte arrays.

I am badly stuck at this step, not many materials online, with steps to
proceed on this, too.

Any help, guys?

Thanks,
Aakash.


On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu wrote:
Any help on the above?

On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu wrote:
Hi,

I progressed a bit in the above mentioned topic -

1) I am feeding a CSV file into the Kafka topic.
2) Feeding the Kafka topic as readStream as TD's article suggests.
3) Then, simply trying to do a show on the streaming dataframe, using
queryName('XYZ') in the writeStream and writing a sql query on top of it,
but that doesn't show anything.
4) Once all the above problems are resolved, I want to perform a
stream-stream join.

The CSV file I'm ingesting into Kafka has -

id,first_name,last_name
1,Kellyann,Moyne
2,Morty,Blacker
3,Tobit,Robardley
4,Wilona,Kells
5,Reggy,Comizzoli


My test code -


from pyspark.sql import SparkSession
import time

class test:


spark = SparkSession.builder \
.appName("DirectKafka_Spark_Stream_Stream_Join") \

Sparklyr and idle executors

2018-03-15 Thread Florian Dewes
Hi all,

I am currently trying to enable dynamic resource allocation for a small
YARN-managed Spark cluster.
We are using sparklyr to access Spark from R and have multiple jobs which
should run in parallel, because some of them take several days to complete or
are in development.

Everything works so far; the only problem we have is that executors are not
removed from idle jobs.

Let's say job A is the only running job; it loads a file that is several
hundred GB in size and then goes idle without disconnecting from Spark. It gets
80% of the cluster because I set a maximum value via
spark.dynamicAllocation.maxExecutors.

When we start another job (B) with the remaining 20% of the cluster resources,
no idle executors of the other job are freed, and the idle job keeps 80% of
the cluster's resources, although spark.dynamicAllocation.executorIdleTimeout
is set.

Only when we disconnect job A does B allocate the freed executors.

Configuration settings used:

spark.shuffle.service.enabled = "true"
spark.dynamicAllocation.enabled = "true"
spark.dynamicAllocation.executorIdleTimeout = 120
spark.dynamicAllocation.maxExecutors = 100

with

Spark 2.1.0
R 3.4.3
sparklyr 0.6.3


Any ideas?
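
One setting worth checking (an educated guess, not a confirmed diagnosis):
executors holding cached data are exempt from
spark.dynamicAllocation.executorIdleTimeout and are only released after
spark.dynamicAllocation.cachedExecutorIdleTimeout, which defaults to infinity.
If job A keeps the loaded file cached, something like the following (value in
seconds, chosen arbitrarily) may be needed in addition to the settings above:

spark.dynamicAllocation.cachedExecutorIdleTimeout = 600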


Thanks,

Florian






Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Tathagata Das
Chris identified the problem correctly. You need to parse out the json text
from Kafka into separate columns before you can join them up.
I walk through an example of this in my slides -
https://www.slideshare.net/databricks/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-with-tathagata-das
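
For the CSV case in this thread, a sketch of the parse-then-join idea
(table1_stream / table2_stream and the column names come from the code quoted
below; this is illustrative, not a tested job):

from pyspark.sql.functions import split, col

def parse(stream):
    # Kafka only exposes key/value/topic/partition/offset/timestamp/timestampType,
    # so the CSV text has to be pulled out of `value` before joining.
    parts = split(col("value").cast("string"), ",")
    return stream.select(parts.getItem(0).alias("id"),
                         parts.getItem(1).alias("first_name"),
                         parts.getItem(2).alias("last_name"))

joined = parse(table1_stream).join(parse(table2_stream), "id")  # "id" now exists on both sides
query = joined.writeStream.format("console").start()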


On Thu, Mar 15, 2018 at 8:37 AM, Bowden, Chris 
wrote:

> You need to tell Spark about the structure of the data, it doesn't know
> ahead of time if you put avro, json, protobuf, etc. in kafka for the
> message format. If the messages are in json, Spark provides from_json out
> of the box. For a very simple POC you can happily cast the value to a
> string, etc. if you are prototyping and pushing messages by hand with a
> console producer on the kafka side.
>
> 
> From: Aakash Basu 
> Sent: Thursday, March 15, 2018 7:52:28 AM
> To: Tathagata Das
> Cc: Dylan Guedes; Georg Heiler; user
> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>
> Hi,
>
> And if I run this below piece of code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe",
> "test1").load())
>
> table2_stream = (
> spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",
>
> "localhost:9092").option("subscribe",
>
>  "test2").load())
>
> joined_Stream = table1_stream.join(table2_stream, "Id")
> #
> # joined_Stream.show()
>
> # query =
> table1_stream.writeStream.format("console").start().awaitTermination()
> # .queryName("table_A").format("memory")
> # spark.sql("select * from table_A").show()
> time.sleep(10)  # sleep 20 seconds
> # query.stop()
> # query
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
> Stream_Stream_Join.py
>
>
>
>
> I get the below error (in Spark 2.3.0) -
>
> Traceback (most recent call last):
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/
> Kafka_Spark/Stream_Stream_Join.py", line 4, in 
> class test:
>   File "/home/aakashbasu/PycharmProjects/AllMyRnD/
> Kafka_Spark/Stream_Stream_Join.py", line 19, in test
> joined_Stream = table1_stream.join(table2_stream, "Id")
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/pyspark.zip/pyspark/sql/dataframe.py", line 931, in join
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
>   File "/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/
> lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
> pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be
> resolved on the left side of the join. The left-side columns: [key, value,
> topic, partition, offset, timestamp, timestampType];'
>
> Seems, as per the documentation, they key and value are deserialized as
> byte arrays.
>
> I am badly stuck at this step, not many materials online, with steps to
> proceed on this, too.
>
> Any help, guys?
>
> Thanks,
> Aakash.
>
>
> On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu wrote:
> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu wrote:
> Hi,
>
> I progressed a bit in the above mentioned topic -
>
> 1) I am feeding a CSV file into the Kafka topic.
> 2) Feeding the Kafka topic as readStream as TD's article suggests.
> 3) Then, simply trying to do a show on the streaming dataframe, using
> queryName('XYZ') in the writeStream and writing a sql query on top of it,
> but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a
> stream-stream join.
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name
> 1,Kellyann,Moyne
> 2,Morty,Blacker
> 3,Tobit,Robardley
> 4,Wilona,Kells
> 5,Reggy,Comizzoli
>
>
> My test code -
>
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe",
> "test1").load())
>
> # table2_stream = 
> (spark.readStream.format("kafka").option("kafka.bootstrap.servers",
> 

Re: Spark Conf

2018-03-15 Thread Neil Jonkers
Hi

"In general, configuration values explicitly set on a SparkConf take the
highest precedence, then flags passed to spark-submit, then values in the
defaults file."
https://spark.apache.org/docs/latest/submitting-applications.html

Perhaps this will help, Vinyas:
Look at args.sparkProperties in
https://github.com/apache/spark/blob/v2.3.0/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
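
As a quick illustration of that precedence rule (a PySpark sketch; the 4g
value is arbitrary):

from pyspark import SparkConf, SparkContext

# An explicit SparkConf value wins over --executor-memory passed to spark-submit,
# which in turn wins over spark-defaults.conf.
conf = SparkConf().set("spark.executor.memory", "4g")
sc = SparkContext(conf=conf)
print(sc.getConf().get("spark.executor.memory"))  # "4g", even if spark-submit got --executor-memory 8g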

On Thu, Mar 15, 2018 at 1:53 AM, Vinyas Shetty 
wrote:

>
> Hi,
>
> I am trying to understand the Spark internals, so I was looking at the Spark
> code flow. In a scenario where I do a spark-submit in yarn cluster mode
> with --executor-memory 8g via the command line, how does Spark know about
> this executor memory value, since in SparkContext I see:
>
> _executorMemory = _conf.getOption("spark.executor.memory")
>   .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
>   .orElse(Option(System.getenv("SPARK_MEM")))
>
>
> Now SparkConf loads the defaults from Java system properties, but I
> did not find where the command-line value is added to the Java system
> properties (sys.props) in yarn cluster mode, i.e. I did not see a call to
> Utils.loadDefaultSparkProperties. How does this command-line value
> reach the SparkConf that is part of SparkContext?
>
> Regards,
> Vinyas
>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Hi,

And if I run this below piece of code -

from pyspark.sql import SparkSession
import time

class test:


    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1").load())

    table2_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test2").load())

    joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    # query = table1_stream.writeStream.format("console").start().awaitTermination()
    # .queryName("table_A").format("memory")
    # spark.sql("select * from table_A").show()
    time.sleep(10)  # sleep 10 seconds
    # query.stop()
    # query


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
# --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
# Stream_Stream_Join.py




I get the below error (in Spark 2.3.0) -

Traceback (most recent call last):
  File
"/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py",
line 4, in 
class test:
  File
"/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py",
line 19, in test
joined_Stream = table1_stream.join(table2_stream, "Id")
  File
"/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
line 931, in join
  File
"/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py",
line 1160, in __call__
  File
"/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
line 69, in deco


pyspark.sql.utils.AnalysisException: u'USING column `Id` cannot be
resolved on the left side of the join. The left-side columns: [key, value,
topic, partition, offset, timestamp, timestampType];'

It seems, as per the documentation, the key and value are deserialized as
byte arrays.

I am badly stuck at this step; there is not much material online with steps
on how to proceed, either.

Any help, guys?

Thanks,
Aakash.


On Thu, Mar 15, 2018 at 7:54 PM, Aakash Basu 
wrote:

> Any help on the above?
>
> On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu 
> wrote:
>
>> Hi,
>>
>> I progressed a bit in the above mentioned topic -
>>
>> 1) I am feeding a CSV file into the Kafka topic.
>> 2) Feeding the Kafka topic as readStream as TD's article suggests.
>> 3) Then, simply trying to do a show on the streaming dataframe, using
>> queryName('XYZ') in the writeStream and writing a sql query on top of it,
>> but that doesn't show anything.
>> 4) Once all the above problems are resolved, I want to perform a
>> stream-stream join.
>>
>> The CSV file I'm ingesting into Kafka has -
>>
>> id,first_name,last_name
>> 1,Kellyann,Moyne
>> 2,Morty,Blacker
>> 3,Tobit,Robardley
>> 4,Wilona,Kells
>> 5,Reggy,Comizzoli
>>
>>
>> My test code -
>>
>> from pyspark.sql import SparkSession
>> import time
>>
>> class test:
>>
>>
>> spark = SparkSession.builder \
>> .appName("DirectKafka_Spark_Stream_Stream_Join") \
>> .getOrCreate()
>> # ssc = StreamingContext(spark, 20)
>>
>> table1_stream = 
>> (spark.readStream.format("kafka").option("startingOffsets", 
>> "earliest").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "test1").load())
>>
>> # table2_stream = 
>> (spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "test2").load())
>>
>> # joined_Stream = table1_stream.join(table2_stream, "Id")
>> #
>> # joined_Stream.show()
>>
>> query = 
>> table1_stream.writeStream.format("console").queryName("table_A").start()  # 
>> .format("memory")
>> # spark.sql("select * from table_A").show()
>> # time.sleep(10)  # sleep 20 seconds
>> # query.stop()
>> query.awaitTermination()
>>
>>
>> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit 
>> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 
>> Stream_Stream_Join.py
>>
>>
>> The output I'm getting (whereas I simply want to show() my dataframe) -
>>
>> +++-+-+--+--
>> --+-+
>> | key|   value|topic|partition|offset|
>> timestamp|timestampType|
>> +++-+-+--+--
>> --+-+
>> |null|[69 64 2C 66 69 7...|test1|0|  5226|2018-03-15
>> 15:48:...|0|
>> |null|[31 2C 4B 65 6C 6...|test1|0|  5227|2018-03-15
>> 15:48:...|0|
>> |null|[32 2C 4D 6F 72 7...|test1|0|  5228|2018-03-15
>> 15:48:...|0|
>> |null|[33 2C 54 6F 62 

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Any help on the above?

On Thu, Mar 15, 2018 at 3:53 PM, Aakash Basu 
wrote:

> Hi,
>
> I progressed a bit in the above mentioned topic -
>
> 1) I am feeding a CSV file into the Kafka topic.
> 2) Feeding the Kafka topic as readStream as TD's article suggests.
> 3) Then, simply trying to do a show on the streaming dataframe, using
> queryName('XYZ') in the writeStream and writing a sql query on top of it,
> but that doesn't show anything.
> 4) Once all the above problems are resolved, I want to perform a
> stream-stream join.
>
> The CSV file I'm ingesting into Kafka has -
>
> id,first_name,last_name
> 1,Kellyann,Moyne
> 2,Morty,Blacker
> 3,Tobit,Robardley
> 4,Wilona,Kells
> 5,Reggy,Comizzoli
>
>
> My test code -
>
> from pyspark.sql import SparkSession
> import time
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
> # ssc = StreamingContext(spark, 20)
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets", 
> "earliest").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "test1").load())
>
> # table2_stream = 
> (spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "test2").load())
>
> # joined_Stream = table1_stream.join(table2_stream, "Id")
> #
> # joined_Stream.show()
>
> query = 
> table1_stream.writeStream.format("console").queryName("table_A").start()  # 
> .format("memory")
> # spark.sql("select * from table_A").show()
> # time.sleep(10)  # sleep 20 seconds
> # query.stop()
> query.awaitTermination()
>
>
> # /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Stream_Stream_Join.py
>
>
> The output I'm getting (whereas I simply want to show() my dataframe) -
>
> +++-+-+--+--
> --+-+
> | key|   value|topic|partition|offset|
> timestamp|timestampType|
> +++-+-+--+--
> --+-+
> |null|[69 64 2C 66 69 7...|test1|0|  5226|2018-03-15
> 15:48:...|0|
> |null|[31 2C 4B 65 6C 6...|test1|0|  5227|2018-03-15
> 15:48:...|0|
> |null|[32 2C 4D 6F 72 7...|test1|0|  5228|2018-03-15
> 15:48:...|0|
> |null|[33 2C 54 6F 62 6...|test1|0|  5229|2018-03-15
> 15:48:...|0|
> |null|[34 2C 57 69 6C 6...|test1|0|  5230|2018-03-15
> 15:48:...|0|
> |null|[35 2C 52 65 67 6...|test1|0|  5231|2018-03-15
> 15:48:...|0|
> +++-+-+--+--
> --+-+
>
> 18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
>   "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
>   "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
>   "name" : "table_A",
>   "timestamp" : "2018-03-15T10:18:07.218Z",
>   "numInputRows" : 6,
>   "inputRowsPerSecond" : 461.53846153846155,
>   "processedRowsPerSecond" : 14.634146341463415,
>   "durationMs" : {
> "addBatch" : 241,
> "getBatch" : 15,
> "getOffset" : 2,
> "queryPlanning" : 2,
> "triggerExecution" : 410,
> "walCommit" : 135
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "KafkaSource[Subscribe[test1]]",
> "startOffset" : {
>   "test1" : {
> "0" : 5226
>   }
> },
> "endOffset" : {
>   "test1" : {
> "0" : 5232
>   }
> },
> "numInputRows" : 6,
> "inputRowsPerSecond" : 461.53846153846155,
> "processedRowsPerSecond" : 14.634146341463415
>   } ],
>   "sink" : {
> "description" : "org.apache.spark.sql.execution.streaming.
> ConsoleSink@3dfc7990"
>   }
> }
>
> P.S - If I add the below piece in the code, it doesn't print a DF of the
> actual table.
>
> spark.sql("select * from table_A").show()
>
>
> Any help?
>
>
> Thanks,
> Aakash.
>
> On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu 
> wrote:
>
>> Thanks to TD, the savior!
>>
>> Shall look into it.
>>
>> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Relevant: https://databricks.com/blog/2018/03/13/introducing
>>> -stream-stream-joins-in-apache-spark-2-3.html
>>>
>>> This is true stream-stream join which will automatically buffer delayed
>>> data and appropriately join stuff with SQL join semantics. Please check it
>>> out :)
>>>
>>> TD
>>>
>>>
>>>
>>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes 
>>> wrote:
>>>
 I misread it, and thought that you question was if pyspark supports
 kafka lol. Sorry!

 On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu <
 aakash.spark@gmail.com> wrote:

> Hey Dylan,
>
> Great!
>
> Can you revert back to my 

Re: What's the best way to have Spark a service?

2018-03-15 Thread Jean Georges Perrin
Hi David,

I ended up building my own. Livy sounded great on paper, but heavy to
manipulate. I found out about Jobserver too late. We did not find it too
complicated to build ours, with a small Spring Boot app that held the
session (we did not need more than one session).
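
For what it's worth, a bare-bones sketch of that approach, with Flask standing
in for Spring Boot (the endpoint, path handling, and app name are made up):

from flask import Flask, jsonify, request
from pyspark.sql import SparkSession

app = Flask(__name__)
# One long-lived SparkSession shared by every request, as described above.
spark = SparkSession.builder.appName("spark-rest-service").getOrCreate()

@app.route("/count")
def count():
    path = request.args.get("path")  # dataset location supplied by the caller
    return jsonify({"rows": spark.read.parquet(path).count()})

if __name__ == "__main__":
    app.run(port=8080)

This only holds one session, as in the setup described above; anything heavier
is where Livy or Jobserver start to pay off.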

jg


> On Mar 15, 2018, at 07:06, David Espinosa  wrote:
> 
> Hi all,
> 
> I'm quite new to Spark, and I would like to ask whats the best way to have 
> Spark as a service, and for that I mean being able to include the response of 
> a scala app/job running in a Spark into a RESTful common request.
> 
> Up now I have read about Apache Livy (which I tried and found incompatibility 
> problems with my scala app), Spark Jobsserver, Finch and I read that I could 
> use also Spark Streams.
> 
> Thanks in advance,
> David





Re: What's the best way to have Spark a service?

2018-03-15 Thread Liana Napalkova
Hi David,

Which type of incompatibility problems do you have with Apache Livy?

BR,

Liana



From: David Espinosa 
Sent: 15 March 2018 12:06:20
To: user@spark.apache.org
Subject: What's the best way to have Spark a service?

Hi all,

I'm quite new to Spark, and I would like to ask whats the best way to have 
Spark as a service, and for that I mean being able to include the response of a 
scala app/job running in a Spark into a RESTful common request.

Up now I have read about Apache Livy (which I tried and found incompatibility 
problems with my scala app), Spark Jobsserver, Finch and I read that I could 
use also Spark Streams.

Thanks in advance,
David






What's the best way to have Spark a service?

2018-03-15 Thread David Espinosa
Hi all,

I'm quite new to Spark, and I would like to ask what's the best way to have
Spark as a service; by that I mean being able to include the response
of a Scala app/job running in Spark in a common RESTful request.

Up to now I have read about Apache Livy (which I tried, and found
incompatibility problems with my Scala app), Spark Jobserver, and Finch, and I
read that I could also use Spark Streams.

Thanks in advance,
David


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-15 Thread Aakash Basu
Hi,

I progressed a bit in the above mentioned topic -

1) I am feeding a CSV file into the Kafka topic.
2) Feeding the Kafka topic as readStream as TD's article suggests.
3) Then, simply trying to do a show on the streaming dataframe, using
queryName('XYZ') in the writeStream and writing a sql query on top of it,
but that doesn't show anything.
4) Once all the above problems are resolved, I want to perform a
stream-stream join.

The CSV file I'm ingesting into Kafka has -

id,first_name,last_name
1,Kellyann,Moyne
2,Morty,Blacker
3,Tobit,Robardley
4,Wilona,Kells
5,Reggy,Comizzoli


My test code -

from pyspark.sql import SparkSession
import time

class test:


    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()
    # ssc = StreamingContext(spark, 20)

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1").load())

    # table2_stream = (spark.readStream.format("kafka")
    #                  .option("kafka.bootstrap.servers", "localhost:9092")
    #                  .option("subscribe", "test2").load())

    # joined_Stream = table1_stream.join(table2_stream, "Id")
    #
    # joined_Stream.show()

    query = table1_stream.writeStream.format("console").queryName("table_A").start()  # .format("memory")
    # spark.sql("select * from table_A").show()
    # time.sleep(10)  # sleep 20 seconds
    # query.stop()
    query.awaitTermination()


# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit
# --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
# Stream_Stream_Join.py


The output I'm getting (whereas I simply want to show() my dataframe) -

+----+--------------------+-----+---------+------+--------------------+-------------+
| key|               value|topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----+---------+------+--------------------+-------------+
|null|[69 64 2C 66 69 7...|test1|        0|  5226|2018-03-15 15:48:...|            0|
|null|[31 2C 4B 65 6C 6...|test1|        0|  5227|2018-03-15 15:48:...|            0|
|null|[32 2C 4D 6F 72 7...|test1|        0|  5228|2018-03-15 15:48:...|            0|
|null|[33 2C 54 6F 62 6...|test1|        0|  5229|2018-03-15 15:48:...|            0|
|null|[34 2C 57 69 6C 6...|test1|        0|  5230|2018-03-15 15:48:...|            0|
|null|[35 2C 52 65 67 6...|test1|        0|  5231|2018-03-15 15:48:...|            0|
+----+--------------------+-----+---------+------+--------------------+-------------+

18/03/15 15:48:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "ca7e2862-73c6-41bf-9a6f-c79e533a2bf8",
  "runId" : "0758ddbd-9b1c-428b-aa52-1dd40d477d21",
  "name" : "table_A",
  "timestamp" : "2018-03-15T10:18:07.218Z",
  "numInputRows" : 6,
  "inputRowsPerSecond" : 461.53846153846155,
  "processedRowsPerSecond" : 14.634146341463415,
  "durationMs" : {
"addBatch" : 241,
"getBatch" : 15,
"getOffset" : 2,
"queryPlanning" : 2,
"triggerExecution" : 410,
"walCommit" : 135
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[test1]]",
"startOffset" : {
  "test1" : {
"0" : 5226
  }
},
"endOffset" : {
  "test1" : {
"0" : 5232
  }
},
"numInputRows" : 6,
"inputRowsPerSecond" : 461.53846153846155,
"processedRowsPerSecond" : 14.634146341463415
  } ],
  "sink" : {
"description" :
"org.apache.spark.sql.execution.streaming.ConsoleSink@3dfc7990"
  }
}

P.S. - If I add the below piece to the code, it doesn't print a DF of the
actual table.

spark.sql("select * from table_A").show()


Any help?


Thanks,
Aakash.

On Thu, Mar 15, 2018 at 10:52 AM, Aakash Basu 
wrote:

> Thanks to TD, the savior!
>
> Shall look into it.
>
> On Thu, Mar 15, 2018 at 1:04 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Relevant: https://databricks.com/blog/2018/03/13/introducing
>> -stream-stream-joins-in-apache-spark-2-3.html
>>
>> This is true stream-stream join which will automatically buffer delayed
>> data and appropriately join stuff with SQL join semantics. Please check it
>> out :)
>>
>> TD
>>
>>
>>
>> On Wed, Mar 14, 2018 at 12:07 PM, Dylan Guedes 
>> wrote:
>>
>>> I misread it, and thought that you question was if pyspark supports
>>> kafka lol. Sorry!
>>>
>>> On Wed, Mar 14, 2018 at 3:58 PM, Aakash Basu wrote:
>>>
 Hey Dylan,

 Great!

 Can you revert back to my initial and also the latest mail?

 Thanks,
 Aakash.

 On 15-Mar-2018 12:27 AM, "Dylan Guedes"  wrote:

> Hi,
>
> I've been using the Kafka with pyspark since 2.1.
>
> On Wed, Mar 14, 2018 at 3:49 PM, Aakash Basu <
> aakash.spark@gmail.com> wrote:
>
>> Hi,
>>
>> I'm yet to.
>>
>> Just want to know,