Hi Team, Can you please help me with the below issue?
Thanks, Asha Desu > On Feb 4, 2021, at 1:34 PM, asha jyothi <[email protected]> wrote: > > Hi Team, > > I am new to flink and iceberg. I am trying to add rows to an existing iceberg > table from a flink job with flink sql. I'm able to do that using flink sql in > command line, but when I try it in java, I'm getting some errors. Its looking > for connector and I couldnt figure out the iceberg connector details that I > can pass in from java code from the documentation. Can you please help me > with this? > Below is my code and the error that I'm getting. Thanks in advance. > > > TableEnvironment tEnv = TableEnvironment > > .create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode(). > > withBuiltInCatalogName("flink_hadoop_catalog").withBuiltInDatabaseName("flink_iceberg_db").build()); > try { > exec(tEnv, "CREATE TABLE > flink_hadoop_catalog.flink_iceberg_db.transactions%d (AccountId BIGINT > ,Timestamp1 BIGINT,Amount DOUBLE)", value.getLong(0)); > } > catch(AlreadyExistsException e) { > // do nothing > } > > String query = "INSERT INTO %s VALUES (%d,%d,%f)"; > TableResult tr = exec(tEnv, query, > "flink_hadoop_catalog.flink_iceberg_db.transactions"+value.getLong(0), > value.getLong(0), value.getLong(1), value.getDouble(2)); > > > > Caused by: org.apache.flink.table.api.ValidationException: Unable to create a > sink for writing table 'flink_hadoop_catalog.flink_iceberg_db.transactions1'. > > Table options are: > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) > at > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:349) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > at > org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691) > at > spendreport.TransactionSinkFunction2.exec(TransactionSinkFunction2.java:123) > at > spendreport.TransactionSinkFunction2.invoke(TransactionSinkFunction2.java:102) > at > spendreport.TransactionSinkFunction2.invoke(TransactionSinkFunction2.java:55) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) > at spendreport.FraudDetector.processElement(FraudDetector.java:136) > at spendreport.FraudDetector.processElement(FraudDetector.java:45) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.table.api.ValidationException: Table options do > not contain an option key 'connector' for discovering a connector. > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) > ... 42 more > > Thanks, > Asha Desu
