[ https://issues.apache.org/jira/browse/BAHIR-116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15994559#comment-15994559 ]

ASF GitHub Bot commented on BAHIR-116:
--------------------------------------

Github user amarouni commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/42#discussion_r114494439
  
    --- Diff: streaming-pubsub/src/main/scala/org/apache/spark/streaming/pubsub/PubsubInputDStream.scala ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.pubsub
    +
    +import java.io.{Externalizable, ObjectInput, ObjectOutput}
    +
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    +
    +import com.google.api.client.auth.oauth2.Credential
    +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
    +import com.google.api.client.googleapis.json.GoogleJsonResponseException
    +import com.google.api.client.json.jackson2.JacksonFactory
    +import com.google.api.services.pubsub.Pubsub.Builder
    +import com.google.api.services.pubsub.model.{AcknowledgeRequest, PubsubMessage, PullRequest}
    +import com.google.cloud.hadoop.util.RetryHttpInitializer
    +
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.StreamingContext
    +import org.apache.spark.streaming.dstream.ReceiverInputDStream
    +import org.apache.spark.streaming.receiver.Receiver
    +import org.apache.spark.util.Utils
    +
    +/**
    + * Input stream that subscribes to messages from a Google Cloud Pub/Sub subscription.
    + * @param project Google Cloud project id
    + * @param subscription Pub/Sub subscription name
    + * @param credential Google Cloud credential used to access the Pub/Sub service
    + */
    +private[streaming]
    +class PubsubInputDStream(
    +    _ssc: StreamingContext,
    +    val project: String,
    +    val subscription: String,
    +    val credential: SparkGCPCredentials,
    +    val _storageLevel: StorageLevel
    +) extends ReceiverInputDStream[SparkPubsubMessage](_ssc) {
    +
    +  override def getReceiver(): Receiver[SparkPubsubMessage] = {
    +    new PubsubReceiver(project, subscription, credential, _storageLevel)
    +  }
    +}
    +
    +/**
    + * A wrapper class for PubsubMessage's with a custom serialization format.
    + *
    + * This is necessary because PubsubMessage uses inner data structures
    + * which are not serializable.
    + */
    +class SparkPubsubMessage() extends Externalizable {
    +
    +  private[pubsub] var message = new PubsubMessage
    +
    +  def getData(): Array[Byte] = message.decodeData()
    +
    +  def getAttributes(): java.util.Map[String, String] = message.getAttributes
    +
    +  def getMessageId(): String = message.getMessageId
    +
    +  def getPublishTime(): String = message.getPublishTime
    +
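    +  // Write each field with a length prefix; -1 marks a null field. Attributes
    +  // are written as a count followed by serialized key/value pairs.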
    +  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    +    message.decodeData() match {
    +      case null => out.writeInt(-1)
    +      case data =>
    +        out.writeInt(data.size)
    +        out.write(data)
    +    }
    +
    +    message.getMessageId match {
    +      case null => out.writeInt(-1)
    +      case id =>
    +        val idBuff = Utils.serialize(id)
    +        out.writeInt(idBuff.length)
    +        out.write(idBuff)
    +    }
    +
    +    message.getPublishTime match {
    +      case null => out.writeInt(-1)
    +      case time =>
    +        val publishTimeBuff = Utils.serialize(time)
    +        out.writeInt(publishTimeBuff.length)
    +        out.write(publishTimeBuff)
    +    }
    +
    +    message.getAttributes match {
    +      case null => out.writeInt(-1)
    +      case attrs =>
    +        out.writeInt(attrs.size())
    +        for ((k, v) <- attrs.asScala) {
    +          val keyBuff = Utils.serialize(k)
    +          out.writeInt(keyBuff.length)
    +          out.write(keyBuff)
    +          val valBuff = Utils.serialize(v)
    +          out.writeInt(valBuff.length)
    +          out.write(valBuff)
    +        }
    +    }
    +  }
    +
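    +  // Mirror of writeExternal: read each length prefix, treating -1 as null.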
    +  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    +    in.readInt() match {
    +      case -1 => message.encodeData(null)
    +      case bodyLength =>
    +        val data = new Array[Byte](bodyLength)
    +        in.readFully(data)
    +        message.encodeData(data)
    +    }
    +
    +    in.readInt() match {
    +      case -1 => message.setMessageId(null)
    +      case idLength =>
    +        val idBuff = new Array[Byte](idLength)
    +        in.readFully(idBuff)
    +        val id: String = Utils.deserialize(idBuff)
    +        message.setMessageId(id)
    +    }
    +
    +    in.readInt() match {
    +      case -1 => message.setPublishTime(null)
    +      case publishTimeLength =>
    +        val publishTimeBuff = new Array[Byte](publishTimeLength)
    +        in.readFully(publishTimeBuff)
    +        val publishTime: String = Utils.deserialize(publishTimeBuff)
    +        message.setPublishTime(publishTime)
    +    }
    +
    +    in.readInt() match {
    +      case -1 => message.setAttributes(null)
    +      case numAttributes =>
    +        val attributes = new java.util.HashMap[String, String]
    +        for (i <- 0 until numAttributes) {
    +          val keyLength = in.readInt()
    +          val keyBuff = new Array[Byte](keyLength)
    +          in.readFully(keyBuff)
    +          val key: String = Utils.deserialize(keyBuff)
    +
    +          val valLength = in.readInt()
    +          val valBuff = new Array[Byte](valLength)
    +          in.readFully(valBuff)
    +          val value: String = Utils.deserialize(valBuff)
    +
    +          attributes.put(key, value)
    +        }
    +        message.setAttributes(attributes)
    +    }
    +  }
    +}
    +
    +private[pubsub]
    +object ConnectionUtils {
    +  val transport = GoogleNetHttpTransport.newTrustedTransport()
    +  val jacksonFactory = JacksonFactory.getDefaultInstance
    +
    +  /**
    +   * Status codes on which the client can retry.
    +   */
    +  val RESOURCE_EXHAUSTED = 429
    +
    +  val CANCELLED = 499
    +
    +  val INTERNAL = 500
    +
    +  val UNAVAILABLE = 503
    +
    +  val DEADLINE_EXCEEDED = 504
    +
    +  def retryable(status: Int): Boolean = {
    +    status match {
    +      case RESOURCE_EXHAUSTED | CANCELLED | INTERNAL | UNAVAILABLE | DEADLINE_EXCEEDED => true
    +      case _ => false
    +    }
    +  }
    +}
    +
    +
    +private[pubsub]
    +class PubsubReceiver(
    +    project: String,
    +    subscription: String,
    +    credential: SparkGCPCredentials,
    +    storageLevel: StorageLevel)
    +    extends Receiver[SparkPubsubMessage](storageLevel) {
    +
    +  val APP_NAME = "sparkstreaming-pubsub-receiver"
    +
    +  val INIT_BACKOFF = 100 // 100ms
    --- End diff --
    
    @bchen-talend Can this value (and the other values beneath it) be supplied by the user?
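    
    For illustration only, a minimal sketch of one way to expose them, threading
    the values through the receiver's constructor with the current hard-coded
    values as defaults (the parameter names and the max/factor defaults shown
    here are hypothetical, not part of this PR):
    
        private[pubsub]
        class PubsubReceiver(
            project: String,
            subscription: String,
            credential: SparkGCPCredentials,
            storageLevel: StorageLevel,
            initBackoffMs: Int = 100,    // hypothetical knob, replaces INIT_BACKOFF
            maxBackoffMs: Int = 10000,   // hypothetical knob
            backoffFactor: Int = 2)      // hypothetical knob
            extends Receiver[SparkPubsubMessage](storageLevel) {
          // the receive loop would read these fields instead of the constants
        }
    
    The same parameters could then be surfaced with defaults on PubsubInputDStream
    and on whatever public factory method wraps it, so existing callers keep the
    current behaviour.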


> Add Spark streaming connector for Google cloud Pub/Sub
> ------------------------------------------------------
>
>                 Key: BAHIR-116
>                 URL: https://issues.apache.org/jira/browse/BAHIR-116
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark Streaming Connectors
>            Reporter: Bin Chen
>
> A Spark Streaming connector for [Google Pub/Sub|https://cloud.google.com/pubsub/]


