Hi, elakiya

If you want to log the deserialized Avro record, you should modify the code
in [1].

1.
https://github.com/apache/flink/blob/70e635983dc9aa4bd772042f34a04099256a8750/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java#L81
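A minimal sketch of that idea (hypothetical class and names, not the actual Flink API): wrap the existing decode step and log the record before returning it. In the real code you would add a similar log statement inside `RegistryAvroDeserializationSchema#deserialize` at the line referenced in [1].

```java
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical sketch, not the Flink API: illustrates the kind of log
// statement you would add inside RegistryAvroDeserializationSchema#deserialize.
class LoggingDeserializer<T> {
    private static final Logger LOG =
            Logger.getLogger(LoggingDeserializer.class.getName());

    private final Function<byte[], T> decoder; // stands in for the Avro datum reader

    LoggingDeserializer(Function<byte[], T> decoder) {
        this.decoder = decoder;
    }

    T deserialize(byte[] message) {
        T record = decoder.apply(message);
        // The added log line: print the deserialized record before returning it.
        LOG.info(() -> "Deserialized record: " + record);
        return record;
    }

    public static void main(String[] args) {
        LoggingDeserializer<String> d = new LoggingDeserializer<>(String::new);
        System.out.println(d.deserialize("hello".getBytes()));
    }
}
```

Note that modifying Flink source requires rebuilding flink-avro; the same logging can also be done from user code by wrapping the format's DeserializationSchema.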

Best,
Ron



Hang Ruan <ruanhang1...@gmail.com> wrote on Thu, Jul 20, 2023 at 19:53:

> Hi elakiya,
>
> If you want to log some information about the Kafka records, you can add
> some logging in KafkaRecordEmitter.
> If you want to see the deserialized value, you should add logging in the
> Avro format implementation.
>
> Best,
> Hang
>
> elakiya udhayanan <laks....@gmail.com> wrote on Wed, Jul 19, 2023 at 19:44:
>
>> Hi Team,
>>
>> I am using the upsert-kafka Table API connector of Apache Flink to
>> consume events from a Kafka topic, and I want to log the payloads that
>> are consumed. Is there a way to do this?
>>
>> My code looks like this:
>>
>> EnvironmentSettings settings =
>>         EnvironmentSettings.newInstance().inStreamingMode().build();
>> TableEnvironment tEnv = TableEnvironment.create(settings);
>> String statement = "CREATE TABLE Employee (\r\n" +
>> "  employeeId STRING,\r\n" +
>> "  employee ROW(id STRING, name STRING),\r\n" +
>> "  PRIMARY KEY (employeeId) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>> ")";
>> tEnv.executeSql(statement);
>>
>> I added a log4j.properties file to enable logging, but it did not work.
>> Any help is appreciated.
>>
>
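On the log4j.properties point in the quoted question: with Flink's default Log4j 2 setup (Flink 1.11+), raising the level for the relevant packages in conf/log4j.properties looks roughly like the fragment below (a sketch; the logger names are the package names and may need adjusting for your Flink version). Note that this only surfaces log statements that already exist in the code; the deserialization path does not log payloads by default, which is why both replies suggest adding log statements.

```properties
# Sketch for Flink's conf/log4j.properties (Log4j 2 properties syntax).
# Enable DEBUG logging for the Avro format and the Kafka connector packages.
logger.avro.name = org.apache.flink.formats.avro
logger.avro.level = DEBUG
logger.kafka.name = org.apache.flink.connector.kafka
logger.kafka.level = DEBUG
```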
