This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e746b83 More refactoring
e746b83 is described below
commit e746b836241d4ed096f70144a52d830ab56c3fb5
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Feb 26 12:02:30 2025 +0100
More refactoring
---
CHANGELOG.md | 4 ++
src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs | 52 ++++++++++------------
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 8 ++--
tests/DotPulsar.Tests/Internal/ProducerTests.cs | 6 +--
.../Schemas/AvroISpecificRecordSchemaTests.cs | 16 +++----
.../{AvroBlankSampleModel.cs => EmptyModel.cs} | 2 +-
...ngSCHEMAField.cs => InvalidSchemaFieldModel.cs} | 2 +-
.../{AvroSampleModel.cs => ValidModel.cs} | 8 ++--
8 files changed, 49 insertions(+), 49 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4a58920..f423d3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased]
+### Added
+
+- Support for [Apache.Avro](https://www.nuget.org/packages/Apache.Avro)
+
### Changed
- Updated the Microsoft.Extensions.ObjectPool dependency from version 9.0.1 to
9.0.2
diff --git a/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
b/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
index 749e544..5e7258b 100644
--- a/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
+++ b/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
@@ -24,6 +24,8 @@ using System.Text;
public sealed class AvroISpecificRecordSchema<T> : ISchema<T>
{
private const string SchemaField = "_SCHEMA";
+ private const string AvroSchemaFullName = "Avro.Schema";
+ private const string AvroISpecificRecordFullName =
"Avro.Specific.ISpecificRecord";
private static readonly Type _typeT;
private static readonly object _avroSchema;
@@ -45,27 +47,19 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
static AvroISpecificRecordSchema()
#pragma warning restore CS8618 // Supressed because if there is an init error
the non-static constructor will throw it instead. This is done in case of if
there is a wrong implementation of ISpecificRecord in T in order not to stop
the whole runtime.
{
- const string schemaFullName = "Avro.Schema";
- const string ISpecificRecordFullName = "Avro.Specific.ISpecificRecord";
_typeT = typeof(T);
- string SchemaName;
- string SchemaData;
try
{
- var assembly = Assembly.Load("Avro");
+ if (!_typeT.GetInterfaces().Any(i => i.FullName ==
AvroISpecificRecordFullName))
+ throw new SchemaException($"The type '{_typeT}' must implement
'{AvroISpecificRecordFullName}'");
- if (!_typeT.GetInterfaces().Any(i => i.FullName ==
ISpecificRecordFullName))
- throw new SchemaException($"The type {_typeT} must implement
{ISpecificRecordFullName}");
-
- _avroSchema = _typeT.GetField(SchemaField)?.GetValue(null) ??
throw new SchemaException($"The static field '{SchemaField}' must not be null
in type: {_typeT}");
+ _avroSchema = _typeT.GetField(SchemaField)?.GetValue(null) ??
throw new SchemaException($"The static field '{SchemaField}' must not be null
in type '{_typeT}'");
var avroSchemaType = _avroSchema.GetType();
- if (!avroSchemaType.ImplementsBaseTypeFullName(schemaFullName))
- throw new SchemaException($"The static field '{SchemaField}'
must be of type {schemaFullName}");
+ if (!avroSchemaType.ImplementsBaseTypeFullName(AvroSchemaFullName))
+ throw new SchemaException($"The static field '{SchemaField}'
must be of type '{AvroSchemaFullName}'");
- SchemaName = (string)
(avroSchemaType.GetProperty("Name")?.GetValue(_avroSchema) ?? string.Empty);
- SchemaData = (string) (avroSchemaType.GetMethod("ToString",
Type.EmptyTypes)?.Invoke(_avroSchema, null) ?? throw new
SchemaException($"Schema ToString() must not return null for type {_typeT}"));
TryLoadStatic(out Type avroWriterType, out Type avroReaderType,
out TypeInfo binaryEncoderType, out TypeInfo binaryDecoderType, out MethodInfo
avroWriterMethod, out MethodInfo avroReaderMethod);
_avroWriterTypeInfo = avroWriterType;
_avroReaderTypeInfo = avroReaderType;
@@ -73,7 +67,10 @@ public sealed class AvroISpecificRecordSchema<T> : ISchema<T>
_binaryDecoderTypeInfo = binaryDecoderType;
_avroWriterWriteMethod = avroWriterMethod;
_avroReaderReadMethod = avroReaderMethod;
- _schemaInfo = new SchemaInfo(SchemaName,
Encoding.UTF8.GetBytes(SchemaData), SchemaType.Avro, new Dictionary<string,
string>());
+
+ var schemaName = (string)
(avroSchemaType.GetProperty("Name")?.GetValue(_avroSchema) ?? string.Empty);
+ var schemaData = (string) (avroSchemaType.GetMethod("ToString",
Type.EmptyTypes)?.Invoke(_avroSchema, null) ?? throw new
SchemaException($"Schema 'ToString()' must not return null for type
'{_typeT}'"));
+ _schemaInfo = new SchemaInfo(schemaName,
Encoding.UTF8.GetBytes(schemaData), SchemaType.Avro, new Dictionary<string,
string>());
}
catch (Exception exception)
{
@@ -83,8 +80,9 @@ public sealed class AvroISpecificRecordSchema<T> : ISchema<T>
public AvroISpecificRecordSchema()
{
- if (_constructorException != null)
+ if (_constructorException is not null)
throw _constructorException;
+
TryLoad(out object avroWriter, out object avroReader);
_avroWriter = avroWriter;
_avroReader = avroReader;
@@ -94,7 +92,7 @@ public sealed class AvroISpecificRecordSchema<T> : ISchema<T>
{
using var stream = new MemoryStream(bytes.ToArray());
T? def = default;
- return (T) (_avroReaderReadMethod.Invoke(_avroReader, [def,
GetBinaryDecoder(stream)]) ?? throw new SchemaSerializationException($"Could
not deserialize object of type {_typeT}"));
+ return (T) (_avroReaderReadMethod.Invoke(_avroReader, [def,
GetBinaryDecoder(stream)]) ?? throw new SchemaSerializationException($"Could
not deserialize object of type '{_typeT}'"));
}
public ReadOnlySequence<byte> Encode(T message)
@@ -104,7 +102,8 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
return new ReadOnlySequence<byte>(stream.ToArray());
}
- private static void TryLoadStatic(out Type avroWriter,
+ private static void TryLoadStatic(
+ out Type avroWriter,
out Type avroReader,
out TypeInfo binaryEncoderType,
out TypeInfo binaryDecoderType,
@@ -137,7 +136,7 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
return type.MakeGenericType(typeof(T));
}
- throw new SchemaException($"{fullName} as a generic public class was
not found");
+ throw new SchemaException($"'{fullName}' as a generic public class was
not found");
}
private static Type LoadSpecificDatumReaderType(IEnumerable<TypeInfo>
types)
@@ -150,7 +149,7 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
return type.MakeGenericType(typeof(T));
}
- throw new SchemaException($"{fullName} as a generic public class was
not found");
+ throw new SchemaException($"'{fullName}' as a generic public class was
not found");
}
private static object LoadSpecificDatumWriter()
@@ -174,11 +173,10 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
continue;
var param1Fullname = parameters[1].ParameterType.FullName;
- if (param1Fullname == null)
+ if (param1Fullname is null)
continue;
- if (parameters[0].ParameterType != typeof(T) ||
- !param1Fullname.Equals(secondParamFullname))
+ if (parameters[0].ParameterType != typeof(T) ||
!param1Fullname.Equals(secondParamFullname))
continue;
return method;
@@ -202,11 +200,10 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
continue;
var param1Fullname = parameters[1].ParameterType.FullName;
- if (param1Fullname == null)
+ if (param1Fullname is null)
continue;
- if (parameters[0].ParameterType != typeof(T) ||
- !param1Fullname.Equals(secondParamFullname))
+ if (parameters[0].ParameterType != typeof(T) ||
!param1Fullname.Equals(secondParamFullname))
continue;
return method;
@@ -225,7 +222,7 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
return type;
}
- throw new SchemaException($"{fullName} as a public class was not
found");
+ throw new SchemaException($"'{fullName}' as a public class was not
found");
}
private static TypeInfo LoadBinaryDecoderType(IEnumerable<TypeInfo> types)
@@ -238,7 +235,7 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
return type;
}
- throw new SchemaException($"{fullName} as a public class was not
found");
+ throw new SchemaException($"'{fullName}' as a public class was not
found");
}
private static object GetBinaryEncoder(MemoryStream stream)
@@ -246,5 +243,4 @@ public sealed class AvroISpecificRecordSchema<T> :
ISchema<T>
private static object GetBinaryDecoder(MemoryStream stream)
=> Activator.CreateInstance(_binaryDecoderTypeInfo, stream) ?? throw
new SchemaException("There was a problem while instantiating BinaryDecoder");
-
}
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index 98a23c9..1c98fcb 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -282,12 +282,12 @@ public sealed class ConsumerTests : IDisposable
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
- var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ var pulsarSchema = Schema.AvroISpecificRecord<ValidModel>();
await _fixture.AddSchemaToExistingTopic(topicName,
pulsarSchema.SchemaInfo, _cts.Token);
var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName,
Schema.String);
await using var producer = CreateProducer(client, topicName,
pulsarSchema);
- await producer.Send(new AvroSampleModel(), _cts.Token);
+ await producer.Send(new ValidModel(), _cts.Token);
//Act
var exception = await Record.ExceptionAsync(consumer.Receive().AsTask);
@@ -301,12 +301,12 @@ public sealed class ConsumerTests : IDisposable
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
- var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ var pulsarSchema = Schema.AvroISpecificRecord<ValidModel>();
await _fixture.AddSchemaToExistingTopic(topicName,
pulsarSchema.SchemaInfo, _cts.Token);
var client = CreateClient();
await using var consumer = CreateConsumer(client, topicName,
pulsarSchema);
await using var producer = CreateProducer(client, topicName,
pulsarSchema);
- var expected = new AvroSampleModel();
+ var expected = new ValidModel();
await producer.Send(expected, _cts.Token);
//Act
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index ed6592b..581d30a 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -275,7 +275,7 @@ public sealed class ProducerTests : IDisposable
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
- var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ var pulsarSchema = Schema.AvroISpecificRecord<ValidModel>();
await _fixture.AddSchemaToExistingTopic(topicName,
pulsarSchema.SchemaInfo, _cts.Token);
var client = CreateClient();
await using var producer = CreateProducer(client, topicName,
Schema.ByteSequence);
@@ -292,13 +292,13 @@ public sealed class ProducerTests : IDisposable
{
//Arrange
var topicName = await _fixture.CreateTopic(_cts.Token);
- var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ var pulsarSchema = Schema.AvroISpecificRecord<ValidModel>();
await _fixture.AddSchemaToExistingTopic(topicName,
pulsarSchema.SchemaInfo, _cts.Token);
var client = CreateClient();
await using var producer = CreateProducer(client, topicName,
pulsarSchema);
//Act
- var exception = await Record.ExceptionAsync(producer.Send(new
AvroSampleModel(), _cts.Token).AsTask);
+ var exception = await Record.ExceptionAsync(producer.Send(new
ValidModel(), _cts.Token).AsTask);
//Assert
exception.ShouldBeNull();
diff --git a/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
b/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
index dd188b0..1573db5 100644
--- a/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
+++ b/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
@@ -24,21 +24,21 @@ public sealed class AvroISpecificRecordSchemaTests
[Fact]
public void
Constructor_GivenModelThatDoNotImplementISpecificRecord_ShouldThrowException()
{
- var exception =
Record.Exception(Schema.AvroISpecificRecord<AvroBlankSampleModel>);
+ var exception =
Record.Exception(Schema.AvroISpecificRecord<EmptyModel>);
exception.ShouldBeOfType<SchemaException>();
}
[Fact]
- public void
Constructor_GivenModelWithWrongSchemaField_ShouldThrowException()
+ public void
Constructor_GivenModelWithInvalidSchemaField_ShouldThrowException()
{
- var exception =
Record.Exception(Schema.AvroISpecificRecord<AvroSampleModelWithWrongSchemaField>);
+ var exception =
Record.Exception(Schema.AvroISpecificRecord<InvalidSchemaFieldModel>);
exception.ShouldBeOfType<SchemaException>();
}
[Fact]
public void Constructor_GivenValidModel_ShouldReturnSchema()
{
- var exception =
Record.Exception(Schema.AvroISpecificRecord<AvroSampleModel>);
+ var exception =
Record.Exception(Schema.AvroISpecificRecord<ValidModel>);
exception.ShouldBeNull();
}
@@ -46,9 +46,9 @@ public sealed class AvroISpecificRecordSchemaTests
public void Encode_GivenValidModel_ShouldReturnCorrectBytes()
{
//Arrange
- var schema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ var schema = Schema.AvroISpecificRecord<ValidModel>();
var expected = new ReadOnlySequence<byte>([16, 77, 97, 114, 105, 103,
111, 110, 97, 8, 67, 101, 116, 97, 58]);
- var model = new AvroSampleModel { Name = "Marigona", Surname = "Ceta",
Age = 29 };
+ var model = new ValidModel { Name = "Marigona", Surname = "Ceta", Age
= 29 };
//Act
var actual = schema.Encode(model);
@@ -61,9 +61,9 @@ public sealed class AvroISpecificRecordSchemaTests
public void Decode_GivenValidBytes_ShouldReturnCorrectModel()
{
//Arrange
- var schema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ var schema = Schema.AvroISpecificRecord<ValidModel>();
var bytes = new ReadOnlySequence<byte>([16, 77, 97, 114, 105, 103,
111, 110, 97, 8, 67, 101, 116, 97, 58]);
- var expected = new AvroSampleModel { Name = "Marigona", Surname =
"Ceta", Age = 29 };
+ var expected = new ValidModel { Name = "Marigona", Surname = "Ceta",
Age = 29 };
//Act
var actual = schema.Decode(bytes);
diff --git
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/EmptyModel.cs
similarity index 94%
rename from
tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs
rename to tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/EmptyModel.cs
index 65c54b0..c5211ba 100644
---
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs
+++ b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/EmptyModel.cs
@@ -14,4 +14,4 @@
namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
-public class AvroBlankSampleModel { }
+public class EmptyModel { }
diff --git
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/InvalidSchemaFieldModel.cs
similarity index 93%
rename from
tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
rename to
tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/InvalidSchemaFieldModel.cs
index e4e9f19..13ce889 100644
---
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
+++
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/InvalidSchemaFieldModel.cs
@@ -16,7 +16,7 @@ namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
using Avro.Specific;
-public class AvroSampleModelWithWrongSchemaField : ISpecificRecord
+public class InvalidSchemaFieldModel : ISpecificRecord
{
public static string _SCHEMA = "WRONG!";
diff --git
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/ValidModel.cs
similarity index 93%
rename from
tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs
rename to tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/ValidModel.cs
index 5b7bb57..183b9f2 100644
--- a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs
+++ b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/ValidModel.cs
@@ -16,12 +16,12 @@ namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
using Avro.Specific;
-public class AvroSampleModel : ISpecificRecord
+public class ValidModel : ISpecificRecord
{
public static readonly Avro.Schema _SCHEMA = Avro.Schema.Parse(@"
{
""type"": ""record"",
- ""name"": ""AvroSampleModel"",
+ ""name"": ""ValidModel"",
""fields"": [
{ ""name"": ""Name"", ""type"": ""string"" },
{ ""name"": ""Surname"", ""type"": ""string"" },
@@ -29,7 +29,7 @@ public class AvroSampleModel : ISpecificRecord
]
}");
- public AvroSampleModel()
+ public ValidModel()
{
Name = "Jon";
Surname = "Klinaku";
@@ -63,7 +63,7 @@ public class AvroSampleModel : ISpecificRecord
Name = value as string ?? throw new ArgumentException("Name
must be a string");
break;
case 1:
- Surname = value as string ?? throw new ArgumentException("Name
must be a string");
+ Surname = value as string ?? throw new
ArgumentException("Surname must be a string");
break;
case 2:
Age = value is int intValue ? intValue : throw new
ArgumentException("Age must be an int");