[jira] [Created] (FLINK-30438) The generated schema is not correct when using value.format debezium-avro-confluent

2022-12-16 Thread Jun Qin (Jira)
Jun Qin created FLINK-30438:
---

 Summary: The generated schema is not correct when using 
value.format debezium-avro-confluent
 Key: FLINK-30438
 URL: https://issues.apache.org/jira/browse/FLINK-30438
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.0
Reporter: Jun Qin


With the following code:
{code:java}
CREATE TABLE TEST(
    ID BIGINT,
    INTEGRATION_ID STRING,
    PRIMARY KEY(INTEGRATION_ID) NOT ENFORCED
) WITH(
    'connector' = 'kafka',
    'topic' = 'TEST',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'TEST',
    'key.format' = 'avro-confluent',
    'key.fields' = 'INTEGRATION_ID',
    'key.avro-confluent.url' = 'http://schema-registry:8081',
    'value.format' = 'debezium-avro-confluent',
    'value.debezium-avro-confluent.url' = 'http://schema-registry:8081',
    'scan.startup.mode' = 'earliest-offset'
); {code}
and this INSERT statement:
{code:java}
INSERT INTO TEST SELECT 1, '1'; {code}
The schema we get in schema registry is:
{code:java}
[
  "null",
  {
    "fields": [
      {
        "default": null,
        "name": "before",
        "type": [
          "null",
          {
            "fields": [
              {
                "default": null,
                "name": "ID",
                "type": [
                  "null",
                  "long"
                ]
              },
              {
                "name": "INTEGRATION_ID",
                "type": "string"
              }
            ],
            "name": "record_before",
            "type": "record"
          }
        ]
      },
      {
        "default": null,
        "name": "after",
        "type": [
          "null",
          {
            "fields": [
              {
                "default": null,
                "name": "ID",
                "type": [
                  "null",
                  "long"
                ]
              },
              {
                "name": "INTEGRATION_ID",
                "type": "string"
              }
            ],
            "name": "record_after",
            "type": "record"
          }
        ]
      },
      {
        "default": null,
        "name": "op",
        "type": [
          "null",
          "string"
        ]
      }
    ],
    "name": "record",
    "namespace": "org.apache.flink.avro.generated",
    "type": "record"
  }
] {code}
The first 'null' in the schema does not look correct. Can you check and fix this?





[jira] [Created] (FLINK-30220) Hide user credentials in Flink SQL JDBC connector

2022-11-26 Thread Jun Qin (Jira)
Jun Qin created FLINK-30220:
---

 Summary: Hide user credentials in Flink SQL JDBC connector
 Key: FLINK-30220
 URL: https://issues.apache.org/jira/browse/FLINK-30220
 Project: Flink
  Issue Type: Improvement
Reporter: Jun Qin


Similar to FLINK-28028, when using the Flink SQL JDBC connector, we should also 
have a way to secure the username and the password used in the DDL:
{code:java}
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users',
   'username' = 'a-username',
   'password' = 'a-password'
);
 {code}
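Until such a mechanism exists, one possible workaround (a sketch only; the environment variable names are made up) is to keep the credentials out of the SQL text and inject them when the DDL is submitted:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateJdbcTable {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Credentials come from the environment, so they never appear in the DDL text itself.
        String username = System.getenv("MYSQL_USERNAME");
        String password = System.getenv("MYSQL_PASSWORD");

        tEnv.executeSql(
                "CREATE TABLE MyUserTable (\n"
                    + "  id BIGINT,\n"
                    + "  name STRING,\n"
                    + "  age INT,\n"
                    + "  status BOOLEAN,\n"
                    + "  PRIMARY KEY (id) NOT ENFORCED\n"
                    + ") WITH (\n"
                    + "  'connector' = 'jdbc',\n"
                    + "  'url' = 'jdbc:mysql://localhost:3306/mydatabase',\n"
                    + "  'table-name' = 'users',\n"
                    + "  'username' = '" + username + "',\n"
                    + "  'password' = '" + password + "'\n"
                    + ")");
    }
}
{code}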





[jira] [Created] (FLINK-28834) Add TemporalJoin example and ITCase

2022-08-05 Thread Jun Qin (Jira)
Jun Qin created FLINK-28834:
---

 Summary: Add TemporalJoin example and ITCase
 Key: FLINK-28834
 URL: https://issues.apache.org/jira/browse/FLINK-28834
 Project: Flink
  Issue Type: Improvement
Reporter: Jun Qin
Assignee: Jun Qin


A temporal join example is useful to show users how to use temporal joins and 
how they work. The corresponding ITCase also helps to verify the temporal join 
functionality in Flink SQL and to show users how to do Flink SQL testing.
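As a rough illustration of what such an example could cover (the table and column names below are made up for this sketch, not necessarily what the example will use): each order is joined with the version of the currency rate that was valid at the order's event time, assuming {{orders}} and a versioned {{currency_rates}} table (primary key + watermark) have been registered.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // ... DDL for 'orders' and the versioned 'currency_rates' table goes here ...

        // Event-time temporal join: pick the rate version that was valid at the order's time.
        tEnv.executeSql(
                "SELECT o.order_id, o.price * r.rate AS converted_price\n"
                    + "FROM orders AS o\n"
                    + "JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r\n"
                    + "ON o.currency = r.currency");
    }
}
{code}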







[jira] [Created] (FLINK-28650) Flink SQL Parsing bug for METADATA

2022-07-22 Thread Jun Qin (Jira)
Jun Qin created FLINK-28650:
---

 Summary: Flink SQL Parsing bug for METADATA
 Key: FLINK-28650
 URL: https://issues.apache.org/jira/browse/FLINK-28650
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.4
Reporter: Jun Qin


With the following source/sink tables:
{code:sql}
CREATE TABLE sourceTable ( 
`key` INT, 
`time` TIMESTAMP(3),
`value` STRING NOT NULL, 
id INT 
) 
WITH ( 
'connector' = 'datagen', 
'rows-per-second'='10', 
'fields.id.kind'='sequence', 
'fields.id.start'='1', 
'fields.id.end'='100' 
);

CREATE TABLE sinkTable1 ( 
`time` TIMESTAMP(3) METADATA FROM 'timestamp', 
`value` STRING NOT NULL
) 
WITH ( 
  'connector' = 'kafka',
...
);

CREATE TABLE sinkTable2 ( 
`time` TIMESTAMP(3), -- without METADATA
`value` STRING NOT NULL
) 
WITH ( 
  'connector' = 'kafka',
...
);
{code}
the following three pass the validation:
{code:sql}
INSERT INTO sinkTable1
SELECT 
`time`, 
`value`
FROM sourceTable;

INSERT INTO sinkTable2
SELECT 
`time`, 
`value`
FROM sourceTable;

INSERT INTO sinkTable2 (`time`,`value`)
SELECT 
`time`, 
`value`
FROM sourceTable;
{code}
but this one does not:
{code:sql}
INSERT INTO sinkTable1 (`time`,`value`)
SELECT 
`time`, 
`value`
FROM sourceTable;
{code}
It failed with 

{code:java}
Unknown target column 'time'
{code}

It seems that when column names are provided in the INSERT statement, the 
METADATA column has an undesired effect.





[jira] [Created] (FLINK-27473) Capture time that a job spends on initializing tasks

2022-05-02 Thread Jun Qin (Jira)
Jun Qin created FLINK-27473:
---

 Summary: Capture time that a job spends on initializing tasks
 Key: FLINK-27473
 URL: https://issues.apache.org/jira/browse/FLINK-27473
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Metrics
Reporter: Jun Qin


Similar to https://issues.apache.org/jira/browse/FLINK-25888, we should also 
capture the time that a job spends on initializing tasks.





[jira] [Created] (FLINK-27173) CoGroupedStreams$TaggedUnion cannot be used as a POJO type

2022-04-11 Thread Jun Qin (Jira)
Jun Qin created FLINK-27173:
---

 Summary: CoGroupedStreams$TaggedUnion cannot be used as a POJO type
 Key: FLINK-27173
 URL: https://issues.apache.org/jira/browse/FLINK-27173
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.14.4
Reporter: Jun Qin
 Attachments: StreamingJob.java

Attached is the code to demonstrate the issue. When running the job, we can see the 
following:
{code:java}
11:48:26,584 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
[] - class 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$TaggedUnion does not 
contain a setter for field one
11:48:26,586 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            
[] - Class class 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$TaggedUnion cannot 
be used as a POJO type because not all fields are valid POJO fields, and must 
be processed as GenericType. Please read the Flink documentation on "Data Types 
& Serialization" for details of the effect on performance. {code}
TaggedUnion is a class in Flink. This should be fixed in Flink.

 

[^StreamingJob.java]





[jira] [Created] (FLINK-26018) Late events in the new KafkaSource

2022-02-08 Thread Jun Qin (Jira)
Jun Qin created FLINK-26018:
---

 Summary: Late events in the new KafkaSource
 Key: FLINK-26018
 URL: https://issues.apache.org/jira/browse/FLINK-26018
 Project: Flink
  Issue Type: Bug
Reporter: Jun Qin
 Attachments: message in kafka.txt, 
taskmanager_10.28.0.131_33249-b3370c_log

There is an issue with the new KafkaSource connector in Flink 1.14: when one 
task consumes messages from multiple topic partitions (statically created, 
timestamps in order), it may start with one partition and advance watermarks 
before the data from the other partitions arrives. In this case, the early 
messages in the other partitions may unnecessarily be considered late.

I discussed this with [~renqs]; it seems that the new KafkaSource only adds a 
partition into {{WatermarkMultiplexer}} when it receives data from that 
partition. In contrast, FlinkKafkaConsumer adds all known partitions before it 
fetches any data.

Attached two files: the messages in Kafka and the corresponding TM logs.





[jira] [Created] (FLINK-25160) Make doc clear: tolerable-failed-checkpoints counts consecutive failures

2021-12-03 Thread Jun Qin (Jira)
Jun Qin created FLINK-25160:
---

 Summary: Make doc clear: tolerable-failed-checkpoints counts 
consecutive failures
 Key: FLINK-25160
 URL: https://issues.apache.org/jira/browse/FLINK-25160
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.13.3, 1.12.5, 1.14.0
Reporter: Jun Qin


According to the code, tolerable-failed-checkpoints counts the consecutive 
failures. We should make this clear in the 
[configuration documentation|https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/].
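For reference, the setting being documented, shown here via the programmatic API (the value 3 is only an example; the corresponding key is {{execution.checkpointing.tolerable-failed-checkpoints}}):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
// Tolerate up to 3 *consecutive* checkpoint failures before the job is failed;
// the counter is reset as soon as a checkpoint succeeds.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
{code}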

 





[jira] [Created] (FLINK-24686) Make doc clear on AsyncFunction::timeout() overriding

2021-10-28 Thread Jun Qin (Jira)
Jun Qin created FLINK-24686:
---

 Summary: Make doc clear on AsyncFunction::timeout() overriding
 Key: FLINK-24686
 URL: https://issues.apache.org/jira/browse/FLINK-24686
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Jun Qin
Assignee: Jun Qin


Sometimes, a user overrides {{AsyncFunction::timeout()}} with an empty method 
or with only logging code. This prevents the timeout from being signaled back to the 
framework, and the job gets stuck, especially when using {{orderedWait()}}. Opening this 
Jira to make the doc clear on this.
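For reference, a minimal sketch (class name and messages are illustrative) of a {{timeout()}} override that logs *and* still completes the {{ResultFuture}}, so that the framework is notified:
{code:java}
import java.util.Collections;
import java.util.concurrent.TimeoutException;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyAsyncFunction extends RichAsyncFunction<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(MyAsyncFunction.class);

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        // ... issue the asynchronous request and complete resultFuture in its callback ...
        resultFuture.complete(Collections.singleton(input)); // placeholder
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // Logging alone is not enough: the ResultFuture must still be completed,
        // otherwise the framework keeps waiting and the job can get stuck with orderedWait().
        LOG.warn("Async request timed out for input {}", input);
        resultFuture.completeExceptionally(new TimeoutException("Async request timed out: " + input));
    }
}
{code}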





[jira] [Created] (FLINK-24543) Zookeeper connection issue causes inconsistent state in Flink

2021-10-14 Thread Jun Qin (Jira)
Jun Qin created FLINK-24543:
---

 Summary: Zookeeper connection issue causes inconsistent state in 
Flink
 Key: FLINK-24543
 URL: https://issues.apache.org/jira/browse/FLINK-24543
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.2
Reporter: Jun Qin


Env: Flink 1.13.2 with Zookeeper HA

Here is what happened:
{code:bash}
# checkpoint 1116 was triggered
2021-10-09 00:16:49,555 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 1116 (type=CHECKPOINT) @ 1633738609538 for job 
a8a4fb85b681a897ba118db64333c9e5.

# a few seconds later, the zookeeper connection was suspended (it turned out to be a disk issue at the zookeeper side that caused slow fsync and commit)
2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Connection to ZooKeeper suspended. The contender LeaderContender: 
DefaultDispatcherRunner no longer participates in the leader election.

# job was switching to suspended
2021-10-09 00:16:58,564 [flink-akka.actor.default-dispatcher-61] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Disconnect job manager 
b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3
 for job a8a4fb85b681a897ba118db64333c9e5 from the resource manager.

2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-92] INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Registering job manager 
b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3
 for job a8a4fb85b681a897ba118db64333c9e5.


2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-90] INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the 
JobMaster for job Flink ...(a8a4fb85b681a897ba118db64333c9e5).

2021-10-09 00:16:58,567 [flink-akka.actor.default-dispatcher-90] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Flink ... 
(a8a4fb85b681a897ba118db64333c9e5) switched from state RUNNING to SUSPENDED.


2021-10-09 00:16:58,570 [flink-akka.actor.default-dispatcher-86] INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
Closing 
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}.


2021-10-09 00:16:58,667 [flink-akka.actor.default-dispatcher-92] INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
a8a4fb85b681a897ba118db64333c9e5 reached terminal state SUSPENDED.

# zookeeper connection restored
2021-10-09 00:17:08,225 [Curator-ConnectionStateManager-0] INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Connection to ZooKeeper was reconnected. Leader election can be restarted.

# received checkpoint acknowledgement, trying to finalize it, then failed to 
add to zookeeper due to KeeperException$NodeExistsException
2021-10-09 00:17:14,382 [flink-akka.actor.default-dispatcher-90] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: ... 
(1/5) (09d25852e3e206d6b7fe0d6bc965870f) switched from RUNNING to CANCELING.
2021-10-09 00:17:14,382 [jobmanager-future-thread-1] WARN  
org.apache.flink.runtime.jobmaster.JobMaster [] - Error while 
processing AcknowledgeCheckpoint message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete the 
pending checkpoint 1116. Failure reason: Failure to finalize checkpoint.
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227)
 
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1072)
 
at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
 
at 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
 
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
at ...
{code}
[jira] [Created] (FLINK-24310) A bug in the BufferingSink example in the doc

2021-09-16 Thread Jun Qin (Jira)
Jun Qin created FLINK-24310:
---

 Summary: A bug in the BufferingSink example in the doc
 Key: FLINK-24310
 URL: https://issues.apache.org/jira/browse/FLINK-24310
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Jun Qin


The following line in the BufferingSink example on [this 
page|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#operator-state]
 has a bug:

if (bufferedElements.size() == threshold) {

It should be {{>=}} instead of {{==}}, because when restoring from a 
checkpoint during downscaling, a task may get more elements than the 
threshold.
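For reference, a corrected excerpt of the {{invoke()}} method from that example (adapted; everything else stays as in the doc):
{code:java}
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
    bufferedElements.add(value);
    // Use >= rather than ==: after restoring from a checkpoint with a lower parallelism,
    // a subtask may already hold more than 'threshold' buffered elements.
    if (bufferedElements.size() >= threshold) {
        for (Tuple2<String, Integer> element : bufferedElements) {
            // send it to the sink
        }
        bufferedElements.clear();
    }
}
{code}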





[jira] [Created] (FLINK-24238) Page title missing

2021-09-09 Thread Jun Qin (Jira)
Jun Qin created FLINK-24238:
---

 Summary: Page title missing
 Key: FLINK-24238
 URL: https://issues.apache.org/jira/browse/FLINK-24238
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.13.2
Reporter: Jun Qin


The page title is missing on this Flink doc page: 
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/versioned_tables/].

[This one|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/]
 is a good example of how it should look.
 
 





[jira] [Created] (FLINK-23429) State Processor API failed with FileNotFoundException when working with state files on Cloud Storage

2021-07-19 Thread Jun Qin (Jira)
Jun Qin created FLINK-23429:
---

 Summary: State Processor API failed with FileNotFoundException 
when working with state files on Cloud Storage
 Key: FLINK-23429
 URL: https://issues.apache.org/jira/browse/FLINK-23429
 Project: Flink
  Issue Type: Bug
  Components: API / State Processor
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin


For example, 
{code:java}
Caused by: java.io.FileNotFoundException: 
/savepoints/savepoint-18cf55-d90c1b6b1d12/c965e4fd-9647-4f25-b4cd-5ce0485759fd 
(No such file or directory)
at java.io.FileInputStream.open0(Native Method) ~[?:?]
at java.io.FileInputStream.open(FileInputStream.java:219) ~[?:?]
at java.io.FileInputStream.<init>(FileInputStream.java:157) ~[?:?]
at 
org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) 
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at 
org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:61)
 ~[?:?]
at 
org.apache.flink.state.api.output.FileCopyFunction.writeRecord(FileCopyFunction.java:34)
 ~[?:?]
at 
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:235) 
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
~[flink-dist_2.12-1.12.2.jar:1.12.2]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
{code}

However, the actual files to be copied do exist in the source savepoint.






[jira] [Created] (FLINK-23411) Expose Flink checkpoint details metrics

2021-07-16 Thread Jun Qin (Jira)
Jun Qin created FLINK-23411:
---

 Summary: Expose Flink checkpoint details metrics
 Key: FLINK-23411
 URL: https://issues.apache.org/jira/browse/FLINK-23411
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin


The checkpoint metrics shown in the Flink Web UI, such as the 
sync/async/alignment/start delay durations, are not exposed to the metrics system. This 
makes problem investigation harder when the Web UI is not enabled: those numbers 
cannot be obtained from the DEBUG logs either. I think we should see how we can expose these metrics.





[jira] [Created] (FLINK-23410) Use a pool of KafkaProducers to commit Kafka Transactions

2021-07-16 Thread Jun Qin (Jira)
Jun Qin created FLINK-23410:
---

 Summary: Use a pool of KafkaProducers to commit Kafka Transactions
 Key: FLINK-23410
 URL: https://issues.apache.org/jira/browse/FLINK-23410
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.12.4, 1.13.1
Reporter: Jun Qin


Currently, {{FlinkKafkaProducer}} contains {{kafkaProducersPoolSize}} (5 by default), 
but {{kafkaProducersPoolSize}} is only used to calculate the next 
transactionalIds. There is actually no KafkaProducer pool in 
{{FlinkKafkaProducer}}. This means that, for every checkpoint, Flink creates a new 
KafkaProducer (and therefore a new thread) and gets a new producer id from Kafka 
before it can initialize/commit a transaction. When the checkpoint is complete 
and the transaction is committed, the thread is shut down. This is inefficient not 
only in terms of Flink's CPU usage (shutting down and recreating threads) but also in 
terms of the network communication to Kafka (re-requesting the producer id from 
Kafka). This JIRA is opened to actually implement the KafkaProducer pool.
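A rough sketch (hypothetical code, not Flink's internal API) of the idea: create the transactional producers once, call {{initTransactions()}} up front, and then borrow/return a producer per checkpoint instead of recreating it:
{code:java}
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical pool: 'baseConfig' is assumed to contain the common producer settings;
// each pooled producer gets its own transactional.id derived from the prefix.
public class ProducerPool implements AutoCloseable {

    private final BlockingQueue<KafkaProducer<byte[], byte[]>> pool;

    public ProducerPool(int size, Properties baseConfig, String transactionalIdPrefix) {
        this.pool = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            Properties config = (Properties) baseConfig.clone();
            config.put("transactional.id", transactionalIdPrefix + "-" + i);
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config);
            producer.initTransactions(); // fetch the producer id from Kafka once, up front
            pool.add(producer);
        }
    }

    /** Borrow a producer for one checkpoint/transaction. */
    public KafkaProducer<byte[], byte[]> borrow() throws InterruptedException {
        return pool.take();
    }

    /** Return the producer after the transaction has been committed or aborted. */
    public void giveBack(KafkaProducer<byte[], byte[]> producer) {
        pool.add(producer);
    }

    @Override
    public void close() {
        pool.forEach(KafkaProducer::close);
    }
}
{code}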





[jira] [Created] (FLINK-23201) The check on alignmentDurationNanos seems to be too strict

2021-07-01 Thread Jun Qin (Jira)
Jun Qin created FLINK-23201:
---

 Summary: The check on alignmentDurationNanos seems to be too strict
 Key: FLINK-23201
 URL: https://issues.apache.org/jira/browse/FLINK-23201
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.12.2
Reporter: Jun Qin


The check on alignmentDurationNanos seems to be too strict at the line:

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java#L74

This may cause a job to fail when doing stop-with-savepoint.





[jira] [Created] (FLINK-22647) The time unit of state access latency on the Flink UI is incorrect

2021-05-12 Thread Jun Qin (Jira)
Jun Qin created FLINK-22647:
---

 Summary: The time unit of state access latency on the Flink UI is 
incorrect
 Key: FLINK-22647
 URL: https://issues.apache.org/jira/browse/FLINK-22647
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.13.0
Reporter: Jun Qin
 Attachments: Screen Shot 2021-05-12 at 11.34.11 AM.png

See the attached screenshot: the number is actually 7000 nanoseconds, but the 
UI shows 7s. I suggest removing the unit on the UI for now, to avoid 
confusion, until there is a way to forward the correct unit to the UI in Flink.





[jira] [Created] (FLINK-22642) UI issue with flame graph

2021-05-12 Thread Jun Qin (Jira)
Jun Qin created FLINK-22642:
---

 Summary: UI issue with flame graph
 Key: FLINK-22642
 URL: https://issues.apache.org/jira/browse/FLINK-22642
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.13.0
Reporter: Jun Qin
 Attachments: Screen Shot 2021-05-11 at 10.47.21 PM.png, Screen Shot 
2021-05-11 at 10.50.29 PM.png

There is a minor issue with the flame graph on the Flink 1.13 UI: *often* I 
somehow manage to “pin” those tooltip popups (i.e., the black boxes). They 
stay there even if I switch to another tab; see the attached screenshots. I do 
not know how I did it; when I tried to do it intentionally, I could not reproduce it. 
Refreshing the browser tab gets rid of them.







[jira] [Created] (FLINK-21336) Activate bloom filter in RocksDB State Backend via Flink configuration

2021-02-09 Thread Jun Qin (Jira)
Jun Qin created FLINK-21336:
---

 Summary: Activate bloom filter in RocksDB State Backend via Flink 
configuration
 Key: FLINK-21336
 URL: https://issues.apache.org/jira/browse/FLINK-21336
 Project: Flink
  Issue Type: Improvement
Reporter: Jun Qin
Assignee: Jun Qin


Activating the bloom filter in the RocksDB state backend improves read performance. 
Currently, activating the bloom filter can only be done by implementing a custom 
ConfigurableRocksDBOptionsFactory. I think we should provide an option to 
activate the bloom filter via the Flink configuration.

See also the discussion in ML:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Activate-bloom-filter-in-RocksDB-State-Backend-via-Flink-configuration-td48636.html
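For context, here is roughly what the current workaround looks like: a custom options factory that turns the bloom filter on programmatically (a sketch only; depending on the RocksDB version bundled with Flink, the setter may be {{setFilterPolicy}} instead of {{setFilter}}):
{code:java}
import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BloomFilterOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        // ~10 bits per key, full (non block-based) bloom filter
        tableConfig.setFilter(new BloomFilter(10, false));
        return currentOptions.setTableFormatConfig(tableConfig);
    }
}
{code}
Such a factory then has to be registered via {{state.backend.rocksdb.options-factory}}; the proposal here is to make the bloom filter available as a first-class configuration option instead.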





[jira] [Created] (FLINK-21315) Support to set operator names in state processor API

2021-02-07 Thread Jun Qin (Jira)
Jun Qin created FLINK-21315:
---

 Summary: Support to set operator names in state processor API
 Key: FLINK-21315
 URL: https://issues.apache.org/jira/browse/FLINK-21315
 Project: Flink
  Issue Type: Improvement
Reporter: Jun Qin


Currently, it is not possible to set a user-friendly operator name when using 
the State Processor API. For example, when you use {{readKeyedState()}}, the operator 
name shown on the Flink UI is:

{{DataSource (at readKeyedState(ExistingSavepoint.java:282) 
(org.apache.flink.state.api.input.KeyedStateInputFormat))}}

The same long name is shown in Grafana when Flink metrics are displayed. This 
Jira aims to provide users with an option to set operator names in the State Processor 
API.
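For illustration, this is the kind of call that produces that generated name (the savepoint path, uid, state type and reader function below are placeholders):
{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint =
        Savepoint.load(bEnv, "hdfs:///savepoints/savepoint-1234", new MemoryStateBackend());

// The DataSource created here gets the auto-generated name shown above;
// the State Processor API currently offers no hook to give it a friendlier name.
DataSet<MyKeyedState> keyedState =
        savepoint.readKeyedState("my-operator-uid", new MyKeyedStateReaderFunction());
{code}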





[jira] [Created] (FLINK-19008) Flink Job runs slow after restore + downscale from an incremental checkpoint (rocksdb)

2020-08-20 Thread Jun Qin (Jira)
Jun Qin created FLINK-19008:
---

 Summary: Flink Job runs slow after restore + downscale from an 
incremental checkpoint (rocksdb)
 Key: FLINK-19008
 URL: https://issues.apache.org/jira/browse/FLINK-19008
 Project: Flink
  Issue Type: Improvement
Reporter: Jun Qin


A customer runs a Flink job with the RocksDB state backend. Checkpoints are 
retained and done incrementally. The state size is several TB. When they 
restore + downscale from a retained checkpoint, although downloading the 
checkpoint files took ~20 min, the job throughput returns to the expected level 
only after 3 hours.

I do not have RocksDB logs. The suspicion is that those 3 hours are spent on heavy 
RocksDB compaction and/or flushes, as it was observed that checkpoints could not 
finish fast enough due to a long {{checkpoint duration (sync)}}. How can we 
make this restoring phase shorter?

For compaction, I think it is worth checking the improvement of:
{code:java}
CompactionPri compaction_pri = kMinOverlappingRatio;{code}
which has been set to default in RocksDB 6.x:
{code:java}
// In Level-based compaction, it Determines which file from a level to be
// picked to merge to the next level. We suggest people try
// kMinOverlappingRatio first when you tune your database.
enum CompactionPri : char {
  // Slightly prioritize larger files by size compensated by #deletes
  kByCompensatedSize = 0x0,
  // First compact files whose data's latest update time is oldest.
  // Try this if you only update some hot keys in small ranges.
  kOldestLargestSeqFirst = 0x1,
  // First compact files whose range hasn't been compacted to the next level
  // for the longest. If your updates are random across the key space,
  // write amplification is slightly better with this option.
  kOldestSmallestSeqFirst = 0x2,
  // First compact files whose ratio between overlapping size in next level
  // and its size is the smallest. It in many cases can optimize write
  // amplification.
  kMinOverlappingRatio = 0x3,
};
...
// Default: kMinOverlappingRatio
CompactionPri compaction_pri = kMinOverlappingRatio;{code}





[jira] [Created] (FLINK-18998) No watermark is shown on Flink UI when ProcessingTime is used

2020-08-19 Thread Jun Qin (Jira)
Jun Qin created FLINK-18998:
---

 Summary: No watermark is shown on Flink UI when ProcessingTime is 
used
 Key: FLINK-18998
 URL: https://issues.apache.org/jira/browse/FLINK-18998
 Project: Flink
  Issue Type: Bug
Reporter: Jun Qin
 Attachments: screenshot_2020-08-18_at_10.57.39.png

As stated in the subject, no watermark is shown on the Flink UI when ProcessingTime 
is used; see the attached screenshot.

 





[jira] [Created] (FLINK-18702) Flink elasticsearch connector leaks threads and classloaders thereof

2020-07-24 Thread Jun Qin (Jira)
Jun Qin created FLINK-18702:
---

 Summary: Flink elasticsearch connector leaks threads and 
classloaders thereof
 Key: FLINK-18702
 URL: https://issues.apache.org/jira/browse/FLINK-18702
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: 1.10.1
Reporter: Jun Qin


The Flink Elasticsearch connector leaks threads and the classloaders thereof. This 
results in a Metaspace OOM when the ES sink fails and is restarted many times.

This issue is visible in Flink 1.10 but not in 1.11, because Flink 1.11 does not 
create new class loaders in case of recoveries 
([FLINK-16408|https://issues.apache.org/jira/browse/FLINK-16408]).





[jira] [Created] (FLINK-17327) Kafka unavailability could cause Flink TM shutdown

2020-04-22 Thread Jun Qin (Jira)
Jun Qin created FLINK-17327:
---

 Summary: Kafka unavailability could cause Flink TM shutdown
 Key: FLINK-17327
 URL: https://issues.apache.org/jira/browse/FLINK-17327
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.10.0
Reporter: Jun Qin


Steps to reproduce:
 # Start a Flink 1.10 standalone cluster
 # Run a Flink job which reads from one Kafka topic and writes to another 
topic, with exactly-once checkpointing enabled
 # Stop all Kafka Brokers after a few successful checkpoints

When Kafka brokers are down:
 # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
could not be established
 # Then, Flink could not complete snapshot due to {{Timeout expired while 
initializing transactional state in 6ms}}
 # After several snapshot failures, Flink reported {{Too many ongoing 
snapshots. Increase kafka producers pool size or decrease number of concurrent 
checkpoints.}}
 # Eventually, Flink tried to cancel the task which did not succeed within 3 min
 # Then {{Fatal error occurred while executing the TaskManager. Shutting it 
down...}}

I will attach the logs to show the details. It is worth noting that if there is 
no consumer but only a producer in the task, the behavior is different:
 # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
could not be established
 # after {{delivery.timeout.ms}} (2min by default), producer reports: 
{{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for 
output-topic-0:120001 ms has passed since batch creation}}
 # Flink tried to cancel the upstream tasks and created a new producer
 # The new producer obviously reported connectivity issue to brokers
 # This continues till Kafka brokers are back. 
 # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool 
size or decrease number of concurrent checkpoints.}}
 # Flink cancelled the tasks and restarted them
 # The job continues, and new checkpoint succeeded. 
 # TM runs all the time in this scenario

I set the Kafka transaction timeout to 1 hour just to avoid transaction timeouts.

To get a producer-only task, I called {{env.disableOperatorChaining();}} in the 
second scenario.
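For reference, a sketch of the kind of exactly-once producer setup described above (topic, broker address and serialization are placeholders; the 1-hour transaction timeout matches the setting mentioned):
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("transaction.timeout.ms", "3600000"); // 1 hour

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",
        (KafkaSerializationSchema<String>)
                (element, timestamp) ->
                        new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8)),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
{code}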

 

 

 





[jira] [Created] (FLINK-17288) Speedup loading from savepoints into RocksDB by bulk load

2020-04-21 Thread Jun Qin (Jira)
Jun Qin created FLINK-17288:
---

 Summary: Speedup loading from savepoints into RocksDB by bulk load
 Key: FLINK-17288
 URL: https://issues.apache.org/jira/browse/FLINK-17288
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Jun Qin


When resources are a constraint, loading a big savepoint into RocksDB may take 
some time. This may also impact the job recovery time when the savepoint is 
used for recovery.

Bulk loading from a savepoint should help in this regard. Here is an excerpt from 
the RocksDB FAQ:
{quote}*Q: What's the fastest way to load data into RocksDB?*

A: A fast way to direct insert data to the DB:
 # using single writer thread and insert in sorted order
 # batch hundreds of keys into one write batch
 # use vector memtable
 # make sure options.max_background_flushes is at least 4
 # before inserting the data, disable automatic compaction, set 
options.level0_file_num_compaction_trigger, 
options.level0_slowdown_writes_trigger and options.level0_stop_writes_trigger 
to very large. After inserting all the data, issue a manual compaction.

3-5 will be automatically done if you call Options::PrepareForBulkLoad() to 
your option

If you can pre-process the data offline before inserting. There is a faster 
way: you can sort the data, generate SST files with non-overlapping ranges in 
parallel and bulkload the SST files. See 
[https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files]
{quote}





[jira] [Created] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

2020-03-04 Thread Jun Qin (Jira)
Jun Qin created FLINK-16419:
---

 Summary: Avoid to recommit transactions which are known committed 
successfully to Kafka upon recovery
 Key: FLINK-16419
 URL: https://issues.apache.org/jira/browse/FLINK-16419
 Project: Flink
  Issue Type: Improvement
Reporter: Jun Qin


When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
tries to recommit all pre-committed transactions which are in the snapshot, 
even if those transactions were successfully committed before (i.e., the call 
to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
returned OK). This may lead to recovery failures when recovering from a very old 
snapshot, because the transactional IDs in that snapshot have expired and been 
removed from Kafka. For example, consider the following scenario:
 # Start a Flink job with FlinkKafkaProducer sink with exactly-once
 # Suspend the Flink job with a savepoint A
 # Wait for time longer than {{transactional.id.expiration.ms}} + 
{{transaction.remove.expired.transaction.cleanup.interval.ms}}
 # Recover the job with savepoint A.
 # The recovery will fail with the following error:

{noformat}
2020-02-26 14:33:25,817 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata                            - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer              - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task         
           - Source: Custom Source -> Sink: Unnamed (1/1) 
(a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
        at 
org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
        at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
        at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
        at java.lang.Thread.run(Thread.java:748)
{noformat}
After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
way is to let the JobManager, after it has successfully notified all operators of the 
completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
success, e.g., write the successful transactional IDs somewhere in the 
snapshot. Then those transactions need not be recommitted upon recovery.





[jira] [Created] (FLINK-15865) When to add .uid() call: inconsistent definition of operators in Flink docs

2020-02-03 Thread Jun Qin (Jira)
Jun Qin created FLINK-15865:
---

 Summary: When to add .uid() call: inconsistent definition of 
operators in Flink docs
 Key: FLINK-15865
 URL: https://issues.apache.org/jira/browse/FLINK-15865
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.9.1
Reporter: Jun Qin


On one hand, the Flink doc suggests adding a .uid() call for *all* operators in 
[1]; on the other hand, it lists all operators in Flink [2]. The issues are:
 # KeyBy is listed as an operator, but .keyBy().uid() is not a valid call. The same 
applies to window(), split(), etc.

 # addSource() and addSink() are not listed as operators, but we do expect users to 
call .uid() after addSource() and addSink(), especially in the exactly-once 
scenario.

This creates confusion, especially for beginners. There should be a better 
definition of which/what kind of operators can have a following uid() call; see the 
illustration after the references below.

[1] [Should I assign ids to all operators in my 
job|https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job]
[2] [Flink 
Operators|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/]
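To illustrate the distinction (a made-up pipeline; {{env}} is a {{StreamExecutionEnvironment}} and {{props}} a configured {{Properties}} object):
{code:java}
DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props))
        .uid("kafka-source");              // valid: uid() after addSource()

stream
        .keyBy(value -> value)             // keyBy() returns a KeyedStream; there is no .uid() here
        .map(String::toUpperCase)
        .uid("uppercase-map")              // valid: uid() on the map operator
        .addSink(new DiscardingSink<>())
        .uid("discarding-sink");           // valid: uid() after addSink()
{code}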





[jira] [Created] (FLINK-15298) Wrong dependences in the DataStream API tutorial (the wiki-edits example)

2019-12-17 Thread Jun Qin (Jira)
Jun Qin created FLINK-15298:
---

 Summary: Wrong dependences in the DataStream API tutorial (the 
wiki-edits example)
 Key: FLINK-15298
 URL: https://issues.apache.org/jira/browse/FLINK-15298
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.9.1, 1.9.0
Reporter: Jun Qin


[The DataStream API Tutorial in Flink 1.9|https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/datastream_api.html]
 mentions the following dependencies:

{code:xml}
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
{code}

There are two issues here:
# {{flink-java}} and {{flink-streaming-java}} should be set to the *provided* scope
# {{flink-clients}} is not needed. If {{flink-clients}} is added in *compile* 
scope, {{flink-runtime}} will be added implicitly





[jira] [Created] (FLINK-14942) State Processing API: add an option to make deep copy

2019-11-25 Thread Jun Qin (Jira)
Jun Qin created FLINK-14942:
---

 Summary: State Processing API: add an option to make deep copy
 Key: FLINK-14942
 URL: https://issues.apache.org/jira/browse/FLINK-14942
 Project: Flink
  Issue Type: Improvement
  Components: API / State Processor
Affects Versions: 1.9.1
Reporter: Jun Qin


Currently, when a new savepoint is created based on a source savepoint, there 
are references in the new savepoint to the source savepoint. Here is what the [State 
Processor API 
doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html]
 says:
bq. Note: When basing a new savepoint on existing state, the state processor 
api makes a shallow copy of the pointers to the existing operators. This means 
that both savepoints share state and one cannot be deleted without corrupting 
the other!

This JIRA is to request an option to make a deep copy (instead of a shallow copy) 
such that the new savepoint is self-contained.





[jira] [Created] (FLINK-14890) TestHarness for KeyedBroadcastProcessFunction

2019-11-21 Thread Jun Qin (Jira)
Jun Qin created FLINK-14890:
---

 Summary: TestHarness for KeyedBroadcastProcessFunction
 Key: FLINK-14890
 URL: https://issues.apache.org/jira/browse/FLINK-14890
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.9.1
Reporter: Jun Qin


To test {{KeyedCoProcessFunction}}, one can use {{KeyedCoProcessOperator}} and 
{{KeyedTwoInputStreamOperatorTestHarness}}. To test 
{{KeyedBroadcastProcessFunction}}, there is {{CoBroadcastWithKeyedOperator}}, but 
the corresponding TestHarness class is missing.
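For comparison, roughly the existing pattern for {{KeyedCoProcessFunction}} (the function, the {{Event}}/{{Rule}} types and the key selectors are placeholders); the request is for an equivalent harness around {{CoBroadcastWithKeyedOperator}}:
{code:java}
KeyedCoProcessOperator<String, Event, Rule, String> operator =
        new KeyedCoProcessOperator<>(new MyKeyedCoProcessFunction());

KeyedTwoInputStreamOperatorTestHarness<String, Event, Rule, String> harness =
        new KeyedTwoInputStreamOperatorTestHarness<>(
                operator,
                event -> event.key,        // key selector for the first input
                rule -> rule.key,          // key selector for the second input
                Types.STRING);

harness.open();
harness.processElement1(new StreamRecord<>(new Event("k", 1L), 10L));
harness.processElement2(new StreamRecord<>(new Rule("k", "limit"), 20L));
// assertions on harness.extractOutputValues() would follow
{code}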





[jira] [Created] (FLINK-14054) Enable checkpointing via job configuration

2019-09-11 Thread Jun Qin (Jira)
Jun Qin created FLINK-14054:
---

 Summary: Enable checkpointing via job configuration
 Key: FLINK-14054
 URL: https://issues.apache.org/jira/browse/FLINK-14054
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Jun Qin


Currently, enabling checkpointing can only be done via the job code; see the 
following quote from the Flink 
[checkpointing|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing]
 doc:
{quote}By default, checkpointing is disabled. To enable checkpointing, call 
{{enableCheckpointing(n)}} on the {{StreamExecutionEnvironment}}, where _n_ is 
the checkpoint interval in milliseconds.
{quote}
This makes enabling checkpointing after the job code has been released 
difficult: one has to change and rebuild the job code.

In addition, making checkpointing configurable is of interest not only for 
developers but also for operations teams:
 * They may want to enable checkpointing in production but disable it in test 
(e.g., to save storage space)
 * They may want to try out with and without checkpointing to evaluate the 
impact on job behaviour and performance.

Hence this request. Thanks.
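For context, the code-based mechanism referenced in the quote looks like this (the interval is only an example):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Today this has to be compiled into the job; there is no configuration-only switch.
env.enableCheckpointing(10_000); // checkpoint every 10 seconds
{code}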


