Hi Alexander,
Thanks for the reply.
Actually, I have a system where data travels in the form of user-defined,
AVRO-schema-generated objects.
Sample code:
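import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.DefaultFileFilter;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

static void readCsvWithCustomSchemaDecoder(StreamExecutionEnvironment env, Path dataDirectory)
        throws Exception {
    // EmployeeTest is the AVRO-generated Java class with the fields emp_id and Name
    Class<EmployeeTest> recordClazz = EmployeeTest.class;
    CsvSchema.Builder builder = CsvSchema.builder()
            .setUseHeader(true)
            .setReorderColumns(true)
            .setColumnSeparator(',')
            .setEscapeChar('"')
            .setLineSeparator(System.lineSeparator())
            .setQuoteChar('"')
            .setArrayElementSeparator(";")
            .setNullValue("");
    // Build the format from the configured builder so the header settings take effect
    CsvReaderFormat<EmployeeTest> csvFormat =
            CsvReaderFormat.forSchema(builder.build(), TypeInformation.of(recordClazz));
    FileSource.FileSourceBuilder<EmployeeTest> fileSourceBuilder =
            FileSource.forRecordStreamFormat(csvFormat, dataDirectory)
                    .monitorContinuously(Duration.ofSeconds(30));
    fileSourceBuilder.setFileEnumerator(
            (FileEnumerator.Provider) () -> new NonSplittingRecursiveEnumerator(new DefaultFileFilter()));
    FileSource<EmployeeTest> source = fileSourceBuilder.build();
    // Stamp each record with the current processing time
    final DataStreamSource<EmployeeTest> file = env.fromSource(
            source,
            WatermarkStrategy.<EmployeeTest>forMonotonousTimestamps()
                    .withTimestampAssigner((record, previousTimestamp) -> System.currentTimeMillis()),
            "FileSource");
    file.print();
}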
Regards,
Kirti Dhar
From: Alexander Fedulov <[email protected]>
Sent: 26 October 2023 20:59
To: Kirti Dhar Upadhyay K <[email protected]>
Cc: [email protected]
Subject: Re: CSV Decoder with AVRO schema generated Object
Hi Kirti,
What do you mean exactly by "Flink CSV Decoder"? Please provide a snippet of
the code that you are trying to execute.
To be honest, combining CSV with AVRO-generated classes sounds rather strange
and you might want to reconsider your approach.
As for a quick fix, using aliases in your reader schema might help [1]
[1] https://avro.apache.org/docs/1.8.1/spec.html#Aliases
Best,
Alexander Fedulov
On Thu, 26 Oct 2023 at 16:24, Kirti Dhar Upadhyay K via user
<[email protected]> wrote:
Hi Team,
I am using the Flink CSV Decoder with an AVSC-generated Java object and am facing
an issue when a field name contains an underscore (_) or starts with a capital letter.
Sample Schema:
{
  "namespace": "avro.employee",
  "type": "record",
  "name": "EmployeeTest",
  "fields": [
    {"name": "emp_id", "type": ["null", "long"]},
    {"name": "Name", "type": ["null", "string"]}
  ]
}
Generated Java Object getters/setters:
public void setEmpId(java.lang.Long value) {
    this.emp_id = value;
}
...
public java.lang.CharSequence getName() {
    return Name;
}
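The shaded Jackson classes in the stack trace suggest the CSV decoder binds columns to bean properties derived from accessors like the ones above, not to the Avro field names. A minimal, stdlib-only sketch of that derivation, assuming the JavaBeans naming rule (the class name AccessorNames and its methods are hypothetical, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

class AccessorNames {

    // JavaBeans-style decapitalization (the rule java.beans.Introspector.decapitalize uses):
    // lower-case the first letter unless the first two letters are both upper case.
    static String decapitalize(String name) {
        if (name.isEmpty()) {
            return name;
        }
        if (name.length() > 1
                && Character.isUpperCase(name.charAt(0))
                && Character.isUpperCase(name.charAt(1))) {
            return name;
        }
        return Character.toLowerCase(name.charAt(0)) + name.substring(1);
    }

    // Derives a property name from a getter/setter name by stripping the
    // "get"/"set" prefix ("setEmpId" -> "empId", "getName" -> "name").
    static String propertyOf(String accessor) {
        if (accessor.length() > 3 && (accessor.startsWith("get") || accessor.startsWith("set"))) {
            return decapitalize(accessor.substring(3));
        }
        return accessor;
    }

    public static void main(String[] args) {
        List<String> properties = new ArrayList<>();
        for (String accessor : List.of("setEmpId", "getName")) {
            properties.add(propertyOf(accessor));
        }
        System.out.println(properties); // [empId, name]

        // Compare with the CSV header columns from the input record.
        for (String column : List.of("emp_id", "Name")) {
            System.out.println(column + " matches a property: " + properties.contains(column));
        }
    }
}
```

Both header columns print `false`, which lines up with the two known properties ("empId", "name") reported in the exception.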
Input record:
emp_id,Name
1,peter
Exception:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emp_id" (class avro.person.EmployeeTest), not marked as ignorable (2 known properties: "empId", "name"])
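One possible workaround, untested and assuming the decoder matches header columns to the camelCase property names listed in the exception, is to rewrite the CSV header line before it is decoded. The names HeaderNormalizer, toPropertyName and normalizeHeader are hypothetical, and splitting on ',' assumes a header without quoted column names:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class HeaderNormalizer {

    // Hypothetical helper: maps an Avro field name used as a CSV column to the
    // camelCase property name ("emp_id" -> "empId", "Name" -> "name").
    static String toPropertyName(String column) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : column.trim().toCharArray()) {
            if (c == '_') {
                upperNext = true; // drop the underscore, capitalize the next letter
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        if (sb.length() > 0) {
            sb.setCharAt(0, Character.toLowerCase(sb.charAt(0)));
        }
        return sb.toString();
    }

    // Rewrites a plain comma-separated header line (no quoted column names).
    static String normalizeHeader(String headerLine) {
        return Arrays.stream(headerLine.split(",", -1))
                .map(HeaderNormalizer::toPropertyName)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        String csv = "emp_id,Name\n1,peter";
        int endOfHeader = csv.indexOf('\n');
        String rewritten = normalizeHeader(csv.substring(0, endOfHeader)) + csv.substring(endOfHeader);
        System.out.println(rewritten); // header becomes "empId,name"
    }
}
```

Declaring the columns explicitly under the camelCase names in the CsvSchema (for example with CsvSchema.Builder.addColumn) and skipping the file's header row might achieve the same without touching the input files, though that is also an assumption.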
I have also found an old JIRA regarding this:
https://issues.apache.org/jira/browse/FLINK-2874
Any help would be appreciated!
Regards,
Kirti Dhar