Out of memory in heap memory when working with state

2022-09-05 Thread lan tran
Hi team, I am currently facing an OutOfMemoryError: Java heap space. This seems to be due to the fact that I am storing the state on the file system. With the FsStateBackend, the working state for each task manager is in memory (on the JVM heap), and state backups (checkpoints) go to a distributed f

The methodology behind the join in Table API and DataStream

2022-06-29 Thread lan tran
Hi team, I have a question about the methodology behind joining with the SQL Client and DataStream. I have a scenario like this: I have two tables, t1 and t2, and I consume their WAL logs and send them to Kafka. Next, I will join the two tables above and convert the resulting table into a changelog str

Join two data stream using DataStream API using PyFlink

2022-05-13 Thread lan tran
Hi team, my use case is that I want to join two data streams that have the same id. If we convert it into SQL, we will have something like this: SELECT suppliers.supplier_id, suppliers.supplier_name, orders.order_date FROM suppliers INNER JOIN orders ON suppliers.supplier_id = orders.supplier_id; Howe

RE: AvroRowDeserializationSchema

2022-04-28 Thread lan tran
Thu, Apr 28, 2022 at 11:00 AM lan tran <indigoblue7...@gmail.com> wrote: Hi Dian, Sorry for missing your mail. If I follow your suggestion and Flink somehow crashes and we have to restart the service, does the Flink job know the offset from which to resume reading from Kafka?

RE: AvroRowDeserializationSchema

2022-04-27 Thread lan tran
source operator using the same code above, then perform operations with DataStream API. Regards, Dian On Mon, Apr 25, 2022 at 9:27 PM lan tran <indigoblue7...@gmail.com> wrote: Hi Dian, Thanks again for the fast response. Per your suggestion above, we can set the UID only for the DataStream sta

RE: UUID on TableAPI

2022-04-25 Thread lan tran
version and the query both remain the same then you can restart a job from a savepoint; this means that it might be workable for running a low-criticality job on, say, an AWS spot instance. That's about all I know. On Tue, 26 Apr 2022 at 10:17, lan tran <indigoblue7...@gmail.com> wrote: Hi Fra

RE: UUID on TableAPI

2022-04-25 Thread lan tran
ution. However, it might work for cases where you wish to pause/resume a job. On Fri, 22 Apr 2022 at 13:54, lan tran <indigoblue7...@gmail.com> wrote: Hi team, I currently want to use savepoints in Flink. However, one thing that concerns me is whether there is any way we can set the UUID while usin

RE: AvroRowDeserializationSchema

2022-04-25 Thread lan tran
end UPDATE messages to downstream operators. Regards, Dian On Mon, Apr 25, 2022 at 12:31 PM lan tran <indigoblue7...@gmail.com> wrote: Yeah, I already tried that way. However, if we do not use DataStream at first, we cannot implement the savepoint, since according to the doc, if we use the Table API (SQL API)

RE: AvroRowDeserializationSchema

2022-04-24 Thread lan tran
s, Dian [1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308 On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com> wrote: Thanks, Dian! I really appreciate this. However, I have another question related to this. In c

RE: AvroRowDeserializationSchema

2022-04-24 Thread lan tran
egards, Dian On Fri, Apr 22, 2022 at 1:42 PM lan tran <indigoblue7...@gmail.com> wrote: I wonder whether this is a bug, but if I use AvroRowDeserializationSchema in PyFlink, the error still occurs: py4j.protocol.Py4JError: An error occurred w

RE: DebeziumAvroDeserializationSchema

2022-04-21 Thread lan tran
: Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class org.apache.avro.Schema$RecordSchema]) does not exist Therefore, please help check. Thanks. Best, Quynh On 2022/04/22 03:14:43 lan tran wrote: > Hi team, > I currently do not see this function in PyFlink, ther

RE: AvroRowDeserializationSchema

2022-04-21 Thread lan tran
I wonder whether this is a bug, but if I use AvroRowDeserializationSchema in PyFlink, the error still occurs: py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace: org.apache.flink.api.python.shaded.py4j.Py4JException: Construc

UUID on TableAPI

2022-04-21 Thread lan tran
Hi team, I currently want to use savepoints in Flink. However, one thing that concerns me is whether there is any way we can set the UUID while using the Table API (SQL API). If not, does it have any mechanism so that, when we start Flink again, it knows which UUID it was? Best, Quynh

DebeziumAvroDeserializationSchema

2022-04-21 Thread lan tran
Hi team, I currently do not see this function in PyFlink. Is there any suggestion for using it in PyFlink? Best, Quynh.

AvroRowDeserializationSchema

2022-04-20 Thread lan tran
Hi team, I want to use AvroRowDeserializationSchema when consuming data from Kafka. However, from the documentation, I do not understand what avro_schema_string and record_class are. It would be great if you could give me an example of this (I only have an example in Java, however, I was doing

Naming sql_statment job

2022-03-30 Thread lan tran
Hi team, when I use the Table API to submit a SQL job with execute_query(), the job name is generated by Flink. However, I wonder whether there is a way to configure that name. I see that the SQL Client has this statement: SET 'pipeline.name' = '{job_name}'. I wonder whether this can be executed using exec

Datetime format

2022-03-27 Thread lan tran
Hi team, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (by adding the last_modified_value to the file name)? B

Date time convert

2022-03-27 Thread lan tran
Hi team, I am currently facing this situation: I have a datetime string in a format like "2018-03-29T07:39:49.722594Z". How can I convert it into a timestamp with local time zone? My current solution is the cast below: TO_TIMESTAMP(REPLACE(parcel.picked_up_date, 'T', ' '),'-MM-dd HH:mm:ss%z'
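
As a reference point for the conversion asked about above, here is a minimal plain-Python sketch of the intended result, computed outside Flink (it does not use TO_TIMESTAMP or any Flink API). The helper name to_local and the UTC+07:00 offset are assumptions for illustration only; the thread does not say which local zone is meant.

```python
from datetime import datetime, timedelta, timezone

# Assumption: "local" means UTC+07:00; the thread does not state the zone.
LOCAL_TZ = timezone(timedelta(hours=7))


def to_local(ts: str, tz: timezone = LOCAL_TZ) -> datetime:
    """Parse a UTC string like 2018-03-29T07:39:49.722594Z and shift it to tz.

    to_local is a hypothetical helper, not part of PyFlink.
    """
    # On Python 3.7+, %z also accepts a literal "Z" as UTC.
    parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f%z")
    return parsed.astimezone(tz)


local = to_local("2018-03-29T07:39:49.722594Z")
print(local.isoformat())
```

The value printed here can be used to sanity-check whatever the SQL expression returns for the same input string.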

FileSystem format

2022-03-22 Thread lan tran
Hi team, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (by adding the last_modified_value to the file name)? B

Asking about the partition files

2022-03-22 Thread lan tran
Hi team, when I use the Flink Table API to generate files and store them in S3, the file names look like this: part-0d373ee1-d594-40b1-a3cf-8fa895260980-0-0. My question is: is there any way to configure these file names (by adding the last_modified_value to the file name)? B

[File partition Flink]

2022-03-17 Thread lan tran
Hi team, I have some questions about the file name format when I process the files. In-progress / Pending: part--.inprogress.uid Finished: part-- Can you explain more about the partFileIndex, since the format of the files is quite weird? It produces two files (I wonder whether it is related to the parallelism which we have