Mike Percy has posted comments on this change.

Change subject: Add AvroKuduEventProducer to Kudu-Flume integration
......................................................................

Patch Set 4:

(7 comments)

http://gerrit.cloudera.org:8080/#/c/4034/4/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java
File java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduEventProducer.java:

Line 60:  * <tr><td>producer.schema.path</td>
should be schemaPath


Line 63:  *     <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
Add: If not specified, the schema must be specified on a per-event basis.


Line 73:   private static final String SCHEMA_HEADER = "schemaPath";
Let's use the same property as the Kite sink for the event headers so that both Flume sinks can operate on the same events and provide the same behavior. That means we should use "flume.avro.schema.url" for the header property.


Line 110:             String.format("No schema for event! Specify either property %s or event header %s",
s/property/configuration property/


Line 121:       payloadReader = new DataFileReader<>(new SeekableByteArrayInput(payload), reader);
We should not treat each Flume Event as an Avro DataFile. We should be treating each Flume Event as an Avro record. I recommend looking at the Flume DatasetSink implementation and doing the same thing. Here is how they parse the events:
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java#L152


Line 150:         if (!col.isNullable()) {
I think this should be:

  if (value == null) {
    if (col.isNullable()) {
      row.setNull(name);
    } else {
      // leave unset for possible Kudu default
    }
  }

...actually, it seems like we are missing something from the client API like ColumnSchema.hasDefaultValue() because if we could check that then we could throw if the field is null and a default does not exist. We should probably fix that in a follow-up.


Line 202:   private DatumReader<GenericRecord> openSchema(String schemaPath) {
It would be nice to support both URL and literal.
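
To illustrate the "both URL and literal" idea, here is a rough, JDK-only sketch of how the schema source could be chosen per event. It is not the DatasetSink code and not part of this patch. The class SchemaSourceResolver and the method resolveSchemaText are hypothetical names, and the "flume.avro.schema.literal" header is an assumption modeled on the "flume.avro.schema.url" header mentioned above. The sketch only returns the schema JSON text; parsing it into an Avro Schema and building the DatumReader is left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Kudu or Flume. */
final class SchemaSourceResolver {
  // Header name proposed in this review for the schema URL.
  static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
  // Assumed companion header carrying the schema JSON inline.
  static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";

  private SchemaSourceResolver() {}

  /**
   * Returns the schema JSON for one event. Precedence in this sketch:
   * literal header, then URL header, then the configured default URL
   * (which may be null if no schemaPath was configured).
   */
  static String resolveSchemaText(Map<String, String> headers, String defaultSchemaUrl)
      throws IOException {
    String literal = headers.get(SCHEMA_LITERAL_HEADER);
    if (literal != null) {
      return literal;
    }
    String url = headers.get(SCHEMA_URL_HEADER);
    if (url == null) {
      url = defaultSchemaUrl;
    }
    if (url == null) {
      throw new IllegalArgumentException(String.format(
          "No schema for event! Specify a default schema URL or event header %s or %s",
          SCHEMA_URL_HEADER, SCHEMA_LITERAL_HEADER));
    }
    // Only handles schemes the JVM knows (file:, http:, jar:, ...).
    // An hdfs: URL would presumably need the Hadoop FileSystem API instead.
    try (InputStream in = URI.create(url).toURL().openStream()) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  }
}
```

Since the same schema will usually repeat across many events, a real implementation would probably want to cache the parsed schema or reader keyed by the URL or literal string rather than re-reading it per event.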
See how the DatasetSink implements this here:
https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/parser/AvroParser.java#L175


--
To view, visit http://gerrit.cloudera.org:8080/4034
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I6715df72e447e72f4801a2e026f6840d09b401e1
Gerrit-PatchSet: 4
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Will Berkeley <wdberke...@gmail.com>
Gerrit-Reviewer: Kudu Jenkins
Gerrit-Reviewer: Mike Percy <mpe...@apache.org>
Gerrit-Reviewer: Todd Lipcon <t...@apache.org>
Gerrit-Reviewer: Will Berkeley <wdberke...@gmail.com>
Gerrit-HasComments: Yes