Hi Team,

Can you please help me with the below issue?

Thanks,
Asha Desu

> On Feb 4, 2021, at 1:34 PM, asha jyothi <[email protected]> wrote:
> 
> Hi Team,
> 
> I am new to flink and iceberg. I am trying to add rows to an existing iceberg 
> table from a flink job with flink sql. I'm able to do that using flink sql in 
> command line, but when I try it in java, I'm getting some errors. Its looking 
> for connector and I couldnt figure out the iceberg connector details that I 
> can pass in from java code from the documentation. Can you please help me 
> with this?
> Below is my code and the error that I'm getting. Thanks in advance.
> 
> 
>       TableEnvironment tEnv = TableEnvironment
>                               
> .create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().
>                                               
> withBuiltInCatalogName("flink_hadoop_catalog").withBuiltInDatabaseName("flink_iceberg_db").build());
>       try {
>               exec(tEnv, "CREATE TABLE 
> flink_hadoop_catalog.flink_iceberg_db.transactions%d (AccountId BIGINT 
> ,Timestamp1 BIGINT,Amount DOUBLE)", value.getLong(0));
>       }
>               catch(AlreadyExistsException e) {
>               // do nothing
>               }
> 
>       String query = "INSERT INTO %s VALUES (%d,%d,%f)";
>       TableResult tr = exec(tEnv, query, 
> "flink_hadoop_catalog.flink_iceberg_db.transactions"+value.getLong(0), 
> value.getLong(0), value.getLong(1), value.getDouble(2));
>       
> 
> 
> Caused by: org.apache.flink.table.api.ValidationException: Unable to create a 
> sink for writing table 'flink_hadoop_catalog.flink_iceberg_db.transactions1'.
> 
> Table options are:
>       at 
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:349)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
>       at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>       at scala.collection.Iterator.foreach(Iterator.scala:937)
>       at scala.collection.Iterator.foreach$(Iterator.scala:937)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>       at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>       at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>       at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691)
>       at 
> spendreport.TransactionSinkFunction2.exec(TransactionSinkFunction2.java:123)
>       at 
> spendreport.TransactionSinkFunction2.invoke(TransactionSinkFunction2.java:102)
>       at 
> spendreport.TransactionSinkFunction2.invoke(TransactionSinkFunction2.java:55)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>       at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>       at spendreport.FraudDetector.processElement(FraudDetector.java:136)
>       at spendreport.FraudDetector.processElement(FraudDetector.java:45)
>       at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.table.api.ValidationException: Table options do 
> not contain an option key 'connector' for discovering a connector.
>       at 
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
>       at 
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>       ... 42 more
> 
> Thanks,
> Asha Desu

Reply via email to