Re: Thrift2 Server on Kubernetes?

2021-05-14 Thread mykidong
Hi Meikel,

If you want to run Spark Thrift Server on Kubernetes, take a look at my blog
post: https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1

Cheers,

- Kidong Lee.






Hive on Spark in Kubernetes.

2020-10-07 Thread mykidong
Hi all,

I have recently written a blog post about Hive on Spark in a Kubernetes environment:
- https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1

In this post, you can find how to run Hive on Kubernetes using the Spark Thrift Server, which is compatible with HiveServer2.
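Once it is running, any HiveServer2-compatible JDBC client can connect to it. Here is a minimal sketch in Scala (the host is a placeholder, 10016 is the binary thrift port in my setup, and the Hive JDBC driver has to be on the classpath):

import java.sql.DriverManager

// Talk to the Spark Thrift Server exactly like a HiveServer2 instance.
// Host and database below are placeholders.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection(
  "jdbc:hive2://spark-thrift-server.example.com:10016/default", "spark", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SHOW DATABASES")
while (rs.next()) {
  println(rs.getString(1))   // print the first column of each row
}
rs.close()
stmt.close()
conn.close()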

Cheers,

- Kidong.






Re: UnknownHostException is thrown when spark job whose jar files will be uploaded to s3 object storage via https is submitted to kubernetes

2020-09-26 Thread mykidong
Sorry, I had missed setting the S3 path-style access property in the submit.
After adding --conf spark.hadoop.fs.s3a.path.style.access=true to the spark-submit command, it works fine!
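For anyone hitting the same UnknownHostException: without path-style access, the S3A client uses virtual-host-style addressing (bucket-name.endpoint-host), so it tries to resolve a host name that does not exist in DNS. A quick standalone check with Hadoop's FileSystem API, as a sketch (endpoint, bucket and keys below are placeholders):

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
conf.set("fs.s3a.endpoint", "https://my-minio.example.com")   // placeholder endpoint
conf.set("fs.s3a.connection.ssl.enabled", "true")
conf.set("fs.s3a.path.style.access", "true")                  // the setting I had missed
conf.set("fs.s3a.access.key", "<access-key>")
conf.set("fs.s3a.secret.key", "<secret-key>")

// This listing fails with UnknownHostException when path-style access is off
// and DNS has no entry for <bucket>.<endpoint-host>.
val fs = FileSystem.get(new URI("s3a://mybucket/"), conf)
fs.listStatus(new Path("s3a://mybucket/")).foreach(status => println(status.getPath))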

- Kidong.






UnknownHostException is thrown when spark job whose jar files will be uploaded to s3 object storage via https is submitted to kubernetes

2020-09-19 Thread mykidong



Hi,

I have already succeeded in submitting a Spark job to Kubernetes that accesses S3 object storage. At that time, my environment was:
Spark: 3.0.0
S3 object storage: Hadoop Ozone S3 accessed via an HTTP endpoint containing an IP address, not a host name
Resource management: Kubernetes
My Spark job worked OK in that setup.


Now I want to replace unsecured HTTP with HTTPS to access the S3 object storage:
Spark: 3.0.0
S3 object storage: MinIO S3 accessed via an HTTPS endpoint containing a host name
Resource management: Kubernetes

I have already installed cert-manager and an ingress controller on Kubernetes, and added my S3 endpoint host name to a public DNS server.
I have also tested the MinIO S3 object storage using the AWS CLI via HTTPS, and it works fine as I expected.

But the problem is that when I submit my Spark job to Kubernetes and the dependency jar files are uploaded to my MinIO S3 object storage, spark-submit cannot reach my S3 endpoint because it resolves the WRONG HOST NAME.

Here is my spark-submit command:

export MASTER=k8s://https://10.0.4.5:6443;
export NAMESPACE=ai-developer;
export ENDPOINT=https://mykidong-tenant.minio.cloudchef-labs.com;

spark-submit \
--master $MASTER \
--deploy-mode cluster \
--name spark-thrift-server \
--class io.spongebob.hive.SparkThriftServerRunner \
--packages com.amazonaws:aws-java-sdk-s3:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0 \
--conf "spark.executor.extraJavaOptions=-Dnetworkaddress.cache.ttl=60" \
--conf "spark.driver.extraJavaOptions=-Dnetworkaddress.cache.ttl=60" \
--conf spark.kubernetes.file.upload.path=s3a://mykidong/spark-thrift-server \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.namespace=$NAMESPACE \
--conf spark.kubernetes.container.image=mykidong/spark:v3.0.0 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.hadoop.hive.metastore.client.connect.retry.delay=5 \
--conf spark.hadoop.hive.metastore.client.socket.timeout=1800 \
--conf spark.hadoop.hive.metastore.uris=thrift://metastore.$NAMESPACE.svc.cluster.local:9083 \
--conf spark.hadoop.hive.server2.enable.doAs=false \
--conf spark.hadoop.hive.server2.thrift.http.port=10002 \
--conf spark.hadoop.hive.server2.thrift.port=10016 \
--conf spark.hadoop.hive.server2.transport.mode=binary \
--conf spark.hadoop.metastore.catalog.default=spark \
--conf spark.hadoop.hive.execution.engine=spark \
--conf spark.hadoop.hive.input.format=io.delta.hive.HiveInputFormat \
--conf spark.hadoop.hive.tez.input.format=io.delta.hive.HiveInputFormat \
--conf spark.sql.warehouse.dir=s3a://mykidong/apps/spark/warehouse \
--conf spark.hadoop.fs.defaultFS=s3a://mykidong \
--conf spark.hadoop.fs.s3a.access.key=bWluaW8= \
--conf spark.hadoop.fs.s3a.secret.key=bWluaW8xMjM= \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=true \
--conf spark.hadoop.fs.s3a.endpoint=$ENDPOINT \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.fast.upload=true \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf spark.executor.instances=4 \
--conf spark.executor.memory=2G \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=1G \
--conf spark.jars=/home/pcp/delta-lake/connectors/dist/delta-core-shaded-assembly_2.12-0.1.0.jar,/home/pcp/delta-lake/connectors/dist/hive-delta_2.12-0.1.0.jar \
file:///home/pcp/spongebob/examples/spark-thrift-server/target/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar;



After a little while, I got the following UnknownHostException:

20/09/20 03:29:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/09/20 03:29:24 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
20/09/20 03:29:25 INFO KerberosConfDriverFeatureStep: You have not specified a krb5.conf file locally or via a ConfigMap. Make sure that you have the krb5.conf locally on the driver image.
20/09/20 03:29:26 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
20/09/20 03:29:26 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
20/09/20 03:29:26 INFO MetricsSystemImpl: s3a-file-system metrics system started

Exception in thread "main" org.apache.spark.SparkException: Uploading file /home/pcp/delta-lake/connectors/dist/delta-core-shaded-assembly_2.12-0.1.0.jar failed...
    at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:289)
    at org.apache.spark.deploy.k8s.KubernetesUtils$.$anonfun$uploadAndTransformFileUris$1(KubernetesUtils.scala:248)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.mutable.Resiza

Re: spark-shell, how it works internally

2019-12-11 Thread mykidong
I have found a source describing how Spark compiles REPL code and dynamically loads it into the distributed executors:
https://ardoris.wordpress.com/2014/03/30/how-spark-does-class-loading/

If you run the Spark REPL, you can find a Spark configuration entry like this:
"spark.repl.class.uri":"spark://xxx:41827/classes"

In the REPL driver, a class fetch server is started at this URI to serve the classes compiled by the REPL's Spark interpreter. The distributed executors fetch the classes from this server via the "spark.repl.class.uri" URI and load them through ExecutorClassLoader.
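A quick way to see this from inside a running spark-shell (a minimal sketch; the key is only set when running under the REPL):

// Executors download REPL-compiled classes from this URI, served by the driver.
val replClassUri = sc.getConf.get("spark.repl.class.uri")
println(replClassUri)   // e.g. spark://<driver-host>:41827/classes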

I have also studied the Spark and Zeppelin source code in order to use only the Spark interpreter, rather than the whole REPL.

I have picked up some code from Zeppelin and Spark to run the Spark interpreter in my application. In my application, an embedded HTTP server receives the Spark code sent by users; that code is interpreted dynamically and executed on the distributed executors, just like the Spark REPL does. It works for now!
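The compile-and-run part by itself is quite small. Here is a bare-bones sketch of driving the Scala interpreter programmatically (assuming Scala 2.11/2.12 with scala-compiler on the classpath; the harder part my application handles is wiring the generated classes into the executors' classloader via the spark.repl.class.uri mechanism described above):

import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain

// Build an in-process Scala interpreter that compiles code strings on the fly.
val settings = new Settings
settings.usejavacp.value = true   // reuse the application's classpath

val interpreter = new IMain(settings)
interpreter.interpret("""val xs = (1 to 10).toList; println(xs.sum)""")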

For my application, there is still more research to do, for instance how to handle multiple users, each with an individual Spark session, etc.

Cheers,

- Kidong Lee.









spark-shell, how it works internally

2019-11-27 Thread mykidong
Hi,

I plan to implement functionality like that of spark-shell.
When spark-shell is run on YARN, it seems that the spark-shell application is submitted to YARN in client mode.

I am curious how the Scala code typed into spark-shell is compiled dynamically, and how the compiled classes are loaded into the classloaders of the currently running YARN executors.

Cheers,

- Kidong Lee











Re: KafkaUtils and specifying a specific partition

2015-03-12 Thread mykidong
If you want to use another Kafka receiver instead of the current Spark Kafka receiver, you can look at this one:
https://github.com/mykidong/spark-kafka-simple-consumer-receiver/blob/master/src/main/java/spark/streaming/receiver/kafka/KafkaReceiverUtils.java

With it, you can get the stream from just the specified partition.
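For reference, the direct stream API added in Spark 1.3 can also start a stream from an explicit set of partitions and offsets, without a custom receiver. A minimal sketch (topic, broker and offset are placeholders, and sc is an existing SparkContext):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker

// Consume only partition 3 of "customer-topic", starting at offset 0.
val fromOffsets = Map(TopicAndPartition("customer-topic", 3) -> 0L)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))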

- Kidong.


-- Original Message --
From: ColinMc [via Apache Spark User List] 
ml-node+s1001560n22018...@n3.nabble.com
To: mykidong mykid...@gmail.com
Sent: 2015-03-13 오전 1:58:08
Subject: KafkaUtils and specifying a specific partition

Hi,

How do you use KafkaUtils to specify a specific partition? I'm writing 
customer Marathon jobs where a customer is given 1 partition in a topic 
in Kafka. The job will get the partition from our database for that 
customer and use that to get the messages for that customer.

I misinterpreted KafkaUtils when creating the stream and didn't know 
that it was the number of partitions per topic in the map.

If KafkaUtils doesn't support this, is there another Spark API call for 
Kafka that supports this?






Re: Any sample code for Kafka consumer

2015-02-22 Thread mykidong
In Java, you can see this example:
https://github.com/mykidong/spark-kafka-simple-consumer-receiver

- Kidong.

-- Original Message --
From: icecreamlc [via Apache Spark User List] 
ml-node+s1001560n21746...@n3.nabble.com
To: mykidong mykid...@gmail.com
Sent: 2015-02-21 오전 11:16:37
Subject: Any sample code for Kafka consumer

Dear all,

Do you have any sample code that consume data from Kafka server using 
spark streaming? Thanks a lot!!!






Re: Integrerate Spark Streaming and Kafka, but get bad symbolic reference error

2015-01-24 Thread mykidong
Maybe you can use the alternative Kafka receiver which I wrote:
https://github.com/mykidong/spark-kafka-simple-consumer-receiver

- Kidong.






Re: How to replay consuming messages from kafka using spark streaming?

2015-01-23 Thread mykidong
Hi,

I have written a Spark Streaming Kafka receiver using the Kafka simple consumer API:
https://github.com/mykidong/spark-kafka-simple-consumer-receiver

This Kafka receiver can be used as an alternative to the current Spark Streaming Kafka receiver, which is written with the high-level Kafka consumer API.

With this receiver, Kafka message offsets can be controlled more easily in the case of receiver worker node failure and driver node failure.

- Kidong.







Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread mykidong
Hi Dibyendu,

I am using Kafka 0.8.1.1 and Spark 1.2.0.
After changing your pom to these versions, I rebuilt your code.
But I do not get any messages from ssc.receiverStream(new KafkaReceiver(_props, i)).

I have found that in your code all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()), a method of the Spark Streaming Receiver abstract class, does not seem to work correctly.

Have you tried running your Spark Streaming Kafka consumer with Kafka 0.8.1.1 and Spark 1.2.0?

- Kidong.









How to replay consuming messages from kafka using spark streaming?

2015-01-14 Thread mykidong
Hi,

My Spark Streaming job does Kafka ETL to HDFS.
For instance, every 10 minutes my streaming job retrieves messages from Kafka and saves them as Avro files onto HDFS.
My question is: if a worker sometimes fails to write the Avro files to HDFS, I want to replay consuming messages again from the last successfully processed Kafka offset.
I think the Spark Streaming Kafka receiver is written using the Kafka high-level consumer API, not the simple consumer API.

Any idea how to replay Kafka consumption in Spark Streaming?

- Kidong.









Re: JdbcRDD

2014-11-18 Thread mykidong
I also had the same problem using JdbcRDD from Java.
In my case, I wrote a class in Scala that builds the JdbcRDD, and I call it from Java.

For instance, JdbcRDDWrapper.scala looks like this:

...

import java.sql._

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

import com.gsshop.polaris.domain.event._

class JdbcRDDWrapper(sc: SparkContext, rowCount: Long, from: Long, to: Long) {

  def getItemViewEventJdbcRdd(): JdbcRDD[ItemViewEvent] = {
    // Column aliases match the nested bean property paths read back in the row mapper below.
    val sql = "" +
      "SELECT " +
      "i.ID as id, " +
      "i.ITEM_ID as \"itemViewEvent.itemId\", " +
      "i.BRAND_ID as \"itemViewEvent.brandId\", " +
      "i.ITEM_TYPE as \"itemViewEvent.itemType\", " +
      "i.PROMOTION_ID as \"itemViewEvent.promotionId\", " +
      "i.PRICE as \"itemViewEvent.price\", " +
      "i.ITEM_TITLE as \"itemViewEvent.itemTitle\", " +
      "i.ITEM_DESCRIPTION as \"itemViewEvent.itemDescription\", " +
      "i.THUMB_NAIL_URL as \"itemViewEvent.thumbnailUrl\", " +
      "i.LOAD_DATE as loadDate, " +
      "b.EVENT_TYPE as \"itemViewEvent.baseProperties.eventType\", " +
      "b.TIMESTAMP as \"itemViewEvent.baseProperties.timestamp\", " +
      "b.URL as \"itemViewEvent.baseProperties.url\", " +
      "b.REFERER as \"itemViewEvent.baseProperties.referer\", " +
      "b.UID as \"itemViewEvent.baseProperties.uid\", " +
      "b.PCID as \"itemViewEvent.baseProperties.pcid\", " +
      "b.SERVICE_ID as \"itemViewEvent.baseProperties.serviceId\", " +
      "b.VERSION as \"itemViewEvent.baseProperties.version\", " +
      "b.DEVICE_TYPE as \"itemViewEvent.baseProperties.deviceType\", " +
      "b.DOMAIN as \"itemViewEvent.baseProperties.domain\", " +
      "b.SITE as \"itemViewEvent.baseProperties.site\" " +
      "FROM ITEM_VIEW_EVENT AS i " +
      "INNER JOIN BASE_PROPERTIES AS b " +
      "ON i.ID = b.EVENT_ID " +
      "WHERE b.TIMESTAMP != ? AND " + from +
      " <= b.TIMESTAMP AND b.TIMESTAMP < " + to + " LIMIT ? "

    // lowerBound = 0, upperBound = rowCount, 5 partitions; JdbcRDD binds the
    // per-partition bounds into the two '?' placeholders of the query.
    val rdd = new JdbcRDD(
      sc,
      () => {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
        DriverManager.getConnection("jdbc:phoenix:xx:/hbase-unsecure")
      },
      sql,
      0, rowCount, 5,
      (rs: ResultSet) => {
        // Map each row to the nested event beans.
        val baseProperties = new BaseProperties()
        baseProperties.setEventType(rs.getString("itemViewEvent.baseProperties.eventType"))
        baseProperties.setTimestamp(rs.getLong("itemViewEvent.baseProperties.timestamp"))
        baseProperties.setUrl(rs.getString("itemViewEvent.baseProperties.url"))
        baseProperties.setReferer(rs.getString("itemViewEvent.baseProperties.referer"))
        baseProperties.setUid(rs.getString("itemViewEvent.baseProperties.uid"))
        baseProperties.setPcid(rs.getString("itemViewEvent.baseProperties.pcid"))
        baseProperties.setServiceId(rs.getString("itemViewEvent.baseProperties.serviceId"))
        baseProperties.setVersion(rs.getString("itemViewEvent.baseProperties.version"))
        baseProperties.setDeviceType(rs.getString("itemViewEvent.baseProperties.deviceType"))
        baseProperties.setDomain(rs.getString("itemViewEvent.baseProperties.domain"))
        baseProperties.setSite(rs.getString("itemViewEvent.baseProperties.site"))

        val itemViewEvent = new ItemViewEvent()
        itemViewEvent.setItemId(rs.getString("itemViewEvent.itemId"))
        itemViewEvent.setBrandId(rs.getString("itemViewEvent.brandId"))
        itemViewEvent.setItemType(rs.getString("itemViewEvent.itemType"))
        itemViewEvent.setPromotionId(rs.getString("itemViewEvent.promotionId"))
        itemViewEvent.setPrice(rs.getLong("itemViewEvent.price"))
        itemViewEvent.setItemTitle(rs.getString("itemViewEvent.itemTitle"))
        itemViewEvent.setItemDescription(rs.getString("itemViewEvent.itemDescription"))
        itemViewEvent.setThumbnailUrl(rs.getString("itemViewEvent.thumbnailUrl"))
        itemViewEvent.setBaseProperties(baseProperties)

        itemViewEvent
      })

    rdd
  }
}

And from Java, the JdbcRDD can be obtained like this:

import scala.reflect.ClassManifestFactory$;

...

JdbcRDD<ItemViewEvent> jdbcRddItemViewEvent =
    new JdbcRDDWrapper(JavaSparkContext.toSparkContext(ctx), rowCountItemViewEvent,
        fromTime, toTime).getItemViewEventJdbcRdd();
JavaRDD<ItemViewEvent> javaRddItemViewEvent =
    JavaRDD.fromRDD(jdbcRddItemViewEvent,
        ClassManifestFactory$.MODULE$.fromClass(ItemViewEvent.class));


- Kidong.





2014-11-19 8:58 GMT+09:00 sparkling [via Apache Spark User List] 
ml-node+s1001560n19233...@n3.nabble.com:

 Hi,

 Are there any examples of using JdbcRDD in java available?

 It's not clear what the last argument is in this example
 (https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala):

 sc = new SparkContext("local", "test")
 val rdd = new JdbcRDD(
   sc,
   () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
   "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
   1, 100, 3,
   (r: ResultSet) => { r.getInt(1) }
 ).cache()

 Thanks




How to read just specified columns from parquet file using SparkSQL.

2014-09-30 Thread mykidong
Hi,

I am new to SparkSQL.

I want to read only specified columns from the Parquet file, not all the columns defined in it.

For instance, the schema of the parquet file would look like this:
{
  "type": "record",
  "name": "ElectricPowerUsage",
  "namespace": "jcascalog.parquet.example",
  "fields": [
    {
      "name": "addressCode",
      "type": ["null", "string"]
    },
    {
      "name": "timestamp",
      "type": ["null", "long"]
    },
    {
      "name": "devicePowerEventList",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "DevicePowerEvent",
          "fields": [
            { "name": "power", "type": ["null", "double"] },
            { "name": "deviceType", "type": ["null", "int"] },
            { "name": "deviceId", "type": ["null", "int"] },
            { "name": "status", "type": ["null", "int"] }
          ]
        }
      }
    }
  ]
}

To read just the specified columns (addressCode, devicePowerEventList) from this Parquet file, the following projection schema defines only the addressCode and devicePowerEventList columns:
{
  "type": "record",
  "name": "ElectricPowerUsage",
  "namespace": "jcascalog.parquet.example",
  "fields": [
    {
      "name": "addressCode",
      "type": ["null", "string"]
    },
    {
      "name": "devicePowerEventList",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "DevicePowerEvent",
          "fields": [
            { "name": "power", "type": ["null", "double"] }
          ]
        }
      }
    }
  ]
}

I have not yet found anything in the Spark docs on how to handle this.
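What I am hoping for is something roughly like the following (a sketch assuming Spark 1.1's SQLContext and a hypothetical HDFS path), where the projection is pushed down so that only the selected columns are read from the Parquet file:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Load the Parquet file and register it as a table (path is hypothetical).
val usage = sqlContext.parquetFile("hdfs:///data/electric_power_usage.parquet")
usage.registerTempTable("power_usage")

// Select only the two columns of interest; the other columns should not be read.
val projected = sqlContext.sql("SELECT addressCode, devicePowerEventList FROM power_usage")
projected.take(10).foreach(println)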


- Kidong Lee.



