RE: Pravega connector cannot recover from the checkpoint due to "Failure to finalize checkpoint"

2020-07-21 Thread B.Zhou
Hi Fabian,

Thanks for the reply. I also created a JIRA ticket yesterday: 
https://issues.apache.org/jira/browse/FLINK-18641. I think we can continue 
the discussion there.

Best Regards,
Brian

From: Fabian Hueske 
Sent: Tuesday, July 21, 2020 17:35
To: Zhou, Brian
Cc: user; Arvid Heise; Piotr Nowojski
Subject: Re: Pravega connector cannot recover from the checkpoint due to 
"Failure to finalize checkpoint"


Hi Brian,

AFAIK, Arvid and Piotr (both in CC) have been working on the threading model of 
the checkpoint coordinator.
Maybe they can help with this question.

Best, Fabian

On Mon, Jul 20, 2020 at 03:36, b.z...@dell.com wrote:
Can anyone help us with this issue?

Best Regards,
Brian

From: Zhou, Brian
Sent: Wednesday, July 15, 2020 18:26
To: 'user@flink.apache.org'
Subject: Pravega connector cannot recover from the checkpoint due to "Failure 
to finalize checkpoint"

Hi community,
To give some background, https://github.com/pravega/flink-connectors is a 
Pravega connector for Flink. The ReaderCheckpointHook[1] class uses Flink's 
`MasterTriggerRestoreHook` interface to trigger a Pravega checkpoint during 
each Flink checkpoint, so that data can be recovered consistently. After 
upgrading to Flink 1.11, we experienced failures in checkpoint recovery: some 
test cases now time out because checkpoints fail continuously.
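For readers unfamiliar with the hook mechanism, here is a minimal sketch of the pattern (only the `MasterTriggerRestoreHook` interface is Flink's; the other names are illustrative, and the real implementation is in [1]):

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;

// Minimal sketch of the hook pattern. Flink calls triggerCheckpoint() on the
// master during each checkpoint and acknowledges this hook's master state
// only once the returned future completes.
public class ExternalReaderCheckpointHook implements MasterTriggerRestoreHook<byte[]> {

    @Override
    public String getIdentifier() {
        // Must stay stable across job restarts so restore can find the state.
        return "external-reader-checkpoint-hook";
    }

    @Override
    public CompletableFuture<byte[]> triggerCheckpoint(
            long checkpointId, long timestamp, Executor executor) {
        // Ask the external system (e.g. a Pravega reader group) to take its
        // own checkpoint asynchronously and return its handle as the state.
        return CompletableFuture.supplyAsync(
                () -> initiateExternalCheckpoint(checkpointId), executor);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, byte[] checkpointData) {
        // Hand the stored handle back to the external system so its readers
        // rewind to the corresponding positions.
    }

    @Override
    public SimpleVersionedSerializer<byte[]> createCheckpointDataSerializer() {
        // A real hook returns a serializer for its checkpoint state type.
        return null;
    }

    // Placeholder for the external call, e.g. readerGroup.initiateCheckpoint(...).
    private byte[] initiateExternalCheckpoint(long checkpointId) {
        return new byte[0];
    }
}
```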
Error stacktrace:
2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has not been fully acknowledged yet
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
    ... 9 common frames omitted
After some investigation, we found the main problem: it lies in the checkpoint 
recovery. When the Flink CheckpointCoordinator finalizes a checkpoint, it 
checks that everything has been acknowledged, but for some reason the master 
state still has our ReaderCheckpointHook un-acknowledged, which makes the 
checkpoint fail in the completion stage.

In PendingCheckpoint::snapshotMasterState, there is an asynchronous call that 
acknowledges the master state for each hook, but the method returns before the 
acknowledgement arrives. I suspect this is related to the recent changes to 
the threading model of the checkpoint coordinator. Can someone help verify 
this?
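To make the suspected race concrete, here is a simplified sketch of the pattern I believe is at play (illustrative code, not Flink's actual implementation):

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Simplified illustration of the suspected race; this is NOT Flink's code.
// If snapshotMasterState(...) returns before every hook future has completed
// and been acknowledged, finalization can observe an un-acked master state and
// fail with "Pending checkpoint has not been fully acknowledged yet".
class MasterStateRaceSketch {

    static class PendingCheckpointStub {
        void acknowledgeMasterState(byte[] state) {
            // record the acknowledgement for this hook
        }
    }

    void snapshotMasterState(Iterable<CompletableFuture<byte[]>> hookFutures,
                             PendingCheckpointStub checkpoint,
                             Executor ioExecutor) {
        for (CompletableFuture<byte[]> future : hookFutures) {
            // The acknowledgement happens asynchronously, some time later...
            future.thenAcceptAsync(checkpoint::acknowledgeMasterState, ioExecutor);
        }
        // ...but this method returns immediately, so a caller that proceeds to
        // finalize the checkpoint may still see the hook's state as un-acked.
    }
}
```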

Reproduction procedure:
Check out this branch 
https://github.com/crazyzhou/flink-connectors/tree/brian-1.11-test and run 
the following test case: FlinkPravegaReaderSavepointITCase::testPravegaWithSavepoint

[1] 
https://github.com/pravega/flink-connectors/blob/e15cfe5b7917431ce8672cf9f232cb4603d8143a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java

Best Regards,
Brian



Ask for reason for choice of S3 plugins

2020-03-27 Thread B.Zhou
Hi,

In this document, 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins, 
it is mentioned that:

  *   Presto is the recommended file system for checkpointing to S3.
Is there a reason for that? Is there some bottleneck in the s3 hadoop plugin 
that prevents it from supporting checkpoint storage well?

And if I use the s3:// scheme with both plugins loaded, is there a 
class-loading order, or is it random which one handles S3 access? Which plugin 
will take charge?
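To make the question concrete, this is the kind of configuration involved (a sketch; bucket names and paths are placeholders):

```
# flink-conf.yaml (illustrative paths; bucket names are placeholders).
# flink-s3-fs-presto registers s3:// and s3p://; flink-s3-fs-hadoop registers
# s3:// and s3a://. Using the dedicated schemes pins the implementation.
state.checkpoints.dir: s3p://my-bucket/flink/checkpoints   # Presto, for checkpoints
state.savepoints.dir: s3p://my-bucket/flink/savepoints
# A StreamingFileSink path would use s3a://my-bucket/output to force Hadoop.
```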

Best Regards,
Brian



RE: Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-24 Thread B.Zhou
Hi,

Thanks for the information. I replied in a comment on this issue: 
https://issues.apache.org/jira/browse/FLINK-16693?focusedCommentId=17065486&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17065486

Best Regards,
Brian

-Original Message-
From: Timo Walther  
Sent: Tuesday, March 24, 2020 16:40
To: Zhou, Brian; imj...@gmail.com
Cc: user@flink.apache.org
Subject: Re: Need help on timestamp type conversion for Table API on Pravega 
Connector



This issue is tracked under:

https://issues.apache.org/jira/browse/FLINK-16693

Could you provide us with a small reproducible example in the issue? I think 
that could help us resolve it quickly in the next minor release.

Thanks,
Timo


On 20.03.20 03:28, b.z...@dell.com wrote:
> Hi,
> 
> Thanks for the reference, Jark. In the Pravega connector, users define a
> Schema first and then create the table with the descriptor using that
> schema, see [1]; the error also came from that test case. We also tried
> the recommended `bridgedTo(Timestamp.class)` method in the schema
> construction, and it produced the same error stack trace.
> 
> We are also considering switching to the Blink planner implementation;
> do you think that change would fix this issue?

RE: Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-19 Thread B.Zhou
Hi,

Thanks for the reference, Jark. In the Pravega connector, users define a 
Schema first and then create the table with the descriptor using that schema, 
see [1]; the error also came from that test case. We also tried the 
recommended `bridgedTo(Timestamp.class)` method in the schema construction, 
and it produced the same error stack trace.

We are also considering switching to the Blink planner implementation; do you 
think that change would fix this issue?
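Concretely, the schema construction looks roughly like this (a sketch; field names are illustrative and the Pravega-specific descriptor plumbing is omitted):

```
import java.sql.Timestamp;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Schema;

public class SchemaSketch {
    // Roughly our schema construction (field names are illustrative).
    // bridgedTo(Timestamp.class) asks the planner to expose the TIMESTAMP(3)
    // field as java.sql.Timestamp instead of the new default,
    // java.time.LocalDateTime, yet we still hit the same CodeGenException.
    static Schema buildSchema() {
        return new Schema()
                .field("category", DataTypes.STRING())
                .field("value", DataTypes.INT())
                .field("event_time", DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class));
    }
}
```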

Here is the full stacktrace:

```
org.apache.flink.table.codegen.CodeGenException: Unsupported cast from 'LocalDateTime' to 'Long'.
    at org.apache.flink.table.codegen.calls.ScalarOperators$.generateCast(ScalarOperators.scala:815)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:941)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$visitCall$1(CodeGenerator.scala:752)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:742)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1(CodeGenerator.scala:273)
    at org.apache.flink.table.codegen.CodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(CodeGenerator.scala:269)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
    at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:269)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.generateConversionMapper(BatchScan.scala:95)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow(BatchScan.scala:59)
    at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToInternalRow$(BatchScan.scala:35)
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.convertToInternalRow(BatchTableSourceScan.scala:45)
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:165)
    at org.apache.flink.table.plan.nodes.dataset.DataSetWindowAggregate.translateToPlan(DataSetWindowAggregate.scala:114)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:92)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
    at org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:87)
    at io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceBatchDescriptor(FlinkPravegaTableITCase.java:349)
    at io.pravega.connectors.flink.FlinkPravegaTableITCase.testTableSourceUsingDescriptor(FlinkPravegaTableITCase.java:246)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
```

RE: SQL Timetamp types incompatible after migration to 1.10

2020-03-19 Thread B.Zhou
Hi Jark,

I saw this mail and found it is similar to an issue I raised to the community 
several days ago [1]. Could you have a look to see whether it is the same 
issue as this one?
If so, there is a further question. On the Pravega connector side, the issue 
arises in our Batch Table API, which means users create tables with 
BatchTableEnvironment. Currently, BatchTableEnvironment does not support the 
Blink planner. Any suggestions on how we can support batch tables in Flink 
1.10?

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Need-help-on-timestamp-type-conversion-for-Table-API-on-Pravega-Connector-td33660.html
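For completeness, this is how I understand the Blink planner's batch mode is reached in 1.10 (through the unified TableEnvironment rather than BatchTableEnvironment), assuming a pure Table API/SQL job with no DataSet interop; a sketch:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BlinkBatchSketch {
    public static void main(String[] args) {
        // The Blink planner's batch mode is reached through the unified
        // TableEnvironment; there is no BatchTableEnvironment for it, which
        // also means no toDataSet()/fromDataSet() interop.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // register catalogs/tables and run SQL or Table API queries with tEnv
    }
}
```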

Best Regards,
Brian

From: Jark Wu 
Sent: Thursday, March 19, 2020 17:14
To: Paul Lam
Cc: user
Subject: Re: SQL Timetamp types incompatible after migration to 1.10


Hi Paul,

Are you using the old planner? Did you try the Blink planner? I guess it may 
be a bug in the old planner, which doesn't work well with the new types.

Best,
Jark

On Thu, 19 Mar 2020 at 16:27, Paul Lam <paullin3...@gmail.com> wrote:
Hi,

Recently I upgraded a simple application that inserts static data into a 
table from 1.9.0 to 1.10.0, and encountered a timestamp type incompatibility 
problem during table sink validation.

The SQL is like:
```
insert into kafka.test.tbl_a  # schema: (user_name STRING, user_id INT, login_time TIMESTAMP)
select ("ann", 1000, TIMESTAMP "2019-12-30 00:00:00")
```

And the error thrown:
```
Field types of query result and registered TableSink `kafka`.`test`.`tbl_a` do not match.
Query result schema: [EXPR$0: String, EXPR$1: Integer, EXPR$2: Timestamp]
TableSink schema: [user_name: String, user_id: Integer, login_time: LocalDateTime]
```

After some digging, I found the root cause might be that, since FLINK-14645, 
timestamp fields defined via TableFactory have been bridged to LocalDateTime, 
while timestamp literals are still backed by java.sql.Timestamp.
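To make the mismatch concrete, here is a small check that, if my understanding of the new type system is right, shows the default versus bridged conversion classes:

```
import java.sql.Timestamp;
import org.apache.flink.table.api.DataTypes;

public class ConversionClassCheck {
    public static void main(String[] args) {
        // Since the type system rework, TIMESTAMP(3) defaults to LocalDateTime:
        System.out.println(DataTypes.TIMESTAMP(3).getConversionClass());
        // expected: class java.time.LocalDateTime

        // Bridging restores the legacy java.sql.Timestamp representation:
        System.out.println(
                DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class).getConversionClass());
        // expected: class java.sql.Timestamp
    }
}
```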

Is my reasoning correct? And is there any workaround? Thanks a lot!

Best,
Paul Lam



Need help on timestamp type conversion for Table API on Pravega Connector

2020-03-15 Thread B.Zhou
Hi community,

The Pravega connector provides both batch and streaming Table API 
implementations, and we use the descriptor API to build table sources. While 
preparing the upgrade to Flink 1.10, we found that the unit tests for our 
existing Batch Table API no longer pass: there is a Timestamp type-conversion 
error in our descriptor-based Table API. The details are in this issue: 
https://github.com/pravega/flink-connectors/issues/341. We hope someone from 
the Flink community can help us with some suggestions on this issue. Thanks.

Best Regards,
Brian