I have defined the following sink:
*object ParquetSink { def parquetFileSink[A <: Message: ClassTag]( assigner: A => String, config: Config )(implicit lc: LoggingConfigs): FileSink[A] = { val bucketAssigner = new BucketAssigner[A, String] { override def getBucketId(element: A, context: BucketAssigner.Context): String = { val path = assigner(element) logger.info <http://logger.info>(LogMessage(-1, s"Writing file to ${config.getString(baseDirKey)}/$path", "NA")) path } override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE } def builder(outFile: OutputFile): ParquetWriter[A] = new ParquetProtoWriters.ParquetProtoWriterBuilder( outFile, implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]] ).withCompressionCodec(config.getCompression(compressionKey)).build() val parquetBuilder: ParquetBuilder[A] = path => builder(path) FileSink .forBulkFormat( new Path(s"wasbs://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.blob.core.windows.net <http://blob.core.windows.net>"), new ParquetWriterFactory[A](parquetBuilder) ) .withBucketAssigner(bucketAssigner) .withOutputFileConfig( OutputFileConfig .builder() .withPartSuffix(".parquet") .build() ) .build() }}* After deploying the job I get the following exception: *Caused by: java.lang.UnsupportedOperationException: Recoverable writers on AzureBlob are only supported for ABFS * * at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.checkSupportedFSSchemes(AzureBlobRecoverableWriter.java:44) ~[?:?] * * at org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[?:?] at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.<init>(AzureBlobRecoverableWriter.java:37) ~[?:?] at org.apache.flink.fs.azurefs.AzureBlobFileSystem.createRecoverableWriter(AzureBlobFileSystem.java:44) ~[?:?] at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]* The question is whether I can make any changes to this code to make it work with *wasbs *protocol and not *abfs* ? Tnx in advance, Eli. -- Confidentiality Notice: This e-mail communication and any attachments may contain confidential and privileged information for the use of the designated recipient(s) above. If you are not the intended recipient(s), you are hereby notified that you received this communication in error and that any review, disclosure, dissemination, distribution or copying of it or its contents is prohibited. If you have received this communication in error, please notify me immediately by replying to this message and deleting it from your computer. Thank you