I've been trying to write Avro output with PyFlink 1.12.2 on Ubuntu. I've tested my code with an iterator writing to CSV, and everything works as expected. Reading through the Flink documentation, I see that I need to add JAR dependencies to work with Avro. I downloaded three JAR files that I believe are required for Avro and registered them like so:

    table_env\
        .get_config()\
        .get_configuration()\
        .set_string(
            "pipeline.jars",
            rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
        )

I suspect I'm not loading the JAR files correctly, but I'm not familiar with Java, so it's unclear what I'm supposed to do. When I switch the sink format to Avro, I get the following error:

    Py4JJavaError: An error occurred while calling o746.executeInsert.
    : java.lang.NoClassDefFoundError: org/apache/avro/io/DatumWriter
        at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:71)
        at org.apache.flink.formats.avro.AvroFileFormatFactory$1.createRuntimeEncoder(AvroFileFormatFactory.java:61)
        at org.apache.flink.table.filesystem.FileSystemTableSink.createWriter(FileSystemTableSink.java:373)
        at org.apache.flink.table.filesystem.FileSystemTableSink.createOutputFormatFactory(FileSystemTableSink.java:365)
        at org.apache.flink.table.filesystem.FileSystemTableSink.createBatchSink(FileSystemTableSink.java:163)
        at org.apache.flink.table.filesystem.FileSystemTableSink.consume(FileSystemTableSink.java:139)
        at org.apache.flink.table.filesystem.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:134)
        at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalSink.createSinkTransformation(CommonPhysicalSink.scala:95)
        at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:87)
        at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.scala:42)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
        at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecSink.translateToPlan(BatchExecSink.scala:42)
        at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:86)
        at org.apache.flink.table.planner.delegation.BatchPlanner$$anonfun$translateToPlan$1.apply(BatchPlanner.scala:85)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
        at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.ClassNotFoundException: org.apache.avro.io.DatumWriter
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)

My sample code is as follows:

    import os

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings

    # Batch mode with the Blink planner.
    env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
    table_env = BatchTableEnvironment.create(environment_settings=env_settings)

    # Register the Avro-related JARs from ./lib.
    table_env\
        .get_config()\
        .get_configuration()\
        .set_string(
            "pipeline.jars",
            rf"file:///{os.getcwd()}/lib/flink-sql-avro-1.12.2.jar;file:///{os.getcwd()}/lib/flink-avro-1.12.2.jar;file:///{os.getcwd()}/lib/avro-1.10.2.jar"
        )

    # `a` (the input rows) and `result_path` are defined elsewhere.
    table = table_env.from_elements(
        a,
        schema=DataTypes.ROW([
            DataTypes.FIELD('text', DataTypes.STRING()),
            DataTypes.FIELD('text1', DataTypes.STRING())
        ])
    )

    sink_ddl = f"""
        create table Results(
            a STRING,
            b STRING
        ) with (
            'connector' = 'filesystem',
            'path' = '{result_path}',
            'format' = 'avro'
        )
    """

    table_env.execute_sql(sink_ddl)
    table.execute_insert("Results").wait()

Could someone help or point me in the right direction to look?
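
For reference, here is a minimal sketch of how the `pipeline.jars` value could be built and sanity-checked in plain Python before it is handed to Flink. The helper name `build_pipeline_jars` is hypothetical and not part of PyFlink; it only relies on the standard library. One thing it makes visible: on Linux, `os.getcwd()` already starts with `/`, so `file:///{os.getcwd()}/...` produces a URL with four slashes, whereas `Path.as_uri()` produces the usual three. Whether that has anything to do with the error above is not established; the sketch only rules out malformed or missing paths.

```python
from pathlib import Path


def build_pipeline_jars(lib_dir, jar_names):
    """Return a ';'-separated string of file:// URLs for the given JARs.

    Hypothetical helper (not a PyFlink API). Raises FileNotFoundError if a
    JAR is missing, so a wrong path is caught in Python rather than showing
    up later as a Java class-loading error.
    """
    urls = []
    for name in jar_names:
        jar = (Path(lib_dir) / name).resolve()
        if not jar.is_file():
            raise FileNotFoundError(f"jar not found: {jar}")
        # as_uri() yields e.g. file:///home/user/lib/x.jar (three slashes).
        urls.append(jar.as_uri())
    return ";".join(urls)


# Usage with the JAR names from the question (assumes they sit in ./lib):
# jars = build_pipeline_jars("lib", [
#     "flink-sql-avro-1.12.2.jar",
#     "flink-avro-1.12.2.jar",
#     "avro-1.10.2.jar",
# ])
# table_env.get_config().get_configuration().set_string("pipeline.jars", jars)
```

Printing the resulting string before calling `set_string` makes it easy to confirm that each URL points at a file that actually exists.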