[jira] [Created] (FLINK-32725) Add option to control writing of timestamp to Kafka topic in KafkaRecordSerializationSchema.builder

2023-08-01 Thread xiechenling (Jira)
xiechenling created FLINK-32725:
---

 Summary: Add option to control writing of timestamp to Kafka topic 
in KafkaRecordSerializationSchema.builder
 Key: FLINK-32725
 URL: https://issues.apache.org/jira/browse/FLINK-32725
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.0
 Environment: flink 1.16.2
Reporter: xiechenling


In older versions of the Flink Kafka sink, it was possible to configure whether the record timestamp should be written to Kafka, using the method `FlinkKafkaProducer.setWriteTimestampToKafka(true)`.

However, in the newer Kafka sink, when using `KafkaRecordSerializationSchema.builder()`, the record timestamp is automatically written to the Kafka topic using the timestamp from the sink context.

{code:scala}
KafkaSink
  ...
  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    ...
    .build())
{code}


If a user wishes to prevent the timestamp from being written to Kafka, they currently need to supply a custom `KafkaRecordSerializationSchema` and implement the `serialize` method themselves.

{code:scala}
KafkaSink.builder[(String, String)]()
  .setBootstrapServers(kafkaAddress)
  .setRecordSerializer(
    (element: (String, String),
     context: KafkaRecordSerializationSchema.KafkaSinkContext,
     timestamp: lang.Long) => {
      // Omit the timestamp argument so no timestamp is written to Kafka.
      new ProducerRecord(sinkTopic, element._1.getBytes, element._2.getBytes)
    })
{code}

I propose adding a new method, similar to `setWriteTimestampToKafka`, to 
`KafkaRecordSerializationSchema.builder()`, which allows users to control 
whether the timestamp should be included in the output to the Kafka topic. This 
would provide a more straightforward and consistent approach for users who do 
not want the timestamp to be written to Kafka.
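
A minimal sketch of what the proposed builder option could look like (the method name mirrors the legacy setter and is hypothetical; `sinkTopic` is a placeholder):

{code:scala}
// Hypothetical API sketch: setWriteTimestampToKafka does not exist on the
// builder today; this is the shape the proposal suggests.
val serializer = KafkaRecordSerializationSchema.builder[String]()
  .setTopic(sinkTopic)
  .setValueSerializationSchema(new SimpleStringSchema())
  .setWriteTimestampToKafka(false) // proposed option
  .build()
{code}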

Thank you for considering this enhancement.





[jira] [Created] (FLINK-32266) Kafka Source Continues Consuming Previous Topic After Loading Savepoint

2023-06-05 Thread xiechenling (Jira)
xiechenling created FLINK-32266:
---

 Summary: Kafka Source Continues Consuming Previous Topic After 
Loading Savepoint
 Key: FLINK-32266
 URL: https://issues.apache.org/jira/browse/FLINK-32266
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.3
 Environment: Flink version: 1.15.3
Kafka Connector version: 1.15.3 FLIP-27
Reporter: xiechenling


I encountered an issue with the Flink Kafka Connector's Kafka Source where it 
continues consuming data from a previously consumed topic even after loading a 
savepoint and configuring it to consume data from a different topic.

 

Steps to reproduce:
 # Set up the Kafka Source to consume data from Topic A.
 # Start the Flink job.
 # Stop the job and create a savepoint.
 # Modify the configuration to consume data from Topic B.
 # Load the job from the savepoint and start it.
 # Observe that the job consumes data from both Topic A and Topic B, instead of 
just Topic B.
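
A minimal sketch of the source setup used in steps 1 and 4, assuming the new FLIP-27 `KafkaSource` (broker address, topic, group id, and `env` are placeholders):

{code:scala}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

// Step 1 consumes "topic-a"; in step 4 only setTopics changes to "topic-b".
val source = KafkaSource.builder[String]()
  .setBootstrapServers("kafka:9092")
  .setTopics("topic-a")
  .setGroupId("my-group")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()

env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "kafka-source")
{code}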

 

Expected behavior:

After loading a savepoint and configuring the Kafka Source to consume data from 
a new topic, the job should only consume data from the newly configured topic.

 

Actual behavior:

The Kafka Source continues consuming data from the previous topic (Topic A), in 
addition to the newly configured topic (Topic B).





[jira] [Created] (FLINK-29380) Two streams union, watermark error, not the minimum value

2022-09-21 Thread xiechenling (Jira)
xiechenling created FLINK-29380:
---

 Summary: Two streams union, watermark error, not the minimum value
 Key: FLINK-29380
 URL: https://issues.apache.org/jira/browse/FLINK-29380
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.2
Reporter: xiechenling
 Attachments: image-2022-09-21-17-59-01-846.png

When two streams are unioned, the watermark emitted downstream is wrong: it is not the minimum of the two input watermarks. The equivalent topology built with a connect operator produces the correct watermark.
!image-2022-09-21-17-59-01-846.png!
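
A minimal sketch of the union topology, assuming event timestamps live in the first tuple field (`env`, `sourceA`, and `sourceB` are placeholders):

{code:scala}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Both inputs carry their own watermarks, derived from the first tuple field.
val strategy = WatermarkStrategy
  .forMonotonousTimestamps[(Long, String)]()
  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
    override def extractTimestamp(e: (Long, String), recordTs: Long): Long = e._1
  })

val a = env.addSource(sourceA).assignTimestampsAndWatermarks(strategy)
val b = env.addSource(sourceB).assignTimestampsAndWatermarks(strategy)

// The watermark observed downstream of union should be min(wm(a), wm(b));
// per this report it is not, while the connect-based equivalent is correct.
a.union(b)
  .process(new ProcessFunction[(Long, String), String] {
    override def processElement(
        value: (Long, String),
        ctx: ProcessFunction[(Long, String), String]#Context,
        out: Collector[String]): Unit =
      out.collect(s"watermark=${ctx.timerService().currentWatermark()}")
  })
  .print()
{code}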





[jira] [Created] (FLINK-25440) Apache Pulsar Connector Document description error about 'Starting Position'.

2021-12-23 Thread xiechenling (Jira)
xiechenling created FLINK-25440:
---

 Summary: Apache Pulsar Connector Document description error about 
'Starting Position'.
 Key: FLINK-25440
 URL: https://issues.apache.org/jira/browse/FLINK-25440
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.14.2
Reporter: xiechenling


The 'Starting Position' description is in error. For `StartCursor.fromMessageTime(long)`, the documentation currently says:

Start from the specified message time by Message.getEventTime().

It should say: 'Start from the specified message time by publishTime.'
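
For reference, a sketch of where the cursor is used (service URLs, topic, and subscription name are placeholders); the cursor filters by the message publish time, not `Message.getEventTime()`:

{code:scala}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.pulsar.source.PulsarSource
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema

val source = PulsarSource.builder[String]()
  .setServiceUrl("pulsar://localhost:6650")
  .setAdminUrl("http://localhost:8080")
  .setTopics("my-topic")
  .setSubscriptionName("my-subscription")
  // Starts from the publish time, despite what the documentation says:
  .setStartCursor(StartCursor.fromMessageTime(1640000000000L))
  .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
  .build()
{code}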





[jira] [Created] (FLINK-23420) sql stream mode lag function java.lang.NullPointerException

2021-07-18 Thread xiechenling (Jira)
xiechenling created FLINK-23420:
---

 Summary: sql stream mode lag function 
java.lang.NullPointerException
 Key: FLINK-23420
 URL: https://issues.apache.org/jira/browse/FLINK-23420
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.1
Reporter: xiechenling


Flink 1.13.1, Blink planner, streaming mode, EXACTLY_ONCE.
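
A query of roughly this shape triggers the failure (reconstructed from the plan in the log below; column names come from the log, while the table name and exact query are assumptions):

{code:scala}
// Hypothetical reconstruction; `tableEnv` and the `vessels` table are placeholders.
tableEnv.executeSql(
  """
    |SELECT *
    |FROM (
    |  SELECT targetId, mmsi, lastDt,
    |         LAG(mmsi) OVER (PARTITION BY targetId ORDER BY lastDt) AS previous_mmsi
    |  FROM vessels
    |)
    |WHERE previous_mmsi <> mmsi
    |""".stripMargin)
{code}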

Log:

 
{code:java}
2021-07-15 21:07:46,328 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 1 (type=CHECKPOINT) @ 1626354466304 for job 
fd3c2294afe74778cb6ce3bd5d42f0c0.
2021-07-15 21:07:46,774 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - 
OverAggregate(partitionBy=[targetId], orderBy=[lastDt ASC], window=[ RANG 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[displayId, mmsi, 
latitude, longitude, course, heading, speed, len, minLen, maxLen, wid, id, 
province, nationality, lastTm, status, vesselName, sClass, targetId, lastDt, 
$20, LAG(displayId) AS w0$o0, LAG(mmsi) AS w0$o1, LAG($20) AS w0$o2, 
LAG(latitude) AS w0$o3, LAG(longitude) AS w0$o4, LAG(course) AS w0$o5, 
LAG(heading) AS w0$o6, LAG(speed) AS w0$o7, LAG(len) AS w0$o8, LAG(minLen) AS 
w0$o9, LAG(maxLen) AS w0$o10, LAG(wid) AS w0$o11, LAG(id) AS w0$o12, 
LAG(province) AS w0$o13, LAG(nationality) AS w0$o14, LAG(lastTm) AS w0$o15, 
LAG(status) AS w0$o16, LAG(vesselName) AS w0$o17, LAG(sClass) AS w0$o18, 
LAG(targetId) AS w0$o19, LAG(lastDt) AS w0$o20]) -> Calc(select=[displayId, 
mmsi, $20 AS state, latitude, longitude, course, heading, speed, len, minLen, 
maxLen, wid, id, province, nationality, lastTm, status, vesselName, sClass, 
targetId, lastDt, w0$o0 AS previous_displayId, w0$o1 AS previous_mmsi, w0$o2 AS 
previous_state, w0$o3 AS previous_latitude, w0$o4 AS previous_longitude, w0$o5 
AS previous_course, w0$o6 AS previous_heading, w0$o7 AS previous_speed, w0$o8 
AS previous_len, w0$o9 AS previous_minLen, w0$o10 AS previous_maxLen, w0$o11 AS 
previous_wid, w0$o12 AS previous_id, w0$o13 AS previous_province, w0$o14 AS 
previous_nationality, w0$o15 AS previous_lastTm, w0$o16 AS previous_status, 
w0$o17 AS previous_vesselName, w0$o18 AS previous_sClass, w0$o19 AS 
previous_targetId, CAST(w0$o20) AS previous_lastDt], where=[(w0$o1 <> mmsi)]) 
-> TableToDataSteam(type=ROW<`displayId` INT, `mmsi` INT, `state` TINYINT, 
`latitude` DOUBLE, `longitude` DOUBLE, `course` FLOAT, `heading` FLOAT, `speed` 
FLOAT, `len` INT, `minLen` INT, `maxLen` INT, `wid` INT, `id` STRING, 
`province` STRING, `nationality` STRING, `lastTm` BIGINT, `status` STRING, 
`vesselName` STRING, `sClass` STRING, `targetId` STRING, `lastDt` TIMESTAMP(3), 
`previous_displayId` INT, `previous_mmsi` INT, `previous_state` TINYINT, 
`previous_latitude` DOUBLE, `previous_longitude` DOUBLE, `previous_course` 
FLOAT, `previous_heading` FLOAT, `previous_speed` FLOAT, `previous_len` INT, 
`previous_minLen` INT, `previous_maxLen` INT, `previous_wid` INT, `previous_id` 
STRING, `previous_province` STRING, `previous_nationality` STRING, 
`previous_lastTm` BIGINT, `previous_status` STRING, `previous_vesselName` 
STRING, `previous_sClass` STRING, `previous_targetId` STRING, `previous_lastDt` 
TIMESTAMP(3)> NOT NULL, rowtime=false) (3/3) (34f17a50932ba7852cff00dabecae88e) 
switched from RUNNING to FAILED on container_1625646226467_0291_01_05 @ 
hadoop-15 (dataPort=38082).
java.lang.NullPointerException: null
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:67) ~[hlx_bigdata_flink.jar:?]
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:30) ~[hlx_bigdata_flink.jar:?]
at org.apache.flink.table.runtime.typeutils.LinkedListSerializer.serialize(LinkedListSerializer.java:114) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.runtime.typeutils.LinkedListSerializer.serialize(LinkedListSerializer.java:39) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:558) ~[hlx_bigdata_flink.jar:?]
at org.apache.flink.table.data.binary.BinaryRawValueData.materialize(BinaryRawValueData.java:113) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.data.binary.LazyBinaryFormat.ensureMaterialized(LazyBinaryFormat.java:126) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.copy(RawValueDataSerializer.java:60) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.copy(RawValueDataSerializer.java:36) ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170