Re: About how to read spark source code with a good way

2020-08-18 Thread joyan sil
Hi Jack and Spark experts,

Further to the question asked in this thread, what are some recommended
resources (blogs/videos) that have helped you dive deep into the Spark
source code?
Thanks

Regards
Joyan

On Wed, Aug 19, 2020 at 11:06 AM Jack Kolokasis 
wrote:

> Hi,
>
> From my experience, I suggest reading both blogs and the source code. Blogs
> will give you the high-level knowledge you need for the different parts of
> the source code.
>
> Iacovos
>
> > On 18/8/20 3:53 p.m., 2400 wrote:
> > Hi everyone ,
> >
> > I am an engineer. I have been using Spark, and I want to try to make
> > Spark better; I would like to become a committer. Before that, I want to
> > understand Spark thoroughly, so could someone advise on the best way to
> > read the Spark source code, or recommend related blogs I can learn from?
> > I would appreciate any help. Let's make Spark better.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Slow insert into overwrite in spark in object store backed hive tables

2020-11-11 Thread joyan sil
Hi,

We are using the DataFrame insertInto method to write into an object
store backed Hive table in Google Cloud, and we have observed slowness with
this approach.

From what we have read, writes to Hive tables in Spark happen in a two-phase
manner:

   - Step 1 – DistributedWrite: Data is written to a Hive staging directory
   using OutputCommitter. This step happens in a distributed manner in
   multiple executors.
   - Step 2 – FinalCopy: After the new files are written to the Hive
   staging directory, they are moved to the final location using the
   FileSystem *rename* API. This step unfortunately happens serially in the
   driver. As part of this, the metastore is also updated with the new
   partition information.
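
For reference, the write we are issuing is essentially the following (a
minimal sketch; df is the DataFrame being written and the table name is a
placeholder):

// Hypothetical partitioned Hive table; it is this insertInto call whose
// FinalCopy/rename phase runs serially on the driver as described above.
df.write
  .mode("overwrite")
  .insertInto("analytics.sales_partitioned")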

We thought of saving the data directly to the final path and then
programmatically adding the partitions (or running MSCK REPAIR TABLE) to
avoid the cost of the rename operation. Are there any other, more elegant
ways to implement this so that the FinalCopy step (the rename API operation)
can be eliminated? We need suggestions to speed up this write.
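
A rough sketch of the workaround we have in mind is shown below (bucket,
database, table, and partition values are placeholders):

// Write one partition's data directly to its final location, bypassing the
// Hive staging directory and the serial rename step on the driver.
newPartitionDf.write
  .mode("overwrite")
  .parquet("gs://bucket/warehouse/analytics.db/sales_partitioned/dt=2020-11-10")

// Then register the new partition with the metastore explicitly ...
spark.sql("""
  ALTER TABLE analytics.sales_partitioned
  ADD IF NOT EXISTS PARTITION (dt='2020-11-10')
  LOCATION 'gs://bucket/warehouse/analytics.db/sales_partitioned/dt=2020-11-10'
""")

// ... or let the metastore discover all new partition directories at once.
spark.sql("MSCK REPAIR TABLE analytics.sales_partitioned")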

A few things to consider:
1. We get old data as well as new data. So there will be new partitions as
well as upserts to old partitions.
2. Insert overwrite can happen into static and dynamic partitions.
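
Related to point 2: if we stay with insertInto, dynamic partition overwrite
mode would at least limit an overwrite to the partitions present in the
incoming data (a sketch, assuming a partitioned datasource table; the table
name is a placeholder):

// Only the partitions present in df are replaced; older, untouched
// partitions are left intact.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write.mode("overwrite").insertInto("analytics.sales_partitioned")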

Looking forward to a solution.

Regards
Joyan


How to apply ranger policies on Spark

2020-11-23 Thread joyan sil
Hi,

We have Ranger policies defined on our Hive tables, and authorization works
as expected when we use the Hive CLI and Beeline. But when we access those
Hive tables using spark-shell or spark-submit, the policies are not enforced.

 Any suggestions to make Ranger work with Spark?


Regards

Joyan


Re: How to apply ranger policies on Spark

2020-11-24 Thread joyan sil
Thanks Ayan and Dennis,

@Ayan: if I use Ranger to manage HDFS ACLs, as you mentioned, that only gives
coarse-grained control at the file level. I have a few fine-grained use cases
at the row/column level.
I was going through the JIRAs below and was wondering whether anyone has used
them, and whether any user documentation for them exists in the Spark community.

https://issues.apache.org/jira/browse/RANGER-2128
https://issues.apache.org/jira/browse/SUBMARINE-409
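
For context, we currently access these tables with Hive support enabled in
the Spark session, which talks to the metastore and the underlying files
directly and therefore bypasses HiveServer2 and its Ranger policies (a
minimal sketch; the metastore URI and table name are placeholders):

import org.apache.spark.sql.SparkSession

// Spark session wired to the shared Hive metastore; queries go straight to
// the metastore and the files, not through HiveServer2, so Ranger's Hive
// policies are never consulted.
val spark = SparkSession.builder()
  .appName("spark-hive-access")
  .config("hive.metastore.uris", "thrift://metastore-host:9083")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SELECT * FROM secured_db.secured_table LIMIT 10").show()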

Regards
Joyan

On Tue, Nov 24, 2020 at 1:40 PM ayan guha  wrote:

> AFAIK, Ranger secures the Hive (JDBC) server only. Unfortunately, Spark does
> not interact with HS2; it talks directly to the metastore. Hence, the only
> way to use Ranger policies is to access Hive via JDBC. Another option is
> HDFS or storage ACLs, which give coarse-grained control over file paths and
> the like. You can use Ranger to manage HDFS ACLs as well; in that scenario,
> Spark will be bound by those policies.
>
> On Tue, Nov 24, 2020 at 5:26 PM Dennis Suhari 
> wrote:
>
>> Hi Joyan,
>>
>> Spark uses its own metastore by default. To use Ranger, you need to use the
>> Hive metastore. For this, you need to point Spark at the Hive metastore and
>> use HiveContext in your Spark code.
>>
>> Br,
>>
>> Dennis
>>
>> Sent from my iPhone
>>
>> On 23.11.2020 at 19:04, joyan sil wrote:
>>
>>
>>
>> Hi,
>>
>> We have Ranger policies defined on our Hive tables, and authorization works
>> as expected when we use the Hive CLI and Beeline. But when we access those
>> Hive tables using spark-shell or spark-submit, the policies are not enforced.
>>
>>  Any suggestions to make Ranger work with Spark?
>>
>>
>> Regards
>>
>> Joyan
>>
>>
>
> --
> Best Regards,
> Ayan Guha
>


Failed to construct kafka consumer, Failed to load SSL keystore + Spark Streaming

2022-02-12 Thread joyan sil
Hi All,

I am trying to read from Kafka using Spark streaming from spark-shell, but I
am getting the error below. Any suggestions to fix this are much appreciated.

I am running from spark-shell, hence it is client mode, and the files are
available on the local filesystem.

I tried to access the files as shown below, but I still get the same error.
Any suggestions to make this work from spark-shell?

spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.7,org.apache.avro:avro:1.8.2,org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.8 \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --files /local_dir/kafka.client.truststore.jks,/local_dir/test.kafka.client.xxx.com.jks

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1,server2")
  .option("subscribe", "wm-cth-salesstreams")
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", 100)
  .option("failOnDataLoss", false)
  .option("kafka.security.protocol", "SSL")
  // .option("kafka.ssl.truststore.location", "/local_dir/kafka.client.truststore.jks")
  .option("kafka.ssl.truststore.location", "file://" + org.apache.spark.SparkFiles.get("/local_dir/kafka.client.truststore.jks"))
  .option("kafka.ssl.truststore.password", "pwd")
  .option("kafka.ssl.keystore.password", "pwd")
  // .option("kafka.ssl.keystore.location", "/local_dir/test.kafka.client.xxx.com.jks")
  .option("kafka.ssl.keystore.location", "file://" + org.apache.spark.SparkFiles.get("/local_dir/test.kafka.client.xxx.com.jks"))
  .load

Exception:
22/02/12 15:57:03 INFO org.apache.spark.sql.kafka010.KafkaMicroBatchReader:
Initial offsets:
{"wm-cth-salesstreams":{"23":167267092,"59":167276860,"50":167258479,"32":167281169,"41":167272687,"53":167256274,"17":167269072,"8":167282513,"35":167298150,"44":167244867,"26":167242913,"11":167283073,"56":167304913,"29":167307963,"38":167287380,"47":167312027,"20":167280591,"2":167248970,"5":167308945,"14":167231970,"46":167267534,"55":167275890,"58":167287699,"49":167245856,"40":167247065,"13":167249522,"4":167301468,"22":167269011,"31":167349129,"16":167266948,"7":167272315,"52":167276042,"43":167273593,"25":167232737,"34":167264787,"10":167265137,"37":167252586,"1":167312454,"19":167247237,"28":167280632,"54":167307408,"45":167280214,"27":167249248,"36":167282370,"18":167223580,"9":167223643,"57":167340670,"21":167277793,"48":167273190,"3":167294084,"12":167299093,"30":167236443,"39":167311503,"15":167274468,"42":167292272,"51":167252733,"24":167245661,"6":167241738,"33":167224273,"0":167295530}}
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:64)
at
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
... 51 more
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:137)
at
org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:62)
... 55 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
keystore /local_dir/test.kafka.client.xxx.com.jks of type JKS
at
org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:330)
at
org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:218)
at
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:135)
... 56 more
Caused by: java.io.FileNotFoundException:
/local_dir/test.kafka.client.xxx.com.jks (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at
org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:323)
... 58 more
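
For completeness, another variant we are considering (a sketch, untested):
since --files copies each file into the application's working directories,
reference only the base file name via SparkFiles and pass a plain path, given
that Kafka appears to load the keystore with FileInputStream per the stack
trace above. File names and passwords below are placeholders.

import org.apache.spark.SparkFiles

// Resolve the local copies of the files shipped with --files.
val truststorePath = SparkFiles.get("kafka.client.truststore.jks")
val keystorePath = SparkFiles.get("test.kafka.client.xxx.com.jks")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1,server2")
  .option("subscribe", "wm-cth-salesstreams")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", truststorePath)
  .option("kafka.ssl.truststore.password", "pwd")
  .option("kafka.ssl.keystore.location", keystorePath)
  .option("kafka.ssl.keystore.password", "pwd")
  .load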