I have defined the following sink:

    object ParquetSink {
      def parquetFileSink[A <: Message: ClassTag](
          assigner: A => String,
          config: Config
      )(implicit lc: LoggingConfigs): FileSink[A] = {
        val bucketAssigner = new BucketAssigner[A, String] {
          override def getBucketId(element: A, context: BucketAssigner.Context): String = {
            val path = assigner(element)
            logger.info(LogMessage(-1, s"Writing file to ${config.getString(baseDirKey)}/$path", "NA"))
            path
          }

          override def getSerializer: SimpleVersionedSerializer[String] =
            SimpleVersionedStringSerializer.INSTANCE
        }

        def builder(outFile: OutputFile): ParquetWriter[A] =
          new ParquetProtoWriters.ParquetProtoWriterBuilder(
            outFile,
            implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
          ).withCompressionCodec(config.getCompression(compressionKey)).build()

        val parquetBuilder: ParquetBuilder[A] = path => builder(path)

        FileSink
          .forBulkFormat(
            new Path(s"wasbs://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.blob.core.windows.net"),
            new ParquetWriterFactory[A](parquetBuilder)
          )
          .withBucketAssigner(bucketAssigner)
          .withOutputFileConfig(
            OutputFileConfig
              .builder()
              .withPartSuffix(".parquet")
              .build()
          )
          .build()
      }
    }

After deploying the job I get the following exception:

    Caused by: java.lang.UnsupportedOperationException: Recoverable writers on AzureBlob are only supported for ABFS
        at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.checkSupportedFSSchemes(AzureBlobRecoverableWriter.java:44) ~[?:?]
        at org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[?:?]
        at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.<init>(AzureBlobRecoverableWriter.java:37) ~[?:?]
        at org.apache.flink.fs.azurefs.AzureBlobFileSystem.createRecoverableWriter(AzureBlobFileSystem.java:44) ~[?:?]
        at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
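
Judging only by the exception message and the method name checkSupportedFSSchemes in the trace, the writer seems to reject the path based on its URI scheme before anything is written. Below is a minimal sketch of what such a check might look like. It is not the Flink source: the object name, the helper supportsRecoverableWriter, and the container/account names are all hypothetical, and the assumption that both abfs and abfss count as "ABFS" is mine.

```scala
import java.net.URI

object SchemeCheckSketch {
  // Hypothetical stand-in for the check named in the stack trace; NOT Flink code.
  // Assumption: only the "abfs" and "abfss" schemes are treated as ABFS.
  def supportsRecoverableWriter(uri: URI): Boolean = {
    val scheme = Option(uri.getScheme).map(_.toLowerCase).getOrElse("")
    scheme == "abfs" || scheme == "abfss"
  }

  def main(args: Array[String]): Unit = {
    // Placeholder container and account names, for illustration only.
    val wasbs = new URI("wasbs://mycontainer@myaccount.blob.core.windows.net/out")
    val abfss = new URI("abfss://mycontainer@myaccount.dfs.core.windows.net/out")

    println(supportsRecoverableWriter(wasbs)) // false
    println(supportsRecoverableWriter(abfss)) // true
  }
}
```

If the real check works along these lines, the wasbs:// path built in the sink above would be rejected regardless of the bucket assigner or writer factory settings.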

My question: is there any change I can make to this code so that it works
with the wasbs protocol rather than abfs?

Thanks in advance, Eli.

