Hi, elakiya. If you want to log information about the Kafka records themselves, you can add log statements in KafkaRecordEmitter. If you want to see the deserialized values, add the log statements in the Avro format implementation instead.
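To illustrate the second option, here is a rough sketch of the idea in plain Java. It is not Flink code: LoggingDeserializer is a hypothetical name, and the Function<byte[], T> it wraps is only a stand-in for whatever deserialization step the format performs. It simply logs the raw payload size before decoding and the decoded value afterwards.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.logging.Logger;

/**
 * Hypothetical helper (not part of Flink): wraps a byte[] -> T decoder
 * and logs before and after the wrapped decoder runs.
 */
final class LoggingDeserializer<T> implements Function<byte[], T> {
    private static final Logger LOG =
            Logger.getLogger(LoggingDeserializer.class.getName());

    // The wrapped decoder; an assumed stand-in for the format's deserializer.
    private final Function<byte[], T> inner;

    LoggingDeserializer(Function<byte[], T> inner) {
        this.inner = inner;
    }

    @Override
    public T apply(byte[] payload) {
        // Log the size of the raw record value before decoding.
        LOG.info(() -> "raw payload bytes=" + (payload == null ? 0 : payload.length));
        T value = inner.apply(payload);
        // Log the decoded value after decoding.
        LOG.info(() -> "deserialized value=" + value);
        return value;
    }

    public static void main(String[] args) {
        // Usage with a trivial UTF-8 decoder in place of a real Avro decoder.
        LoggingDeserializer<String> decoder =
                new LoggingDeserializer<>(b -> new String(b, StandardCharsets.UTF_8));
        decoder.apply("employee-1".getBytes(StandardCharsets.UTF_8));
    }
}
```

In a real job the same two log calls would go directly into the connector or format classes mentioned above, using the logger those classes already have.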
Best,
Hang

elakiya udhayanan <laks....@gmail.com> wrote on Wed, Jul 19, 2023 at 19:44:

> Hi Team,
>
> I am using the upsert-kafka table API connector of Apache Flink to consume
> events from a kafka topic, I want to log the kafka payloads that are
> consumed. Is there a way to log it?
>
> My code looks as below:
>
> EnvironmentSettings settings =
>     EnvironmentSettings.newInstance().inStreamingMode().build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
> String statement = "CREATE TABLE Employee (\r\n" +
>     " employee ROW(id STRING, name STRING\r\n" +
>     " ),\r\n" +
>     " PRIMARY KEY (employeeId) NOT ENFORCED\r\n" +
>     ") WITH (\r\n" +
>     " 'connector' = 'upsert-kafka',\r\n" +
>     " 'topic' = 'employee',\r\n" +
>     " 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>     " 'key.format' = 'raw',\r\n" +
>     " 'value.format' = 'avro-confluent',\r\n" +
>     " 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>     ")";
> tEnv.executeSql(statement);
>
> I added log4j.properties to enable log but it did not work. Any help is
> appreciated.