My SQL is a regular insert, like "insert into sink_table select c1,c2,c3 from 
source_table".
In which cases does Flink judge a job to be append only? Is there any 
documentation for this?

Many thanks!

> On Nov 14, 2019, at 10:05 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
> 
> Yes, it's related to your SQL. Flink checks the plan of your SQL to judge 
> whether your job is append only or has updates. If your job is append only, 
> that means no result needs to be updated.
> 
> If you still have problems, please post your SQL and the complete error 
> message to help people understand your use case.
> 
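To make the distinction in the reply above concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink code: the class ChangelogSketch and its methods project, countByKey and isAppendOnly are hypothetical names invented for illustration. The assumption it illustrates is that a plain column selection only ever adds rows, while a non-windowed GROUP BY aggregation revises earlier results for the same key, which is the kind of query an upsert sink is meant for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of append-only vs. updating query results. Not Flink code. */
public class ChangelogSketch {

    /** Models "select c1 from source_table": one new output row per input row. */
    static List<String> project(List<String[]> rows) {
        List<String> changelog = new ArrayList<>();
        for (String[] row : rows) {
            changelog.add("INSERT " + row[0]);
        }
        return changelog;
    }

    /** Models "select c1, count(*) from source_table group by c1" without a window. */
    static List<String> countByKey(List<String[]> rows) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        List<String> changelog = new ArrayList<>();
        for (String[] row : rows) {
            int updated = counts.merge(row[0], 1, Integer::sum);
            // The first row for a key is an insert; later rows revise that result.
            String kind = updated == 1 ? "INSERT" : "UPDATE";
            changelog.add(kind + " " + row[0] + "=" + updated);
        }
        return changelog;
    }

    /** A result is append only when no emitted change revises an earlier one. */
    static boolean isAppendOnly(List<String> changelog) {
        return changelog.stream().allMatch(change -> change.startsWith("INSERT"));
    }

    public static void main(String[] args) {
        List<String[]> source = Arrays.asList(
                new String[] {"a", "1"},
                new String[] {"b", "2"},
                new String[] {"a", "3"});
        List<String> projected = project(source);
        List<String> aggregated = countByKey(source);
        System.out.println(projected + " appendOnly=" + isAppendOnly(projected));
        System.out.println(aggregated + " appendOnly=" + isAppendOnly(aggregated));
    }
}
```

In this model the grouping column plays the role of the key that an upsert would be applied on; the projection has no such key.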
> Polarisary <polaris...@gmail.com> wrote on Wed, Nov 13, 2019 at 6:43 PM:
> Hi,
> When I use the flink-jdbc JDBCUpsertTableSink to sink to MySQL, isAppendOnly 
> is set to true and keyFields is set to null by StreamExecSink, but I want 
> to upsert.
> Is this related to the SQL?
> 
> The stack trace is as follows:
>       at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
>       at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>       at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
>       at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
>       at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)
>       at task.Device.main(Device.java:77)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> 
> Hoping for a reply.
> Many thanks!
> 
