My sql is regular insert like “insert into sink_table select c1,c2,c3 from source_table”, I want to know which case it will judge to append only? Does it has doc for this?
Many thanks! > 在 2019年11月14日,上午10:05,张万新 <kevinzwx1...@gmail.com> 写道: > > Yes it's related to your sql, flink checks the plan of your sql to judge > whether your job is append only or has updates. If your job is append only, > that means no result need to be updated. > > If you still have problems, please post your sql and complete error message > to help people understand your use case. > > Polarisary <polaris...@gmail.com <mailto:polaris...@gmail.com>> > 于2019年11月13日周三 下午6:43写道: > Hi > When I use flink-jdbc JDBCUpsertTableSink for sink to mysql, the isAppendOnly > is modified to ture, and keyFields is modified to null by StreamExecSink, but > i want to upsert, > Does this related to sql? > > the stack as follows: > at sink.JDBCUpsertTableSink.setIsAppendOnly(JDBCUpsertTableSink.java:105) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:98) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348) > at task.Device.main(Device.java:77) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) > > Hope to reply! > many thanks >