Re: Thrift2 Server on Kubernetes?
Hi Meikel, if you want to run Spark Thrift Server on Kubernetes, take a look at my blog post: https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1 Cheers, - Kidong Lee.
Hive on Spark in Kubernetes.
Hi all, I have recently written a blog post about Hive on Spark in a Kubernetes environment: https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1 In this post you can find how to run Hive on Kubernetes using Spark Thrift Server, which is compatible with HiveServer2. Cheers, - Kidong.
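As a quick way to check such a setup, here is a minimal sketch (mine, not from the blog post) of connecting to a Spark Thrift Server with the standard Hive JDBC driver from Scala; the host name and port are hypothetical placeholders, and the hive-jdbc dependency is assumed to be on the classpath.

import java.sql.DriverManager

// Hypothetical endpoint; the Spark Thrift Server speaks the HiveServer2 protocol,
// so the plain Hive JDBC driver can talk to it.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://spark-thrift-server-host:10016", "user", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SHOW DATABASES")
while (rs.next()) {
  println(rs.getString(1))   // print each database name
}
rs.close(); stmt.close(); conn.close()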
Re: UnknownHostException is thrown when spark job whose jar files will be uploaded to s3 object storage via https is submitted to kubernetes
Sorry, I had missed setting the S3 path-style-access property in the submit. After adding --conf spark.hadoop.fs.s3a.path.style.access=true to the spark-submit command, it works fine! - Kidong.
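For context, here is a minimal sketch (mine, not part of the original submit) of the same S3A settings applied programmatically when building a SparkSession; the endpoint value is the one from the post below, the read path is a hypothetical placeholder, and the remaining options are assumptions about a MinIO-style setup.

import org.apache.spark.sql.SparkSession

// S3A options for a MinIO endpoint, including the path-style-access flag
// that fixed the UnknownHostException described below.
val spark = SparkSession.builder()
  .appName("s3a-path-style-access-example")
  .config("spark.hadoop.fs.s3a.endpoint", "https://mykidong-tenant.minio.cloudchef-labs.com")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")   // avoid virtual-host-style bucket.host lookups
  .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .getOrCreate()

// With path-style access, s3a://mykidong/... resolves against the endpoint host
// rather than a non-existent bucket-prefixed host name.
spark.read.parquet("s3a://mykidong/some/hypothetical/path").show()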
UnknownHostException is thrown when spark job whose jar files will be uploaded to s3 object storage via https is submitted to kubernetes
Hi, I have already succeeded in submitting a Spark job to Kubernetes that accesses S3 object storage. At that time, my environment was:

Spark: 3.0.0
S3 object storage: Hadoop Ozone S3, accessed through an HTTP endpoint containing an IP address, not a host name
Resource management: Kubernetes

My Spark job worked OK. Now I want to replace the unsecured HTTP access to the S3 object storage with HTTPS:

Spark: 3.0.0
S3 object storage: MinIO S3, accessed through an HTTPS endpoint containing a host name
Resource management: Kubernetes

I have already installed cert-manager and an ingress controller on Kubernetes, and added my S3 endpoint host name to a public DNS server. I have also tested the MinIO S3 object storage with the AWS CLI via HTTPS, and it works as I expected. The problem is that when I submit my Spark job to Kubernetes and the dependency jar files are uploaded to my MinIO S3 object storage, spark-submit cannot find my S3 endpoint because it resolves the wrong host name. Here is my spark-submit command:

export MASTER=k8s://https://10.0.4.5:6443;
export NAMESPACE=ai-developer;
export ENDPOINT=https://mykidong-tenant.minio.cloudchef-labs.com;

spark-submit \
 --master $MASTER \
 --deploy-mode cluster \
 --name spark-thrift-server \
 --class io.spongebob.hive.SparkThriftServerRunner \
 --packages com.amazonaws:aws-java-sdk-s3:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0 \
 --conf "spark.executor.extraJavaOptions=-Dnetworkaddress.cache.ttl=60" \
 --conf "spark.driver.extraJavaOptions=-Dnetworkaddress.cache.ttl=60" \
 --conf spark.kubernetes.file.upload.path=s3a://mykidong/spark-thrift-server \
 --conf spark.kubernetes.container.image.pullPolicy=Always \
 --conf spark.kubernetes.namespace=$NAMESPACE \
 --conf spark.kubernetes.container.image=mykidong/spark:v3.0.0 \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
 --conf spark.hadoop.hive.metastore.client.connect.retry.delay=5 \
 --conf spark.hadoop.hive.metastore.client.socket.timeout=1800 \
 --conf spark.hadoop.hive.metastore.uris=thrift://metastore.$NAMESPACE.svc.cluster.local:9083 \
 --conf spark.hadoop.hive.server2.enable.doAs=false \
 --conf spark.hadoop.hive.server2.thrift.http.port=10002 \
 --conf spark.hadoop.hive.server2.thrift.port=10016 \
 --conf spark.hadoop.hive.server2.transport.mode=binary \
 --conf spark.hadoop.metastore.catalog.default=spark \
 --conf spark.hadoop.hive.execution.engine=spark \
 --conf spark.hadoop.hive.input.format=io.delta.hive.HiveInputFormat \
 --conf spark.hadoop.hive.tez.input.format=io.delta.hive.HiveInputFormat \
 --conf spark.sql.warehouse.dir=s3a://mykidong/apps/spark/warehouse \
 --conf spark.hadoop.fs.defaultFS=s3a://mykidong \
 --conf spark.hadoop.fs.s3a.access.key=bWluaW8= \
 --conf spark.hadoop.fs.s3a.secret.key=bWluaW8xMjM= \
 --conf spark.hadoop.fs.s3a.connection.ssl.enabled=true \
 --conf spark.hadoop.fs.s3a.endpoint=$ENDPOINT \
 --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
 --conf spark.hadoop.fs.s3a.fast.upload=true \
 --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
 --conf spark.executor.instances=4 \
 --conf spark.executor.memory=2G \
 --conf spark.executor.cores=2 \
 --conf spark.driver.memory=1G \
 --conf spark.jars=/home/pcp/delta-lake/connectors/dist/delta-core-shaded-assembly_2.12-0.1.0.jar,/home/pcp/delta-lake/connectors/dist/hive-delta_2.12-0.1.0.jar \
 file:///home/pcp/spongebob/examples/spark-thrift-server/target/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar;

After a little while, I got the following UnknownHostException:

20/09/20 03:29:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/09/20 03:29:24 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
20/09/20 03:29:25 INFO KerberosConfDriverFeatureStep: You have not specified a krb5.conf file locally or via a ConfigMap. Make sure that you have the krb5.conf locally on the driver image.
20/09/20 03:29:26 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
20/09/20 03:29:26 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
20/09/20 03:29:26 INFO MetricsSystemImpl: s3a-file-system metrics system started
Exception in thread "main" org.apache.spark.SparkException: Uploading file /home/pcp/delta-lake/connectors/dist/delta-core-shaded-assembly_2.12-0.1.0.jar failed...
 at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:289)
 at org.apache.spark.deploy.k8s.KubernetesUtils$.$anonfun$uploadAndTransformFileUris$1(KubernetesUtils.scala:248)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
 at scala.collection.mutable.Resiza
Re: spark-shell, how it works internally
I have found an article on how Spark compiles code and dynamically loads it into the distributed executors in the Spark REPL: https://ardoris.wordpress.com/2014/03/30/how-spark-does-class-loading/

If you run the Spark REPL, you can find a Spark configuration entry like this: "spark.repl.class.uri":"spark://xxx:41827/classes". A REPL class fetch server runs in the Spark REPL driver and serves, at this URI, the classes compiled by the REPL's Spark interpreter. The distributed executors fetch the classes from the REPL class fetch server via the "spark.repl.class.uri" URI and load them into the classloader in ExecutorClassLoader.

I have also studied the Spark and Zeppelin source code in order to use only the Spark interpreter, not the whole REPL. I picked up some code from Zeppelin and Spark to run a Spark interpreter inside my application. In my application, an embedded HTTP server receives Spark code from users; that code is interpreted dynamically and executed on the distributed executors, just like the Spark REPL does. It works for now!

For my application there is some more research to do, for instance how to handle multiple users with individual Spark sessions, etc.

Cheers, - Kidong Lee.
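To make this concrete, here is a small sketch that can be tried inside spark-shell (the class name and values are illustrative, not from the article):

// The REPL publishes compiled class files at this URI; executors fetch them
// through ExecutorClassLoader before running tasks that reference them.
sc.getConf.get("spark.repl.class.uri")    // e.g. "spark://driver-host:41827/classes"

// A class defined interactively is compiled by the REPL on the driver and
// loaded on demand by the executors when the closure below is executed.
case class Point(x: Double, y: Double)
sc.parallelize(Seq(Point(1, 2), Point(3, 4))).map(p => p.x + p.y).collect()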
spark-shell, how it works internally
Hi, I plan to implement functionality like spark-shell's in my own application. When spark-shell is run on YARN, it seems the spark-shell application is submitted to the YARN cluster in client mode. I am curious how the Scala code typed into spark-shell is compiled dynamically, and how the compiled classes are loaded into the classloaders of the currently running YARN executors. Cheers, - Kidong Lee
Re: KafkaUtils and specifying a specific partition
If you want to use another Kafka receiver instead of the current Spark Kafka receiver, you can look at this: https://github.com/mykidong/spark-kafka-simple-consumer-receiver/blob/master/src/main/java/spark/streaming/receiver/kafka/KafkaReceiverUtils.java With it, you can get a stream from just the specified partition. - Kidong.

-- Original Message --
From: ColinMc [via Apache Spark User List]
Sent: 2015-03-13 1:58 AM
Subject: KafkaUtils and specifying a specific partition

Hi, how do you use KafkaUtils to specify a specific partition? I'm writing customer Marathon jobs where a customer is given 1 partition in a topic in Kafka. The job will get the partition from our database for that customer and use that to get the messages for that customer. I misinterpreted KafkaUtils when creating the stream and didn't know that the map specifies the number of partitions per topic. If KafkaUtils doesn't support this, is there another Spark API call for Kafka that supports this?
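For illustration only, here is a rough Scala sketch (not the linked project's code) of a Spark Streaming receiver that reads one fixed partition with the Kafka 0.8 SimpleConsumer API; the broker host, port, topic, partition and starting offset are placeholders, and leader lookup and error handling are omitted.

import kafka.api.FetchRequestBuilder
import kafka.javaapi.consumer.SimpleConsumer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.collection.JavaConverters._

class SinglePartitionReceiver(host: String, port: Int, topic: String,
                              partition: Int, startOffset: Long)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("single-partition-kafka-receiver") {
      override def run(): Unit = {
        val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "single-partition-client")
        var offset = startOffset
        while (!isStopped()) {
          val request = new FetchRequestBuilder()
            .clientId("single-partition-client")
            .addFetch(topic, partition, offset, 64 * 1024)
            .build()
          val response = consumer.fetch(request)
          for (msgAndOffset <- response.messageSet(topic, partition).iterator().asScala) {
            val payload = msgAndOffset.message.payload
            val bytes = new Array[Byte](payload.limit())
            payload.get(bytes)
            store(new String(bytes, "UTF-8"))   // hand the message to Spark Streaming
            offset = msgAndOffset.nextOffset    // advance past the consumed message
          }
          Thread.sleep(1000)                    // avoid a tight loop when the partition is idle
        }
        consumer.close()
      }
    }.start()
  }

  override def onStop(): Unit = { /* the fetch loop exits when isStopped() returns true */ }
}

It could then be plugged in with ssc.receiverStream(new SinglePartitionReceiver("broker-host", 9092, "my-topic", 0, 0L)), similar to how the repository above wires up its receiver.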
Re: Any sample code for Kafka consumer
For Java, you can see this example: https://github.com/mykidong/spark-kafka-simple-consumer-receiver - Kidong.

-- Original Message --
From: icecreamlc [via Apache Spark User List]
Sent: 2015-02-21 11:16 AM
Subject: Any sample code for Kafka consumer

Dear all, do you have any sample code that consumes data from a Kafka server using Spark Streaming? Thanks a lot!
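For a Scala starting point (the repository above is a Java alternative), a minimal sketch with the built-in high-level Kafka receiver looks roughly like this; the ZooKeeper address, group id and topic name are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("kafka-consumer-example")
val ssc = new StreamingContext(conf, Seconds(10))

// topic -> number of consumer threads for that topic
val stream = KafkaUtils.createStream(ssc, "zk-host:2181", "example-group", Map("my-topic" -> 1))
stream.map(_._2).print()   // print message values only; keys are dropped

ssc.start()
ssc.awaitTermination()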
Re: Integrerate Spark Streaming and Kafka, but get bad symbolic reference error
Maybe you can use the alternative Kafka receiver which I wrote: https://github.com/mykidong/spark-kafka-simple-consumer-receiver - Kidong.
Re: How to replay consuming messages from kafka using spark streaming?
Hi, I have written a Spark Streaming Kafka receiver using the Kafka simple consumer API: https://github.com/mykidong/spark-kafka-simple-consumer-receiver This receiver can be used as an alternative to the current Spark Streaming Kafka receiver, which is written with the high-level Kafka consumer API. With this receiver, Kafka message offsets can be controlled more easily for receiver worker node failure and driver node failure. - Kidong.
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu, I am using Kafka 0.8.1.1 and Spark 1.2.0. After changing these versions in your pom, I rebuilt your code, but I do not get any messages from ssc.receiverStream(new KafkaReceiver(_props, i)). I have found that in your code all the messages are retrieved correctly, but _receiver.store(_dataBuffer.iterator()), a method of the Spark Streaming Receiver abstract class, does not seem to work correctly. Have you tried running your Spark Streaming Kafka consumer with Kafka 0.8.1.1 and Spark 1.2.0? - Kidong.
How to replay consuming messages from kafka using spark streaming?
Hi, my Spark Streaming job does Kafka ETL to HDFS. For instance, every 10 minutes my streaming job retrieves messages from Kafka and saves them as Avro files onto HDFS. My question is: if a worker sometimes fails to write the Avro files to HDFS, I want to replay consuming the messages from the last succeeded Kafka offset. I think the Spark Streaming Kafka receiver is written with the Kafka high-level consumer API, not the simple consumer API. Any idea how to replay Kafka consumption in Spark Streaming? - Kidong.
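For reference, later Spark releases (1.3+) added a direct Kafka stream that takes explicit starting offsets, which is one way to replay from the last succeeded offset; this is a sketch with placeholder broker, topic and offset values, not the receiver-based approach discussed in the reply above.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-replay-example"), Seconds(600))
val kafkaParams = Map("metadata.broker.list" -> "broker-host:9092")

// Start exactly at the last offset that was successfully written to HDFS (placeholder value).
val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 12345L)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))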
Re: JdbcRDD
I also had the same problem using JdbcRDD from Java. In my case, I wrote a class in Scala to build the JdbcRDD and call that instance from Java. For instance, JdbcRDDWrapper.scala looks like this:

import java.sql._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import com.gsshop.polaris.domain.event._

class JdbcRDDWrapper(sc: SparkContext, rowCount: Long, from: Long, to: Long) {

  def getItemViewEventJdbcRdd(): JdbcRDD[ItemViewEvent] = {
    val sql = "SELECT " +
      "i.ID as id, " +
      "i.ITEM_ID as \"itemViewEvent.itemId\", " +
      "i.BRAND_ID as \"itemViewEvent.brandId\", " +
      "i.ITEM_TYPE as \"itemViewEvent.itemType\", " +
      "i.PROMOTION_ID as \"itemViewEvent.promotionId\", " +
      "i.PRICE as \"itemViewEvent.price\", " +
      "i.ITEM_TITLE as \"itemViewEvent.itemTitle\", " +
      "i.ITEM_DESCRIPTION as \"itemViewEvent.itemDescription\", " +
      "i.THUMB_NAIL_URL as \"itemViewEvent.thumbnailUrl\", " +
      "i.LOAD_DATE as loadDate, " +
      "b.EVENT_TYPE as \"itemViewEvent.baseProperties.eventType\", " +
      "b.TIMESTAMP as \"itemViewEvent.baseProperties.timestamp\", " +
      "b.URL as \"itemViewEvent.baseProperties.url\", " +
      "b.REFERER as \"itemViewEvent.baseProperties.referer\", " +
      "b.UID as \"itemViewEvent.baseProperties.uid\", " +
      "b.PCID as \"itemViewEvent.baseProperties.pcid\", " +
      "b.SERVICE_ID as \"itemViewEvent.baseProperties.serviceId\", " +
      "b.VERSION as \"itemViewEvent.baseProperties.version\", " +
      "b.DEVICE_TYPE as \"itemViewEvent.baseProperties.deviceType\", " +
      "b.DOMAIN as \"itemViewEvent.baseProperties.domain\", " +
      "b.SITE as \"itemViewEvent.baseProperties.site\" " +
      "FROM ITEM_VIEW_EVENT AS i " +
      "INNER JOIN BASE_PROPERTIES AS b " +
      "ON i.ID = b.EVENT_ID " +
      "WHERE b.TIMESTAMP != ? AND " + from + " <= b.TIMESTAMP AND b.TIMESTAMP < " + to + " LIMIT ?"

    val rdd = new JdbcRDD(
      sc,
      () => {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
        DriverManager.getConnection("jdbc:phoenix:xx:/hbase-unsecure")
      },
      sql,
      0,
      rowCount,
      5,
      (rs: ResultSet) => {
        val baseProperties = new BaseProperties()
        baseProperties.setEventType(rs.getString("itemViewEvent.baseProperties.eventType"))
        baseProperties.setTimestamp(rs.getLong("itemViewEvent.baseProperties.timestamp"))
        baseProperties.setUrl(rs.getString("itemViewEvent.baseProperties.url"))
        baseProperties.setReferer(rs.getString("itemViewEvent.baseProperties.referer"))
        baseProperties.setUid(rs.getString("itemViewEvent.baseProperties.uid"))
        baseProperties.setPcid(rs.getString("itemViewEvent.baseProperties.pcid"))
        baseProperties.setServiceId(rs.getString("itemViewEvent.baseProperties.serviceId"))
        baseProperties.setVersion(rs.getString("itemViewEvent.baseProperties.version"))
        baseProperties.setDeviceType(rs.getString("itemViewEvent.baseProperties.deviceType"))
        baseProperties.setDomain(rs.getString("itemViewEvent.baseProperties.domain"))
        baseProperties.setSite(rs.getString("itemViewEvent.baseProperties.site"))

        val itemViewEvent = new ItemViewEvent()
        itemViewEvent.setItemId(rs.getString("itemViewEvent.itemId"))
        itemViewEvent.setBrandId(rs.getString("itemViewEvent.brandId"))
        itemViewEvent.setItemType(rs.getString("itemViewEvent.itemType"))
        itemViewEvent.setPromotionId(rs.getString("itemViewEvent.promotionId"))
        itemViewEvent.setPrice(rs.getLong("itemViewEvent.price"))
        itemViewEvent.setItemTitle(rs.getString("itemViewEvent.itemTitle"))
        itemViewEvent.setItemDescription(rs.getString("itemViewEvent.itemDescription"))
        itemViewEvent.setThumbnailUrl(rs.getString("itemViewEvent.thumbnailUrl"))
        itemViewEvent.setBaseProperties(baseProperties)
        itemViewEvent
      })
    rdd
  }
}

And from Java, the JdbcRDD can be obtained like this:

import scala.reflect.ClassManifestFactory$;
...
JdbcRDD<ItemViewEvent> jdbcRddItemViewEvent =
    new JdbcRDDWrapper(JavaSparkContext.toSparkContext(ctx), rowCountItemViewEvent, fromTime, toTime)
        .getItemViewEventJdbcRdd();
JavaRDD<ItemViewEvent> javaRddItemViewEvent =
    JavaRDD.fromRDD(jdbcRddItemViewEvent, ClassManifestFactory$.MODULE$.fromClass(ItemViewEvent.class));

- Kidong.

2014-11-19 8:58 GMT+09:00 sparkling [via Apache Spark User List]:

Hi, are there any examples of using JdbcRDD in Java available? It's not clear what the last argument is in this example (https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala):

sc = new SparkContext("local", "test")
val rdd = new JdbcRDD(
  sc,
  () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
  "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
  1, 100, 3,
  (r: ResultSet) => { r.getInt(1) }
).cache()

Thanks
How to read just specified columns from parquet file using SparkSQL.
Hi, I am new to SparkSQL. I want to read only specified columns from a Parquet file, not all the columns defined in it. For instance, the schema of the Parquet file looks like this:

{
  "type": "record",
  "name": "ElectricPowerUsage",
  "namespace": "jcascalog.parquet.example",
  "fields": [
    { "name": "addressCode", "type": [ "null", "string" ] },
    { "name": "timestamp", "type": [ "null", "long" ] },
    { "name": "devicePowerEventList",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "DevicePowerEvent",
          "fields": [
            { "name": "power", "type": [ "null", "double" ] },
            { "name": "deviceType", "type": [ "null", "int" ] },
            { "name": "deviceId", "type": [ "null", "int" ] },
            { "name": "status", "type": [ "null", "int" ] }
          ]
        }
      }
    }
  ]
}

To read just the specified columns (addressCode, devicePowerEventList) from this Parquet file, the following schema defines only the addressCode and devicePowerEventList columns:

{
  "type": "record",
  "name": "ElectricPowerUsage",
  "namespace": "jcascalog.parquet.example",
  "fields": [
    { "name": "addressCode", "type": [ "null", "string" ] },
    { "name": "devicePowerEventList",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "DevicePowerEvent",
          "fields": [
            { "name": "power", "type": [ "null", "double" ] }
          ]
        }
      }
    }
  ]
}

I have not yet found anything in the Spark docs on how to handle this. - Kidong Lee.
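Since Parquet is a columnar format, one possible approach that does not need a separate trimmed schema is to select only the needed columns and let Spark SQL prune the rest at read time; below is a minimal sketch assuming a Spark 1.x SQLContext and a hypothetical file path.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-column-pruning"))
val sqlContext = new SQLContext(sc)

// Load the Parquet file and register it as a temporary table.
val usage = sqlContext.parquetFile("hdfs:///path/to/electric_power_usage.parquet")
usage.registerTempTable("electric_power_usage")

// Only addressCode and devicePowerEventList are requested; the other columns are pruned at read time.
val selected = sqlContext.sql("SELECT addressCode, devicePowerEventList FROM electric_power_usage")
selected.collect().foreach(println)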