I did not find this to be true. Here is my code snippet.

object DruidStreamJob extends Job with SinkFn {

  private[flink] val druidConfig = DruidConfig.current

  private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

  //  TODO: This should be set in the sbt forked JVM; setting it here is a hack.
  //  System.setProperty("java.library.path", "/Users/vsiravar/workspace/aipcryptoclient/lib")
  //
  //  import java.lang.reflect.Field
  //
  //  val fieldSysPath: Field = classOf[ClassLoader].getDeclaredField("sys_paths")
  //  fieldSysPath.setAccessible(true)
  //  fieldSysPath.set(null, null)
  //
  //  print(System.getProperty("java.library.path"))

  private[flink] val aipSimpleAPIEncryptor = new AipCryptoClient(
    ExecutionEnv.mockEncryption,
    ExecutionEnv.enableEncryption,
    ExecutionEnv.loadEncryptionSet)

  aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

  val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)

  val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

  private[flink] val sdsClient = SDSEncryptor(decryptMap, ExecutionEnv.mockDecryption)

  sdsClient.init()

  /**
   * Start streaming job execution.
   *
   * @param argMap job arguments
   */
  private[flink] def runJob(argMap: Map[String, String]): Unit = {

    val env = ExecutionEnv.executionEnv(argMap)
    this.source = ExecutionEnv.sourceTopics

    env.enableCheckpointing(1000)
    env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
    sourceAndSinkFn(env, source)
    env.execute(jobName = name)
  }

  /**
   * @inheritdoc
   * @param env
   * @param topics
   */
  override private[flink] def sourceAndSinkFn(
    env: StreamExecutionEnvironment,
    topics: List[String]) = {
    val dataStream = addSource(env)
    log.info("Subscribed to topics: " + topics)

    val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {

      override def filter(value: GenericRecord): Boolean = {
        ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString) &&
          ExecutionEnv.pcrList.contains(value.get("CMLS_DEST_PCR").toString)
      }
    })

    val result = filteredStream.map(record =>
      encryptWithAipCryptoClient(addTimeStamp(sdsClient.decrypt(applyValues(record)))))

    result.print()
    KafkaSink(result).sendToKafka
  }

  private[flink] def encryptWithAipCryptoClient(
    mapToEncrypt: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    aipSimpleAPIEncryptor.encrypt(
      mapToEncrypt.asInstanceOf[mutable.Map[String, AnyRef]].asJava)
    mapToEncrypt
  }

  private[flink] def applyValues(
    genericRecord: GenericRecord): mutable.Map[String, Any] = {

    collection.mutable.Map(genericRecord.getSchema.getFields.asScala
      .map(field =>
        field.schema().getType match {
          case Schema.Type.LONG =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Long]
          case Schema.Type.INT =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Int]
          case Schema.Type.DOUBLE =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Double]
          case Schema.Type.STRING =>
            field.name() -> genericRecord.get(field.name()).toString
          case _ =>
            field.name() -> genericRecord.get(field.name()).toString
        }): _*)

  }

  private[flink] def addTimeStamp(
    payload: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    try {
      if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
        payload + ("timestamp" -> TimeUtility.convertDateStringToLong(
          payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
          payload("CMLS_AUTH_TIME").asInstanceOf[Int]))
      } else {
        payload + ("timestamp" -> System.currentTimeMillis())
      }
    } catch {
      case e: Exception =>
        errorLogger.error("Unable to obtain epoch time, using current system time", e)
        payload + ("timestamp" -> System.currentTimeMillis())
    }
  }

}


The code that initializes the SDS client (the sdsClient val and the
sdsClient.init() call in the snippet above) runs in the main thread, even
before the job graph is created. However, when I run this job on a cluster
with 3 task managers on different nodes, it is initialized once on every
node (each TaskManager). I wonder why this happens.
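
If the object body is effectively re-run on every TaskManager, I suppose
the per-node setup could instead be made explicit with a RichMapFunction,
along these lines (a rough sketch; I am guessing the dictionary is a
Map[String, String], and the class and parameter names are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

import scala.collection.mutable

// Rough sketch: construct the non-serializable SDS client in open(),
// which runs on the TaskManager, rather than in the object body.
class SdsDecryptFn(decryptMap: Map[String, String], mockDecryption: Boolean)
  extends RichMapFunction[mutable.Map[String, Any], mutable.Map[String, Any]] {

  // Never shipped with the job graph; created once per parallel subtask.
  @transient private var sdsClient: SDSEncryptor = _

  override def open(parameters: Configuration): Unit = {
    sdsClient = SDSEncryptor(decryptMap, mockDecryption)
    sdsClient.init()
  }

  override def map(payload: mutable.Map[String, Any]): mutable.Map[String, Any] =
    sdsClient.decrypt(payload)
}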

Thanks,
Vishwas

On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson <snel...@sourceallies.com>
wrote:

> @transient or use a static factory.
>
> In Scala we use a @transient lazy val with an initializer to do this
>
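> For example, a minimal sketch of that pattern (HeavyClient here is a
> stand-in for whatever non-serializable dependency is being built):
>
> import org.apache.flink.api.common.functions.MapFunction
>
> class EncryptFn extends MapFunction[String, String] {
>
>   // Excluded from serialization by @transient; rebuilt lazily, at most
>   // once per function instance, on first access in the TaskManager JVM.
>   @transient private lazy val client: HeavyClient = {
>     val c = new HeavyClient()
>     c.init()
>     c
>   }
>
>   override def map(value: String): String = client.encrypt(value)
> }
>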
> Sent from my iPhone
>
> On Aug 15, 2019, at 11:40 AM, Vishwas Siravara <vsirav...@gmail.com>
> wrote:
>
> Thanks Steven. Is there a way I can create a singleton instance on each
> task manager instead of serializing this object?
>
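> Something like a per-JVM holder object is what I have in mind (a rough
> sketch; HeavyClient stands in for the real client):
>
> object ClientHolder {
>   // A Scala object is a per-classloader singleton; the lazy val below
>   // is initialized at most once per JVM (i.e. per TaskManager), on the
>   // first access from any task.
>   lazy val client: HeavyClient = {
>     val c = new HeavyClient()
>     c.init()
>     c
>   }
> }
>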
> Thanks,
> Vishwas
>
> On Thu, Aug 15, 2019 at 11:28 AM Steven Nelson <snel...@sourceallies.com>
> wrote:
>
>> The encryptor will be serialized and sent with the rest of your Job Graph
>> when the job is submitted. If it’s not serializable you get an error.
>>
>> Sent from my iPhone
>>
>> > On Aug 15, 2019, at 11:00 AM, Vishwas Siravara <vsirav...@gmail.com>
>> wrote:
>> >
>> > Hi guys,
>> > I have a map job where I want to encrypt certain keys. I initialize
>> the encryptor in the main method and apply it in the map function. How is
>> this encryptor shared when I have my job running on multiple task managers
>> with parallelism > 1 ?
>> >
>> > Thanks,
>> > Vishwas
>>
>
