Out of memory in heap memory when working with state

2022-09-05 Thread lan tran
Hi team,

Currently I am facing an OutOfMemoryError: Java heap space. This is somehow due to the fact that I am storing state on the file system. With the FsStateBackend, the working state for each task manager is in memory (on the JVM heap), and state backups (checkpoints) go to a distributed file system, e.g., HDFS. Therefore, is there any way I can free the state from memory and work with the state directly on S3?

Sent from Mail for Windows
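Flink cannot use S3 directly as the working state store: only checkpoints live there, while working state stays local to the task managers. What usually resolves this kind of heap OOM is switching the state backend from the heap-based FsStateBackend/HashMapStateBackend to RocksDB, which keeps working state on local disk. A minimal PyFlink sketch of that configuration, assuming Flink 1.15+ and a placeholder S3 bucket:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()

# RocksDB keeps working state on the task manager's local disk instead of the
# JVM heap, so large state no longer has to fit in heap memory.
env.set_state_backend(EmbeddedRocksDBStateBackend())

# Checkpoints (the durable backups of that state) still go to S3.
# Assumes Flink 1.15+; on older versions set state.checkpoints.dir in flink-conf.yaml instead.
env.get_checkpoint_config().set_checkpoint_storage_dir(
    "s3://my-bucket/flink/checkpoints/")  # placeholder bucket
env.enable_checkpointing(60000)
```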


The methodology behind the join in Table API and DataStream

2022-06-29 Thread lan tran
Hi team,

I have a question about the methodology behind joining with the SQL client versus the DataStream API. My scenario is this: I have two tables, t1 and t2, I consume the WAL log from them and send it to Kafka. Next, I join the two tables together and convert the result into a changelog stream, so whenever either table is updated there is a message. This is how it works when I use the SQL client to join the two tables. However, since the Table API runs on top of DataStream according to the docs, I wonder what it would look like if I used DataStream instead of the Table API. In the DataStream API I currently use connect to join the two streams, converting t2 into a broadcast stream with t1 as the main stream. When I update t1 there is output for the updated record, but when I update t2 there is no output (even though the broadcast state itself is updated). Is there any way I can receive a message for updates on both sides? Do I have to keep state for t1 (the main stream), or do I have to change the way I join?

Best,
Quynh

Sent from Mail for Windows

import org.apache.flink.connector.file.src.FileSourceSplit
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
import org.apache.flink.table.types.logical.RowType
import models.EnrichElementMapperFunction.MemberDescriptor
import models.{AccidentClaim, EnrichElement, EnrichElementMapperFunction, Members}
import org.apache.flink.api.common.serialization.{BulkWriter, Encoder, SimpleStringEncoder}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.file.sink.FileSink.RowFormatBuilder
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog
import org.apache.flink.core.fs
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.formats.parquet.{ParquetBuilder, ParquetBulkWriter, ParquetColumnarRowInputFormat, ParquetFileFormatFactory, ParquetWriterFactory}
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.functions.co.{BroadcastProcessFunction, CoProcessFunction}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.{Row, RowKind}
import org.apache.flink.util.Collector
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
import org.apache.flink.table.types.logical.RowType
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetWriter}
import org.apache.parquet.io.OutputFile
import org.apache.flink.formats.parquet.ParquetBulkWriter


object WindowWordCount2 {

  def register_table_source(st_env: StreamTableEnvironment): Unit = {

    Class.forName("org.postgresql.Driver")

    val name = "postgres"
    val default_database = "postgres"
    val username = "postgres"
    val password = "postgres"
    val base_url = "jdbc:postgresql://postgres:5432/"

    val catalog = new JdbcCatalog(name, default_database, username, password, base_url)

    st_env.registerCatalog("postgresql", catalog)
  }

  def toInt(s: String): Option[Int] = {
    try {
      Some(s.toInt)
    } catch {
      case e: Exception => None
    }
  }

  def main(args: Array[String]): Unit = {

    implicit val typeInfo = TypeInformation.of(classOf[(String)])

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //final BulkWriter.Factory> writer,

    env.setStateBackend(new HashMapStateBackend)

    env.getCheckpointConfig.setCheckpointStorage(
      new FileSystemCheckpointStorage("s3://quynh-demo-flink/check_points/"))

    env.enableCheckpointing(1000)

    env.setParallelism(1)

    val st_env = StreamTableEnvironment.create(env,
      EnvironmentSettings.newInstance()
        .inStreamingMode()
        .build())

    val configuration = st_env.getConfig()

    configuration.setNullCheck(false)

    println("Creating catalog")
    register_table_source(st_env)

    st_env.executeSql("""
      CREATE TABLE accident_claims
      WITH (
        'connector'='kafka',
        'topic'='pg_claims.claims.accident_claims',
Join two data streams using DataStream API in PyFlink

2022-05-13 Thread lan tran
Hi team,

My use case is that I want to join two data streams that share the same id. Expressed in SQL it would be something like this:

SELECT suppliers.supplier_id, suppliers.supplier_name, orders.order_date
FROM suppliers
INNER JOIN orders ON suppliers.supplier_id = orders.supplier_id;

However, I don't see a `join` function available in the PyFlink DataStream API, so any guidance here would help. Thanks, team.

Note: I know that using the Table API would be easier. However, I just want to know how it can be implemented using DataStream in PyFlink.

Best,
Quynh

Sent from Mail for Windows
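One way to approximate the inner join above without a dedicated `join` transformation is to key both streams on supplier_id, connect them, and buffer the supplier record in keyed state so each arriving order can be enriched. A hedged PyFlink sketch; the in-memory sources, field layouts and names are made up for illustration:

```python
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedCoProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor


class SupplierOrderJoin(KeyedCoProcessFunction):
    """Buffers the latest supplier row per supplier_id and emits
    (supplier_id, supplier_name, order_date) for every matching order."""

    def open(self, runtime_context: RuntimeContext):
        self.supplier_name = runtime_context.get_state(
            ValueStateDescriptor("supplier_name", Types.STRING()))

    def process_element1(self, supplier, ctx):   # suppliers: (supplier_id, supplier_name)
        self.supplier_name.update(supplier[1])

    def process_element2(self, order, ctx):      # orders: (supplier_id, order_date)
        name = self.supplier_name.value()
        if name is not None:                     # inner-join semantics: emit only on a match
            yield order[0], name, order[1]


env = StreamExecutionEnvironment.get_execution_environment()
suppliers = env.from_collection(
    [(1, "ACME")], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
orders = env.from_collection(
    [(1, "2022-05-13")], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

joined = (
    suppliers.connect(orders)
    .key_by(lambda s: s[0], lambda o: o[0])
    .process(SupplierOrderJoin(),
             output_type=Types.TUPLE([Types.INT(), Types.STRING(), Types.STRING()]))
)
joined.print()
env.execute("supplier-order-join-sketch")
```

This only keeps the supplier side in state (a lookup-style join); if orders can arrive before their supplier, the order side would need to be buffered in state as well.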


RE: AvroRowDeserializationSchema

2022-04-28 Thread lan tran
Didn't expect that answer =)) However, I really appreciate everything you did. Thanks again for helping me out.

Best,
Quynh.

Sent from Mail for Windows

From: Dian Fu
Sent: Thursday, April 28, 2022 2:59 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Yes, I think so~

On Thu, Apr 28, 2022 at 11:00 AM lan tran <indigoblue7...@gmail.com> wrote:
Hi Dian,
Sorry for missing your mail. If I did as you suggested and Flink somehow crashed and we had to restart the service, would the Flink job know the offset from which it should read from Kafka?

Sent from Mail for Windows

From: Dian Fu
Sent: Tuesday, April 26, 2022 7:54 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
The same code in my last reply showed how to set the UID for the source operator generated using the Table API. I meant that you could firstly create a source using the Table API, then convert it to a DataStream and set the uid for the source operator using the same code above, then perform operations with the DataStream API.
Regards,
Dian

On Mon, Apr 25, 2022 at 9:27 PM lan tran <indigoblue7...@gmail.com> wrote:
Hi Dian,
Thanks again for the fast response. Following your suggestion above, we can set the UID only for the DataStream part (by converting from table to data stream). However, in the first phase, which collects the data from Kafka (in Debezium format), the UID cannot be set since we are using the Table API (it auto-generates the UID). Therefore, if something crashes and we need to revert using a savepoint, we cannot do that for the first phase since we cannot set the UID there => so how can we revert it? As a result, we want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job, to be able to use savepoints for the whole flow.
Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 7:46 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
You could try the following code (also it may be a little hacky):
```
def set_uid_for_source(ds: DataStream, uid: str):
    transformation = ds._j_data_stream.getTransformation()
    source_transformation = transformation
    while not source_transformation.getInputs().isEmpty():
        source_transformation = source_transformation.getInputs().get(0)
    source_transformation.setUid(uid)
```
Besides, could you describe your use case a bit and also how you want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for sources with these formats, it will send UPDATE messages to downstream operators.
Regards,
Dian

On Mon, Apr 25, 2022 at 12:31 PM lan tran <indigoblue7...@gmail.com> wrote:
Yeah, I already tried that way. However, if we do not use DataStream at first, we cannot use savepoints, since according to the doc, if we use the Table API (SQL API) the uid is generated automatically, which means we cannot revert if the system crashes.
Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 11:04 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema are still not supported in the Python DataStream API. Just taking a further look at the Java implementation of DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the result type is RowData instead of Row, so it should not be that easy to support them directly in the Python DataStream API. However, conversion between Table API & DataStream API is supported[1]. Could you firstly create a Table which consumes data from Kafka and then convert it to a DataStream?
Regards,
Dian
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors

On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <dian0511...@gmail.com> wrote:
Yes, we should support them. For now, if you want to use them, you could create ones in your own project. You could refer to AvroRowDeserializationSchema[1] as an example. It should not be complicated as it's simply a wrapper of the Java implementation.
Regards,
Dian
[1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308

On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com> wrote:
Thanks Dian!! I really appreciate this. However, I have another question related to this: in the current version or any future update, does DataStream support DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in PyFlink? I looked at the documentation and it seems they are not supported yet.
Best,
Quynh
Sent from Mail for Windows

From: Dian Fu
Sent: Friday, April 22, 2022 9:36 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
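Dian Fu's suggestion in this thread (define the Debezium source with the Table API, then convert it to a DataStream) can be sketched in PyFlink roughly as below. The broker and schema-registry addresses and the column list are placeholders, not the original job; the topic name is taken from the attached Scala code, and the format option key is the one quoted in the question:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Debezium CDC source defined with the Table API (placeholder columns/addresses).
t_env.execute_sql("""
    CREATE TABLE accident_claims_cdc (
        claim_id BIGINT,
        claim_total DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'pg_claims.claims.accident_claims',
        'properties.bootstrap.servers' = 'kafka:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://schema-registry:8081'
    )
""")

# Convert to a changelog DataStream: UPDATE/DELETE rows keep their RowKind,
# and from here on the job continues with the DataStream API.
ds = t_env.to_changelog_stream(t_env.from_path("accident_claims_cdc"))
ds.print()
env.execute("debezium-table-to-datastream")
```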

RE: AvroRowDeserializationSchema

2022-04-27 Thread lan tran
Hi Dian,
Sorry for missing your mail. If I did as you suggested and Flink somehow crashed and we had to restart the service, would the Flink job know the offset from which it should read from Kafka?

Sent from Mail for Windows

From: Dian Fu
Sent: Tuesday, April 26, 2022 7:54 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
The same code in my last reply showed how to set the UID for the source operator generated using the Table API. I meant that you could firstly create a source using the Table API, then convert it to a DataStream and set the uid for the source operator using the same code above, then perform operations with the DataStream API.
Regards,
Dian

On Mon, Apr 25, 2022 at 9:27 PM lan tran <indigoblue7...@gmail.com> wrote:
Hi Dian,
Thanks again for the fast response. Following your suggestion above, we can set the UID only for the DataStream part (by converting from table to data stream). However, in the first phase, which collects the data from Kafka (in Debezium format), the UID cannot be set since we are using the Table API (it auto-generates the UID). Therefore, if something crashes and we need to revert using a savepoint, we cannot do that for the first phase since we cannot set the UID there => so how can we revert it? As a result, we want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job, to be able to use savepoints for the whole flow.
Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 7:46 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
You could try the following code (also it may be a little hacky):
```
def set_uid_for_source(ds: DataStream, uid: str):
    transformation = ds._j_data_stream.getTransformation()
    source_transformation = transformation
    while not source_transformation.getInputs().isEmpty():
        source_transformation = source_transformation.getInputs().get(0)
    source_transformation.setUid(uid)
```
Besides, could you describe your use case a bit and also how you want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for sources with these formats, it will send UPDATE messages to downstream operators.
Regards,
Dian

On Mon, Apr 25, 2022 at 12:31 PM lan tran <indigoblue7...@gmail.com> wrote:
Yeah, I already tried that way. However, if we do not use DataStream at first, we cannot use savepoints, since according to the doc, if we use the Table API (SQL API) the uid is generated automatically, which means we cannot revert if the system crashes.
Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 11:04 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema are still not supported in the Python DataStream API. Just taking a further look at the Java implementation of DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the result type is RowData instead of Row, so it should not be that easy to support them directly in the Python DataStream API. However, conversion between Table API & DataStream API is supported[1]. Could you firstly create a Table which consumes data from Kafka and then convert it to a DataStream?
Regards,
Dian
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors

On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <dian0511...@gmail.com> wrote:
Yes, we should support them. For now, if you want to use them, you could create ones in your own project. You could refer to AvroRowDeserializationSchema[1] as an example. It should not be complicated as it's simply a wrapper of the Java implementation.
Regards,
Dian
[1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308

On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com> wrote:
Thanks Dian!! I really appreciate this. However, I have another question related to this: in the current version or any future update, does DataStream support DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in PyFlink? I looked at the documentation and it seems they are not supported yet.
Best,
Quynh
Sent from Mail for Windows

From: Dian Fu
Sent: Friday, April 22, 2022 9:36 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
I have added an example on how to use AvroRowDeserializationSchema in the Python DataStream API in [1]. Please take a look and see if that helps for you~
Regards,
Dian
[1] https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py

On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <dian0511...@gmail.com> wrote:
Hi Quynh,
Could you show some sample code on how you use it?

RE: UUID on TableAPI

2022-04-25 Thread lan tran
Ok, thanks for the clarification.

Sent from Mail for Windows

From: Francis Conroy
Sent: Tuesday, April 26, 2022 7:26 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: UUID on TableAPI

Hi Quynh,
My understanding is mostly based on the documentation I linked in the first reply. If the Flink version and the query both remain the same then you can restart a job from a savepoint; this means it might be workable for running a low-criticality job on, say, an AWS spot instance. That's about all I know.

On Tue, 26 Apr 2022 at 10:17, lan tran <indigoblue7...@gmail.com> wrote:
Hi Francis,
Thanks for the reply. However, can you elaborate more on the part "work for cases where you wish to pause/resume a job"? Is there another way besides savepoints that has this mechanism and can be used with the Table API?
Best,
Quynh

Sent from Mail for Windows

From: Francis Conroy
Sent: Tuesday, April 26, 2022 7:07 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: UUID on TableAPI

Hi Quynh,
Set the savepoint UUID? Is that what you mean? The savepoint UUIDs are issued dynamically when you request them; Flink won't know automatically what the last savepoint was, but you can start a new job and restore from a savepoint by passing in the UUID. All that said, there are limitations around using savepoints and Flink SQL because of the way the planner works: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution. However it might work for cases where you wish to pause/resume a job.

On Fri, 22 Apr 2022 at 13:54, lan tran <indigoblue7...@gmail.com> wrote:
Hi team,
Currently I want to use savepoints in Flink. One of the things I am concerned about: is there any way we can set the UUID while using the Table API (SQL API)? If not, is there any mechanism so that when we start Flink again, it knows which UUID it was?
Best,
Quynh.

Sent from Mail for Windows

This email and any attachments are proprietary and confidential and are intended solely for the use of the individual to whom it is addressed. Any views or opinions expressed are solely those of the author and do not necessarily reflect or represent those of SwitchDin Pty Ltd. If you have received this email in error, please let us know immediately by reply email and delete it from your system. You may not use, disseminate, distribute or copy this message nor disclose its contents to anyone. SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


RE: UUID on TableAPI

2022-04-25 Thread lan tran
Hi Francis,
Thanks for the reply. However, can you elaborate more on the part "work for cases where you wish to pause/resume a job"? Is there another way besides savepoints that has this mechanism and can be used with the Table API?
Best,
Quynh

Sent from Mail for Windows

From: Francis Conroy
Sent: Tuesday, April 26, 2022 7:07 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: UUID on TableAPI

Hi Quynh,
Set the savepoint UUID? Is that what you mean? The savepoint UUIDs are issued dynamically when you request them; Flink won't know automatically what the last savepoint was, but you can start a new job and restore from a savepoint by passing in the UUID. All that said, there are limitations around using savepoints and Flink SQL because of the way the planner works: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution. However it might work for cases where you wish to pause/resume a job.

On Fri, 22 Apr 2022 at 13:54, lan tran <indigoblue7...@gmail.com> wrote:
Hi team,
Currently I want to use savepoints in Flink. One of the things I am concerned about: is there any way we can set the UUID while using the Table API (SQL API)? If not, is there any mechanism so that when we start Flink again, it knows which UUID it was?
Best,
Quynh.

Sent from Mail for Windows

This email and any attachments are proprietary and confidential and are intended solely for the use of the individual to whom it is addressed. Any views or opinions expressed are solely those of the author and do not necessarily reflect or represent those of SwitchDin Pty Ltd. If you have received this email in error, please let us know immediately by reply email and delete it from your system. You may not use, disseminate, distribute or copy this message nor disclose its contents to anyone. SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


RE: AvroRowDeserializationSchema

2022-04-25 Thread lan tran
Hi Dian,
Thanks again for the fast response. Following your suggestion above, we can set the UID only for the DataStream part (by converting from table to data stream). However, in the first phase, which collects the data from Kafka (in Debezium format), the UID cannot be set since we are using the Table API (it auto-generates the UID). Therefore, if something crashes and we need to revert using a savepoint, we cannot do that for the first phase since we cannot set the UID there => so how can we revert it? As a result, we want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job, to be able to use savepoints for the whole flow.
Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 7:46 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
You could try the following code (also it may be a little hacky):
```
def set_uid_for_source(ds: DataStream, uid: str):
    transformation = ds._j_data_stream.getTransformation()
    source_transformation = transformation
    while not source_transformation.getInputs().isEmpty():
        source_transformation = source_transformation.getInputs().get(0)
    source_transformation.setUid(uid)
```
Besides, could you describe your use case a bit and also how you want to use DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for sources with these formats, it will send UPDATE messages to downstream operators.
Regards,
Dian

On Mon, Apr 25, 2022 at 12:31 PM lan tran <indigoblue7...@gmail.com> wrote:
Yeah, I already tried that way. However, if we do not use DataStream at first, we cannot use savepoints, since according to the doc, if we use the Table API (SQL API) the uid is generated automatically, which means we cannot revert if the system crashes.
Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 11:04 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema are still not supported in the Python DataStream API. Just taking a further look at the Java implementation of DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the result type is RowData instead of Row, so it should not be that easy to support them directly in the Python DataStream API. However, conversion between Table API & DataStream API is supported[1]. Could you firstly create a Table which consumes data from Kafka and then convert it to a DataStream?
Regards,
Dian
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors

On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <dian0511...@gmail.com> wrote:
Yes, we should support them. For now, if you want to use them, you could create ones in your own project. You could refer to AvroRowDeserializationSchema[1] as an example. It should not be complicated as it's simply a wrapper of the Java implementation.
Regards,
Dian
[1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308

On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com> wrote:
Thanks Dian!! I really appreciate this. However, I have another question related to this: in the current version or any future update, does DataStream support DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in PyFlink? I looked at the documentation and it seems they are not supported yet.
Best,
Quynh
Sent from Mail for Windows

From: Dian Fu
Sent: Friday, April 22, 2022 9:36 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
I have added an example on how to use AvroRowDeserializationSchema in the Python DataStream API in [1]. Please take a look and see if that helps for you~
Regards,
Dian
[1] https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py

On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <dian0511...@gmail.com> wrote:
Hi Quynh,
Could you show some sample code on how you use it?
Regards,
Dian

On Fri, Apr 22, 2022 at 1:42 PM lan tran <indigoblue7...@gmail.com> wrote:
Wonder if this is a bug or not, but if I use AvroRowDeserializationSchema in PyFlink the error still occurs:
py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class org.apache.avro.Schema$RecordSchema]) does not exist
Therefore, please help check. Thanks.
Best,
Quynh

Sent from Mail for Windows

From: lan tran
Sent: Thursday, April 21, 2022 1:43 PM
To: user@flink.apache.org
Subject: AvroRowDeserializationSchema

Hi team,
I want to implement AvroRowDeserializationSchema

RE: AvroRowDeserializationSchema

2022-04-24 Thread lan tran
Yeah, I already tried that way. However, if we do not use DataStream at first, we cannot use savepoints, since according to the doc, if we use the Table API (SQL API) the uid is generated automatically, which means we cannot revert if the system crashes.
Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 11:04 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema are still not supported in the Python DataStream API. Just taking a further look at the Java implementation of DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the result type is RowData instead of Row, so it should not be that easy to support them directly in the Python DataStream API. However, conversion between Table API & DataStream API is supported[1]. Could you firstly create a Table which consumes data from Kafka and then convert it to a DataStream?
Regards,
Dian
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors

On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <dian0511...@gmail.com> wrote:
Yes, we should support them. For now, if you want to use them, you could create ones in your own project. You could refer to AvroRowDeserializationSchema[1] as an example. It should not be complicated as it's simply a wrapper of the Java implementation.
Regards,
Dian
[1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308

On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com> wrote:
Thanks Dian!! I really appreciate this. However, I have another question related to this: in the current version or any future update, does DataStream support DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in PyFlink? I looked at the documentation and it seems they are not supported yet.
Best,
Quynh
Sent from Mail for Windows

From: Dian Fu
Sent: Friday, April 22, 2022 9:36 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
I have added an example on how to use AvroRowDeserializationSchema in the Python DataStream API in [1]. Please take a look and see if that helps for you~
Regards,
Dian
[1] https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py

On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <dian0511...@gmail.com> wrote:
Hi Quynh,
Could you show some sample code on how you use it?
Regards,
Dian

On Fri, Apr 22, 2022 at 1:42 PM lan tran <indigoblue7...@gmail.com> wrote:
Wonder if this is a bug or not, but if I use AvroRowDeserializationSchema in PyFlink the error still occurs:
py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class org.apache.avro.Schema$RecordSchema]) does not exist
Therefore, please help check. Thanks.
Best,
Quynh

Sent from Mail for Windows

From: lan tran
Sent: Thursday, April 21, 2022 1:43 PM
To: user@flink.apache.org
Subject: AvroRowDeserializationSchema

Hi team,
I want to implement AvroRowDeserializationSchema when consuming data from Kafka; however, from the documentation I did not understand what avro_schema_string and record_class are. It would be great if you could give me an example of this (I only have the Java example, but I am doing it in PyFlink). My understanding is that avro_schema_string is the schema_registry_url? Does it support 'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like in the Table API?
Best,
Quynh.
Sent from Mail for Windows


RE: AvroRowDeserializationSchema

2022-04-24 Thread lan tran
Thanks Dian!! I really appreciate this. However, I have another question related to this: in the current version or any future update, does DataStream support DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema in PyFlink? I looked at the documentation and it seems they are not supported yet.
Best,
Quynh
Sent from Mail for Windows

From: Dian Fu
Sent: Friday, April 22, 2022 9:36 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,
I have added an example on how to use AvroRowDeserializationSchema in the Python DataStream API in [1]. Please take a look and see if that helps for you~
Regards,
Dian
[1] https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py

On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <dian0511...@gmail.com> wrote:
Hi Quynh,
Could you show some sample code on how you use it?
Regards,
Dian

On Fri, Apr 22, 2022 at 1:42 PM lan tran <indigoblue7...@gmail.com> wrote:
Wonder if this is a bug or not, but if I use AvroRowDeserializationSchema in PyFlink the error still occurs:
py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class org.apache.avro.Schema$RecordSchema]) does not exist
Therefore, please help check. Thanks.
Best,
Quynh

Sent from Mail for Windows

From: lan tran
Sent: Thursday, April 21, 2022 1:43 PM
To: user@flink.apache.org
Subject: AvroRowDeserializationSchema

Hi team,
I want to implement AvroRowDeserializationSchema when consuming data from Kafka; however, from the documentation I did not understand what avro_schema_string and record_class are. It would be great if you could give me an example of this (I only have the Java example, but I am doing it in PyFlink). My understanding is that avro_schema_string is the schema_registry_url? Does it support 'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like in the Table API?
Best,
Quynh.
Sent from Mail for Windows


RE: DebeziumAvroDeserializationSchema

2022-04-21 Thread lan tran
Wonder if this is a bug or not, but if I use AvroRowDeserializationSchema in PyFlink the error still occurs:

py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class org.apache.avro.Schema$RecordSchema]) does not exist

Therefore, please help check. Thanks.
Best,
Quynh

On 2022/04/22 03:14:43 lan tran wrote:
> Hi team,
>
> Currently, I did not see this functions in PyFlink, therefore any suggestion
> on using this on PyFlink ?
>
> Best,
> Quynh.
>
> Sent from Mail for Windows


RE: AvroRowDeserializationSchema

2022-04-21 Thread lan tran
Wonder if this is a bug or not, but if I use AvroRowDeserializationSchema in PyFlink the error still occurs:

py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class org.apache.avro.Schema$RecordSchema]) does not exist

Therefore, please help check. Thanks.
Best,
Quynh

Sent from Mail for Windows

From: lan tran
Sent: Thursday, April 21, 2022 1:43 PM
To: user@flink.apache.org
Subject: AvroRowDeserializationSchema

Hi team,
I want to implement AvroRowDeserializationSchema when consuming data from Kafka; however, from the documentation I did not understand what avro_schema_string and record_class are. It would be great if you could give me an example of this (I only have the Java example, but I am doing it in PyFlink). My understanding is that avro_schema_string is the schema_registry_url? Does it support 'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like in the Table API?
Best,
Quynh.
Sent from Mail for Windows


UUID on TableAPI

2022-04-21 Thread lan tran
Hi team,

Currently I want to use savepoints in Flink. One of the things I am concerned about: is there any way we can set the UUID while using the Table API (SQL API)? If not, is there any mechanism so that when we start Flink again, it knows which UUID it was?

Best,
Quynh.

Sent from Mail for Windows


DebeziumAvroDeserializationSchema

2022-04-21 Thread lan tran
Hi team,

Currently I do not see this function in PyFlink. Is there any suggestion on how to use it from PyFlink?

Best,
Quynh.

Sent from Mail for Windows


AvroRowDeserializationSchema

2022-04-21 Thread lan tran
Hi team,

I want to implement AvroRowDeserializationSchema when consuming data from Kafka; however, from the documentation I did not understand what avro_schema_string and record_class are. It would be great if you could give me an example of this (I only have the Java example, but I am doing it in PyFlink). My understanding is that avro_schema_string is the schema_registry_url? Does it support 'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like in the Table API?

Best,
Quynh.

Sent from Mail for Windows
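For reference, a hedged sketch of how AvroRowDeserializationSchema is typically constructed in PyFlink (1.14/1.15-era import paths): as I understand it, avro_schema_string is the Avro schema itself as a JSON string, not a schema-registry URL, and record_class is the alternative for generated Avro record classes. The topic, broker and field names below are made-up examples:

```python
from pyflink.common.serialization import AvroRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# the Kafka and Avro connector jars must be available to the job

# avro_schema_string is the Avro schema itself, as a JSON string.
avro_schema = """
{
  "type": "record",
  "name": "Claim",
  "fields": [
    {"name": "claim_id", "type": "long"},
    {"name": "claim_total", "type": "double"}
  ]
}
"""

deserialization_schema = AvroRowDeserializationSchema(avro_schema_string=avro_schema)

consumer = FlinkKafkaConsumer(
    topics="claims",
    deserialization_schema=deserialization_schema,
    properties={"bootstrap.servers": "kafka:9092", "group.id": "demo"})

env.add_source(consumer).print()
env.execute("avro-deserialization-sketch")
```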


Naming sql_statement job

2022-03-30 Thread lan tran
Hi team,

When I submit a SQL job with the Table API using execute_query(), the job name is created by Flink. I wonder whether there is a way to configure that name. I see that in the SQL client there is the statement SET 'pipeline.name' = '{job_name}'. Can this be executed using execute_query (throws an exception), sql_query (throws an exception), or something else?

Thanks, team.
Best,
Quynh

Sent from Mail for Windows
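Outside the SQL client, the usual approach is to set pipeline.name on the TableEnvironment configuration before submitting the statement rather than running the SET statement itself. A small sketch (the job name and table names are placeholders, and assume the tables were created earlier):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# The SQL client's  SET 'pipeline.name' = '...'  maps to the same config key here.
t_env.get_config().get_configuration().set_string("pipeline.name", "my_sql_job")

# Statements submitted afterwards should pick up that job name.
t_env.execute_sql("INSERT INTO sink_table SELECT * FROM source_table")
```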


Datetime format

2022-03-27 Thread lan tran
Hi team,

Basically, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (for example by adding the last_modified_value to the file name)?

Best,
Quynh

Sent from Mail for Windows


Date time convert

2022-03-27 Thread lan tran
Hi team,

Currently I am facing this situation: I have a datetime string in the format "2018-03-29T07:39:49.722594Z". How can I convert this into a timestamp with local time zone? My current attempt is the cast below:

TO_TIMESTAMP(REPLACE(parcel.picked_up_date, 'T', ' '), 'yyyy-MM-dd HH:mm:ss%z') AS p_picked_up_date,

However, Flink does not seem to support '%z', so no data comes out of this cast.

Best,
Quynh

Sent from Mail for Windows
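TO_TIMESTAMP's format string indeed has no '%z'. One hedged workaround for an ISO-8601 UTC string like "2018-03-29T07:39:49.722594Z" is to strip the 'T' and 'Z', cast to TIMESTAMP(6), and then interpret the value as UTC by casting to TIMESTAMP_LTZ with the session time zone set to UTC. A sketch; the table and column names follow the email, the rest is an assumption:

```python
# assumes a StreamTableEnvironment t_env and a table `parcel`
# with a STRING column picked_up_date like '2018-03-29T07:39:49.722594Z'
t_env.get_config().get_configuration().set_string("table.local-time-zone", "UTC")

t_env.execute_sql("""
    SELECT
      CAST(
        CAST(REPLACE(REPLACE(picked_up_date, 'T', ' '), 'Z', '') AS TIMESTAMP(6))
        AS TIMESTAMP_LTZ(6)
      ) AS p_picked_up_date
    FROM parcel
""").print()
```

The TIMESTAMP -> TIMESTAMP_LTZ cast is interpreted in the session time zone, which is why it is set to UTC first: the wall-clock value in the string is a UTC instant.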


FileSystem format

2022-03-22 Thread lan tran
Hi team,

Basically, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (for example by adding the last_modified_value to the file name)?

Best,
Quynh

Sent from Mail for Windows
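As far as I know the Table API filesystem sink does not expose an option for the part-file name itself, but the DataStream FileSink lets you set a prefix/suffix through OutputFileConfig. A hedged PyFlink sketch (1.15-era import paths; the path and prefix are placeholders, and a fixed prefix only approximates the "last_modified_value" idea):

```python
from pyflink.common.serialization import Encoder
from pyflink.datastream.connectors import FileSink, OutputFileConfig

# Part files will be named <prefix>-<subtask>-<counter><suffix>.
file_config = (OutputFileConfig.builder()
               .with_part_prefix("run-2022-03-22")   # placeholder prefix
               .with_part_suffix(".txt")
               .build())

sink = (FileSink
        .for_row_format("s3://my-bucket/output/", Encoder.simple_string_encoder())
        .with_output_file_config(file_config)
        .build())

# some_data_stream.sink_to(sink)
```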


Asking about the partition files

2022-03-22 Thread lan tran
Hi team,

Basically, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (for example by adding the last_modified_value to the file name)?

Best,
Quynh

Sent from Mail for Windows


[File partition Flink]

2022-03-17 Thread lan tran
Hi team,

I have some questions about the file format when I process files:

In-progress / Pending: part--.inprogress.uid
Finished: part--

Can you explain more about the partFileIndex? The format of the files is quite weird; it produces two files (I wonder whether this is related to the parallelism, which we have set to 2):

part-6a13c70e-638d-4b10-820c-d7577e949e89-0-191
part-6a13c70e-638d-4b10-820c-d7577e949e89-1-190

If so, what happens if our data is huge but the commit time (checkpoint around 1000 ms) is small? Does it write into another file like part-6a13c70e-638d-4b10-820c-d7577e949e89-0-192 by increasing the final number, or does it have a different format?

Another question: since we are using the Table API, is there any option to limit the file size or the time after which files should be closed, as stated in the doc? I see there are options called 'sink.rolling-policy.file-size' and 'sink.partition-commit.delay'. Are these relevant to what we want?

One more question: although we have set parallelism to 3, at the end it is still 1. Can you explain this case a bit for me?

Thanks, team.
Best,
Quynh.

Sent from Mail for Windows
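For the Table API side of the question, the filesystem connector's rolling and partition-commit options are set in the sink table's WITH clause. A hedged sketch of a parquet sink using the options mentioned above (the schema, path and values are illustrative, and t_env is an existing TableEnvironment):

```python
t_env.execute_sql("""
    CREATE TABLE claims_sink (
        claim_id BIGINT,
        claim_total DOUBLE,
        dt STRING
    ) PARTITIONED BY (dt) WITH (
        'connector' = 'filesystem',
        'path' = 's3://my-bucket/claims/',
        'format' = 'parquet',
        -- roll a new part file when it reaches 128 MB or every 15 minutes
        'sink.rolling-policy.file-size' = '128MB',
        'sink.rolling-policy.rollover-interval' = '15 min',
        -- commit (make visible) partitions one minute after processing time
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '1 min',
        'sink.partition-commit.policy.kind' = 'success-file'
    )
""")
```

Note that with bulk formats such as parquet, part files are also finalized on every checkpoint, so a very short checkpoint interval (e.g. 1000 ms) will still produce many small files regardless of the rolling-policy settings.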