Structured Streaming on GCP Dataproc - java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer

2022-02-01 Thread karan alang
Hello All,

I'm running a simple Structured Streaming job on GCP Dataproc, which reads data from
Kafka and prints to the console.

Command :

gcloud dataproc jobs submit pyspark
/Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb1.py
--cluster dataproc-ss-poc  --jars
gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar
gs://spark-jars-karan/spark-core_2.12-3.1.2.jar --region us-central1

I'm getting this error:

  File "/tmp/01c16a55009a42a0a29da6dde9aae4d5/StructuredStreaming_Kafka_GCP-Batch-feb1.py", line 49, in <module>
    df = spark.read.format('kafka')\
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:599)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:348)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:128)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
  at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

Additional details are in the Stack Overflow question:

https://stackoverflow.com/questions/70951195/gcp-dataproc-java-lang-noclassdeffounderror-org-apache-kafka-common-serializa

Do we need to pass any other jar ?
What needs to be done to debug/fix this ?

tia !
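For reference: ByteArraySerializer lives in the kafka-clients jar, which
spark-sql-kafka-0-10 needs at runtime but which is not among the jars passed
above. A minimal sketch of one way to get it onto the classpath is to let
Spark resolve the connector and its transitive dependencies via
spark.jars.packages (coordinates assume Spark 3.1.2 / Scala 2.12; broker and
topic names are placeholders):

from pyspark.sql import SparkSession

# Illustrative session setup, not the poster's script: spark.jars.packages
# pulls in spark-sql-kafka-0-10 plus its transitive kafka-clients dependency,
# which contains org.apache.kafka.common.serialization.ByteArraySerializer.
spark = (SparkSession.builder
         .appName("kafka-classpath-check")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
         .getOrCreate())

df = (spark.read.format("kafka")
      .option("kafka.bootstrap.servers", "broker-host:9092")  # placeholder broker
      .option("subscribe", "test-topic")                      # placeholder topic
      .load())
df.selectExpr("CAST(value AS STRING) AS value").show(truncate=False)

Alternatively, keep the --jars approach and add a matching kafka-clients jar
alongside the connector jar; the key point is that kafka-clients must be on
both the driver and executor classpath.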


Re: Structured Streaming - not showing records on console

2022-02-01 Thread karan alang
Hi Mich,

Thanks. It seems 'complete' mode is supported only when there are streaming
aggregations; I get this error after changing the output mode:

pyspark.sql.utils.AnalysisException: Complete output mode not supported
when there are no streaming aggregations on streaming DataFrames/Datasets;

Project [value#8, topic#9, partition#10, timestamp#12]
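For reference, a short sketch of what the analyzer allows here (names are
illustrative; raw_df stands for the streaming DataFrame read from Kafka):
'complete' mode becomes legal once the query contains a streaming
aggregation, while a plain pass-through of the Kafka columns should keep
'append' (or 'update') instead.

# Hypothetical aggregation that makes outputMode('complete') valid.
agg = raw_df.groupBy("topic").count()

(agg.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start())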

On Tue, Feb 1, 2022 at 4:05 PM Mich Talebzadeh 
wrote:

> hm.
>
> If I recall correctly, you should try
> outputMode('complete') with format('console'):
>
> result = resultMF. \
>  writeStream. \
>  outputMode('complete'). \
>  option("numRows", 1000). \
>  option("truncate", "false"). \
>  format('console'). \
>  option('checkpointLocation', checkpoint_path). \
>  queryName("temperature"). \
>  start()
>
> In another example I have:
>
>     result = streamingDataFrame.select( \
>                  col("parsed_value.rowkey").alias("rowkey") \
>                , col("parsed_value.timeissued").alias("timeissued") \
>                , col("parsed_value.temperature").alias("temperature")). \
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  foreachBatch(temperatures). \
>  trigger(processingTime='60 seconds'). \
>  option('checkpointLocation', checkpoint_path). \
>  queryName("temperature"). \
>  start()
>
> # assumes pyspark.sql.functions is imported as F
> def temperatures(df, batchId):
>     if len(df.take(1)) > 0:
>         df.show(100, False)
>         df.persist()
>         AvgTemp = df.select(F.round(F.avg(col("temperature")))).collect()[0][0]
>         df.unpersist()
>         now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
>         print(f"""Average temperature at {now} from batchId {batchId} is {AvgTemp} degrees""")
>     else:
>         print("DataFrame is empty")
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 1 Feb 2022 at 23:45, karan alang  wrote:
>
>> Hello Spark Experts,
>>
>> I have a simple Structured Streaming program which reads data from Kafka
>> and writes to the console. It works in batch mode (i.e. spark.read and
>> df.write), but not in streaming mode.
>>
>> Details are in the Stack Overflow question:
>>
>>
>> https://stackoverflow.com/questions/70948967/structured-streaming-not-writing-records-to-console-when-using-writestream-ba
>>
>> Any inputs on how to fix/debug this ?
>> tia !
>>
>


Re: Structured Streaming - not showing records on console

2022-02-01 Thread Mich Talebzadeh
hm.

If I recall correctly, you should try
outputMode('complete') with format('console'):

result = resultMF. \
 writeStream. \
 outputMode('complete'). \
 option("numRows", 1000). \
 option("truncate", "false"). \
 format('console'). \
 option('checkpointLocation', checkpoint_path). \
 queryName("temperature"). \
 start()

In another example I have:

    result = streamingDataFrame.select( \
                 col("parsed_value.rowkey").alias("rowkey") \
               , col("parsed_value.timeissued").alias("timeissued") \
               , col("parsed_value.temperature").alias("temperature")). \
 writeStream. \
 outputMode('append'). \
 option("truncate", "false"). \
 foreachBatch(temperatures). \
 trigger(processingTime='60 seconds'). \
 option('checkpointLocation', checkpoint_path). \
 queryName("temperature"). \
 start()

# assumes pyspark.sql.functions is imported as F
def temperatures(df, batchId):
    if len(df.take(1)) > 0:
        df.show(100, False)
        df.persist()
        AvgTemp = df.select(F.round(F.avg(col("temperature")))).collect()[0][0]
        df.unpersist()
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
        print(f"""Average temperature at {now} from batchId {batchId} is {AvgTemp} degrees""")
    else:
        print("DataFrame is empty")

HTH



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 1 Feb 2022 at 23:45, karan alang  wrote:

> Hello Spark Experts,
>
> I have a simple Structured Streaming program which reads data from Kafka
> and writes to the console. It works in batch mode (i.e. spark.read and
> df.write), but not in streaming mode.
>
> Details are in the Stack Overflow question:
>
>
> https://stackoverflow.com/questions/70948967/structured-streaming-not-writing-records-to-console-when-using-writestream-ba
>
> Any inputs on how to fix/debug this ?
> tia !
>


Structured Streaming - not showing records on console

2022-02-01 Thread karan alang
Hello Spark Experts,

I have a simple Structured Streaming program which reads data from Kafka
and writes to the console. It works in batch mode (i.e. spark.read and
df.write), but not in streaming mode.

Details are in the Stack Overflow question:

https://stackoverflow.com/questions/70948967/structured-streaming-not-writing-records-to-console-when-using-writestream-ba

Any inputs on how to fix/debug this ?
tia !
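For reference, a minimal end-to-end sketch of a Kafka-to-console streaming
job (broker, topic and checkpoint path are placeholders, not the code from
the Stack Overflow question). A common reason nothing shows up on the console
is that the driver exits before any micro-batch runs, so the
awaitTermination() call at the end matters:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-console-demo").getOrCreate()

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker-host:9092")  # placeholder
      .option("subscribe", "test-topic")                      # placeholder
      .option("startingOffsets", "earliest")
      .load())

query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
         .writeStream
         .outputMode("append")
         .format("console")
         .option("truncate", "false")
         .option("checkpointLocation", "/tmp/kafka-console-ckpt")  # placeholder
         .start())

query.awaitTermination()  # keep the driver alive so micro-batches get printed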


Re: Code fails when AQE enabled in Spark 3.1

2022-02-01 Thread Sean Owen
At a glance, it doesn't seem so. That is a corner case in two ways, it
seems: very old dates and using RDDs.
I also suspect that individual change is tied to a lot of other date-related
changes in 3.2, so it may not be very back-portable.
You should pursue updating to 3.2 for many reasons, but also for this one if
it affects you.

On Tue, Feb 1, 2022 at 1:50 AM Gaspar Muñoz  wrote:

> It looks like this commit (
> https://github.com/apache/spark/commit/a85490659f45410be3588c669248dc4f534d2a71)
> does the trick.
>
> [image: image.png]
>
> Don't you think this bug is important enough to include in the 3.1 branch?
>
> Regards
>
> On Thu, 20 Jan 2022 at 8:55, Gaspar Muñoz ()
> wrote:
>
>> Hi guys,
>>
>> Hundreds of Spark jobs run at my company every day. We are running Spark
>> 3.1.2 and we want to enable Adaptive Query Execution (AQE) for all of them.
>> We can't upgrade to 3.2 right now, so we want to enable it explicitly using
>> the appropriate conf at spark-submit time.
>>
>> Some of the jobs fail when AQE is enabled, but I can't figure out what is
>> happening. To give you more information, I prepared a small snippet for
>> spark-shell that fails in Spark 3.1 with AQE enabled and works with it
>> disabled. It also works in 3.2, but I think it may be a bug that could be
>> fixed in 3.1.3.
>>
>> The code and explanation can be found here:
>> https://issues.apache.org/jira/browse/SPARK-37898
>>
>> Regards
>> --
>> Gaspar Muñoz Soria
>>
>
>
> --
> Gaspar Muñoz Soria
>
> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473
>
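For reference, enabling AQE explicitly per job, as Gaspar describes, comes
down to a couple of standard configs; a minimal sketch (the session setup is
illustrative, and the same properties can be passed as --conf flags to
spark-submit):

from pyspark.sql import SparkSession

# Turn on Adaptive Query Execution explicitly on Spark 3.1.x.
spark = (SparkSession.builder
         .appName("aqe-demo")
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
         .getOrCreate())

print(spark.conf.get("spark.sql.adaptive.enabled"))  # sanity check, prints 'true'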


Re: A Persisted Spark DataFrame is computed twice

2022-02-01 Thread Gourav Sengupta
Hi,

Can you please try to use Spark SQL instead of DataFrames and see the
difference?

You will get a lot of theoretical arguments, and that is fine, but they are
largely just theories.

Also try to apply the function to the result of the filters as a sub-query,
caching the data from the filters first; a sketch of this follows.
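A minimal sketch of that suggestion (table names, filter values and the UDF
registration are illustrative stand-ins, not the thread's actual code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Expose the source data to SQL.
spark.read.parquet("/input/hdfs/path").createOrReplaceTempView("raw")

# Materialise only the filtered subset and cache it.
spark.sql("""
    SELECT * FROM raw
    WHERE n0 = 0 AND n1 = 1          -- placeholder filter values
""").createOrReplaceTempView("filtered")
spark.sql("CACHE TABLE filtered")

# Stand-in for the expensive function; register the real UDF here instead.
spark.udf.register("test_score_r4", lambda a, b: float(a) + float(b))

# Apply the function over the cached subset as a sub-query.
result = spark.sql("""
    SELECT f.*, test_score_r4(f.id0, f.id1) AS test
    FROM (SELECT * FROM filtered) f
""")
result.coalesce(300).write.mode("overwrite").parquet("/output/hdfs/path")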



Regards,
Gourav Sengupta

On Mon, Jan 31, 2022 at 8:00 AM Benjamin Du  wrote:

> I don't think coalesce (by repartitioning I assume you mean coalesce)
> itself and deserialising take that much time. To add a little bit more
> context, the computation of the DataFrame is CPU intensive rather than
> data/IO intensive. I purposely keep coalesce after df.count as I want
> to keep the large number of partitions (30k) when computing the DataFrame
> so that I can get much higher parallelism. After the computation, I
> reduce the number of partitions (to avoid having too many small files on
> HDFS). It typically takes about 5 hours to compute the DataFrame (when 30k
> partitions are used) and write it to disk (without doing repartitioning or
> coalesce). If I manually write the computed DataFrame to disk, read it
> back, coalesce it and then write it back to disk, it also takes about 5
> hours. The code that I pasted in this thread takes forever to run, as the
> DataFrame is obviously recomputed at df.coalesce and, with a parallelism
> of only 300 partitions, it is almost impossible to compute the DataFrame
> in a reasonable amount of time.
>
> I tried various ways, but none of them worked except manually writing to
> disk, reading it back, repartitioning/coalescing it, and then writing it
> back to HDFS (see the sketch at the end of this thread).
>
>    1. checkpoint by itself computes the DataFrame twice. (This is a known
>    existing bug of checkpoint).
>
> output_mod = f"{output}/job={mod}"
> spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
> .checkpoint() \
> .coalesce(300) \
> .write.mode("overwrite").parquet(output_mod)
>
>
>    2. persist (to Disk) + count computes the DataFrame twice.
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
> .persist(StorageLevel.DISK_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>    3. persist to memory + count computes the DataFrame twice
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
> .persist(StorageLevel.MEMORY_ONLY)
> df.count()
> df.coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>    4. persist (to memory) + checkpoint + coalesce computes the DataFrame
>    twice
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
> .persist(StorageLevel.MEMORY_ONLY) \
> .checkpoint() \
> .coalesce(300).write.mode("overwrite").parquet(output_mod)
>
>
>    5. persist (to memory) + checkpoint without coalesce computes the
>    DataFrame twice
>
> output_mod = f"{output}/job={mod}"
> df = spark.read.parquet("/input/hdfs/path") \
> .filter(col("n0") == n0) \
> .filter(col("n1") == n1) \
> .filter(col("h1") == h1) \
> .filter(col("j1").isin(j1)) \
> .filter(col("j0") == j0) \
> .filter(col("h0").isin(h0)) \
> .filter(col("id0").bitwiseOR(col("id1")) % jobs == mod) \
> .withColumn("test", test_score_r4(col("id0"), col("id1"))) \
> .persist(StorageLevel.MEMORY_ONLY) \
> .checkpoint() \
>
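For reference, a sketch of the workaround Benjamin reports as the only one
that worked (paths and the omitted filter/UDF chain are placeholders):
materialise the expensive result at full parallelism first, then coalesce it
in a separate, cheap pass.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

tmp_path = "/tmp/hdfs/expensive_result"   # placeholder intermediate location
final_path = "/output/hdfs/path"          # placeholder final location

# Step 1: compute with the full 30k partitions and write without coalescing.
expensive_df = spark.read.parquet("/input/hdfs/path")  # plus the filters and UDF from the thread
expensive_df.write.mode("overwrite").parquet(tmp_path)

# Step 2: read the materialised result back and reduce the number of files.
(spark.read.parquet(tmp_path)
      .coalesce(300)
      .write.mode("overwrite")
      .parquet(final_path))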