Hi Team,

I am using Apache Flink's upsert-kafka Table API connector to consume
events from a Kafka topic, and I want to log the Kafka payloads that are
consumed. Is there a way to do this?

My code looks like this:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings =
        EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Upsert-kafka source: the raw Kafka key maps to employeeId (the primary key)
// and the Avro value maps to the employee row.
String statement = "CREATE TABLE Employee (\n" +
        "  employeeId STRING,\n" +
        "  employee ROW<id STRING, name STRING>,\n" +
        "  PRIMARY KEY (employeeId) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'upsert-kafka',\n" +
        "  'topic' = 'employee',\n" +
        "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\n" +
        "  'key.format' = 'raw',\n" +
        "  'value.format' = 'avro-confluent',\n" +
        "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\n" +
        ")";
tEnv.executeSql(statement);

I added a log4j.properties file to enable logging, but it did not work.
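
For reference, this is roughly what my log4j.properties contains (assuming
Flink's default log4j2-style properties; the Kafka logger packages below are
my best guess at what might print the consumed records):

# Console appender, same pattern as the Flink default
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

# Raised these to DEBUG hoping to see the consumed payloads,
# but nothing containing the record contents shows up in the logs.
logger.kafkaclients.name = org.apache.kafka
logger.kafkaclients.level = DEBUG
logger.flinkkafka.name = org.apache.flink.connector.kafka
logger.flinkkafka.level = DEBUG

Any help is appreciated.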
