Re: Handling Bounded Sources with KafkaSource

2021-03-14 Thread Maciej Obuchowski
Hey Rion,

We solved this issue by using the usual unbounded streams and using the
Awaitility library to express conditions that would end the test - for
example, having particular data present in a table.

IMO this type of testing has the advantage that you won't see behavior
that diverges from production, as you have experienced.
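
To make that concrete, here is a rough sketch of what such a test could look
like (assuming JUnit 5 and Awaitility on the test classpath, and reusing the
helpers from your test below - getTestEvents, sendToKafka, queryCount,
parameters and connection are just placeholders for whatever you already have):

import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import java.time.Duration
import kotlin.concurrent.thread

@Test
fun `asserts on the sink instead of bounding the source`() {
    // Arrange: produce a finite set of test records to the topic
    val events = getTestEvents()
    sendToKafka(events, parameters)

    // Act: run the normal, unbounded job in a background (daemon) thread
    thread(isDaemon = true) {
        EntityIdentificationJob.run(parameters)
    }

    // Assert: poll the sink until the expected row shows up, or time out
    await().atMost(Duration.ofSeconds(60)).until {
        queryCount("SELECT * FROM users", connection) == 1
    }
}

The job keeps the unbounded KafkaSource exactly as in production; the test
finishes as soon as the condition holds, and the daemon thread does not keep
the JVM alive afterwards.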

Regards,
Maciej



On Sun, Mar 14, 2021, 05:41 Rion Williams  wrote:

> Following up on this issue, I realized my initial problem was that my test
> case only contained a single message to send through the pipeline. This
> resulted in the earliest offset also being the latest and things didn’t
> exactly work as expected. Once I added several other messages and sent them
> through, the pipeline appeared to run as expected.
>
> However, the use of “bounded” seems to be fickle in terms of test cases.
> Since an exception is thrown once the bound is reached, I can typically
> just wrap my test execution within a try/catch and simply apply my
> assertion afterwards.
>
> This occasionally results in passing tests, but in others, it seems that
> the bound is reached prior to processing the messages it had seen thus far,
> and as a result yields a failing test. I don’t know if this is a bug, or
> intentional, but I’m not aware of a workaround that could “force” the
> pipeline to finish processing all of the messages from the topic once the
> bound is reached. I’ve tried sending through “flush records” to the topic,
> however since there are multiple partitions, it’s not guaranteed that the
> pipeline will read those last.
>
> This is purely a testing problem, as a production job would be streaming
> and unbounded, however I’d love to have a reliable integration test or a
> pattern that I could use to guarantee the processing of a finite set of
> data via a KafkaSource (i.e. send finite records to Kafka, read from topic,
> process all records, apply assertion after processing).
>
> Any ideas/recommendations/workarounds would be greatly welcome and I’d be
> happy to share my specific code / use-cases if needed.
>
> Thanks much,
>
> Rion
>
> On Mar 12, 2021, at 10:19 AM, Rion Williams  wrote:
>
> 
> Hi all,
>
> I've been using the KafkaSource API as opposed to the classic consumer and
> things have been going well. I configured my source such that it could be
> used in either a streaming or bounded mode, with the bounded approach
> specifically aimed at improving testing (unit/integration).
>
> I've noticed that when I attempt to run through a test - it seems that the
> pipeline never acknowledges the "end" of the stream in a bounded context
> and just runs forever and never makes it to my assert.
>
> Does anything look glaringly wrong with how the source is being defined?
>
> object KafkaEventSource {
>
>     // Event is the element type produced by EventDeserializer
>     fun withParameters(parameters: ParameterTool): KafkaSource<Event> {
>         val schemaRegistryUrl = parameters.getRequired("schema.registry.url")
>
>         val builder = KafkaSource.builder<Event>()
>             .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
>             .setGroupId(parameters.getRequired("group.id"))
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .setProperty("schema.registry.url", schemaRegistryUrl)
>             .setTopics(parameters.getRequired("topic"))
>             .setDeserializer(EventDeserializer(schemaRegistryUrl))
>
>         if (parameters.getBoolean("bounded", false)) {
>             builder.setBounded(OffsetsInitializer.latest())
>         }
>
>         return builder.build()
>     }
> }
>
> I can verify that the generated source has its boundedness set properly
> and all of the configuration options are correct.
>
> My test itself is fairly simple and can be broken down as follows:
>
>1. Inject records into a Kafka Topic
>2. Initialize my Flink job using all of my testing parameters
>3. Apply my assertion (in this case verifying that a JdbcSink wrote to
>a specific database)
>
> @Test
> fun `Example`() {
>     // Arrange
>     val events = getTestEvents()
>     sendToKafka(events, parameters)
>
>     // Act
>     EntityIdentificationJob.run(parameters)
>
>     // Assert
>     val users = queryCount("SELECT * FROM users", connection)
>     assertEquals(1, users)
> }
>
> Where my job itself is broken down further: it reads from the source,
> runs a process function that splits records into multiple side outputs, and
> writes each of them to a distinct JdbcSink based on the type:
>
> @JvmStatic
> fun main(args: Array<String>) {
>     val parameters = loadParams(args)
>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>     // Read from Kafka
>     val entities = stream
>         .fromSource(KafkaEventSource.withParameters(parameters),
>             WatermarkStrategy.noWatermarks(), "kafka")
>         .process(IdentifyEntitiesFunction())
>
>     // Write out each tag to its respective sink
>     for (entityType in EntityTypes.all) {
>         entities
>             .getSideOutput(entityType)
> 

Re: Handling Bounded Sources with KafkaSource

2021-03-13 Thread Rion Williams
Following up on this issue, I realized my initial problem was that my test case 
only contained a single message to send through the pipeline. This resulted in 
the earliest offset also being the latest and things didn’t exactly work as 
expected. Once I added several other messages and sent them through, the 
pipeline appeared to run as expected.

However, the use of “bounded” seems to be fickle in terms of test cases. Since
an exception is thrown once the bound is reached, I can typically just wrap my
test execution within a try/catch and simply apply my assertion afterwards.
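
Roughly what that workaround looks like in the test (just a sketch, and not
something I'm particularly happy with):

try {
    // run(...) throws once the bound is reached, as described above
    EntityIdentificationJob.run(parameters)
} catch (e: Exception) {
    // Swallowed on purpose - hitting the bound is the "normal" way this run ends
}

// Apply the assertion only after the job has terminated
val users = queryCount("SELECT * FROM users", connection)
assertEquals(1, users)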

This occasionally results in passing tests, but in others, it seems that the 
bound is reached prior to processing the messages it had seen thus far, and as 
a result yields a failing test. I don’t know if this is a bug, or intentional, 
but I’m not aware of a workaround that could “force” the pipeline to finish 
processing all of the messages from the topic once the bound is reached. I’ve 
tried sending through “flush records” to the topic, however since there are 
multiple partitions, it’s not guaranteed that the pipeline will read those 
last. 

This is purely a testing problem, as a production job would be streaming and 
unbounded, however I’d love to have a reliable integration test or a pattern 
that I could use to guarantee the processing of a finite set of data via a 
KafkaSource (i.e. send finite records to Kafka, read from topic, process all
records, apply assertion after processing).
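
One idea I've been toying with (purely a sketch, I haven't verified it):
instead of bounding at OffsetsInitializer.latest(), capture the end offsets of
the topic right after producing the test records and pass them to the source
as explicit stopping offsets, so every partition stops exactly after the
injected data. Something like this hypothetical helper:

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import java.util.Properties

// Capture the per-partition end offsets of the topic *after* the test records
// have been produced, so the bound covers exactly the injected data.
fun endOffsetsFor(topic: String, bootstrapServers: String): Map<TopicPartition, Long> {
    val props = Properties().apply {
        put("bootstrap.servers", bootstrapServers)
        put("key.deserializer", ByteArrayDeserializer::class.java.name)
        put("value.deserializer", ByteArrayDeserializer::class.java.name)
    }
    KafkaConsumer<ByteArray, ByteArray>(props).use { consumer ->
        val partitions = consumer.partitionsFor(topic)
            .map { TopicPartition(topic, it.partition()) }
        // endOffsets() returns the offset of the next record to be written,
        // which is where a bounded read should stop (exclusive).
        return consumer.endOffsets(partitions)
    }
}

// ...and in KafkaEventSource, instead of OffsetsInitializer.latest():
// builder.setBounded(OffsetsInitializer.offsets(endOffsetsFor(topic, bootstrapServers)))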

Any ideas/recommendations/workarounds would be greatly welcome and I’d be happy 
to share my specific code / use-cases if needed.

Thanks much,

Rion 

> On Mar 12, 2021, at 10:19 AM, Rion Williams  wrote:
> 
> 
> Hi all,
> 
> I've been using the KafkaSource API as opposed to the classic consumer and 
> things have been going well. I configured my source such that it could be 
> used in either a streaming or bounded mode, with the bounded approach 
> specifically aimed at improving testing (unit/integration).
> 
> I've noticed that when I attempt to run through a test - it seems that the 
> pipeline never acknowledges the "end" of the stream in a bounded context and 
> just runs forever and never makes it to my assert.
> 
> Does anything look glaringly wrong with how the source is being defined?
> object KafkaEventSource {
>
>     // Event is the element type produced by EventDeserializer
>     fun withParameters(parameters: ParameterTool): KafkaSource<Event> {
>         val schemaRegistryUrl = parameters.getRequired("schema.registry.url")
>
>         val builder = KafkaSource.builder<Event>()
>             .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
>             .setGroupId(parameters.getRequired("group.id"))
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .setProperty("schema.registry.url", schemaRegistryUrl)
>             .setTopics(parameters.getRequired("topic"))
>             .setDeserializer(EventDeserializer(schemaRegistryUrl))
>
>         if (parameters.getBoolean("bounded", false)) {
>             builder.setBounded(OffsetsInitializer.latest())
>         }
>
>         return builder.build()
>     }
> }
> I can verify that the generated source has its boundedness set properly and
> all of the configuration options are correct.
> 
> My test itself is fairly simple and can be broken down as follows:
> 1. Inject records into a Kafka Topic
> 2. Initialize my Flink job using all of my testing parameters
> 3. Apply my assertion (in this case verifying that a JdbcSink wrote to a
>    specific database)
> @Test
> fun `Example`() {
>     // Arrange
>     val events = getTestEvents()
>     sendToKafka(events, parameters)
>
>     // Act
>     EntityIdentificationJob.run(parameters)
>
>     // Assert
>     val users = queryCount("SELECT * FROM users", connection)
>     assertEquals(1, users)
> }
> Where my job itself is broken down further: it reads from the source,
> runs a process function that splits records into multiple side outputs, and
> writes each of them to a distinct JdbcSink based on the type:
> 
> @JvmStatic
> fun main(args: Array<String>) {
>     val parameters = loadParams(args)
>     val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>     // Read from Kafka
>     val entities = stream
>         .fromSource(KafkaEventSource.withParameters(parameters),
>             WatermarkStrategy.noWatermarks(), "kafka")
>         .process(IdentifyEntitiesFunction())
>
>     // Write out each tag to its respective sink
>     for (entityType in EntityTypes.all) {
>         entities
>             .getSideOutput(entityType)
>             .addSink(PostgresEntitySink.withEntity(entityType.typeInfo, parameters))
>     }
>
>     stream.execute(parameters.getRequired("application"))
> }
> I can verify in the logs that my sink is being executed and writing to the 
> appropriate database, however the job itself never finishes. I've tried it 
> using a single Kafka partition as well as multiple partitions and even 
> commented out the logic related to writing to the database. It still 

Handling Bounded Sources with KafkaSource

2021-03-12 Thread Rion Williams
Hi all,

I've been using the KafkaSource API as opposed to the classic consumer and
things have been going well. I configured my source such that it could be
used in either a streaming or bounded mode, with the bounded approach
specifically aimed at improving testing (unit/integration).

I've noticed that when I attempt to run through a test - it seems that the
pipeline never acknowledges the "end" of the stream in a bounded context
and just runs forever and never makes it to my assert.

Does anything look glaringly wrong with how the source is being defined?

object KafkaEventSource {

    // Event is the element type produced by EventDeserializer
    fun withParameters(parameters: ParameterTool): KafkaSource<Event> {
        val schemaRegistryUrl = parameters.getRequired("schema.registry.url")

        val builder = KafkaSource.builder<Event>()
            .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
            .setGroupId(parameters.getRequired("group.id"))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setProperty("schema.registry.url", schemaRegistryUrl)
            .setTopics(parameters.getRequired("topic"))
            .setDeserializer(EventDeserializer(schemaRegistryUrl))

        if (parameters.getBoolean("bounded", false)) {
            builder.setBounded(OffsetsInitializer.latest())
        }

        return builder.build()
    }
}

I can verify that the generated source has its boundedness set properly
and all of the configuration options are correct.
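
A check along these lines is roughly how I'd confirm that (the parameter
values below are placeholders, not my real config, and JUnit 5 assertions are
assumed):

import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.api.java.utils.ParameterTool
import org.junit.jupiter.api.Assertions.assertEquals

val parameters = ParameterTool.fromMap(mapOf(
    "schema.registry.url" to "http://localhost:8081",
    "bootstrap.servers" to "localhost:9092",
    "group.id" to "test-group",
    "topic" to "events",
    "bounded" to "true"
))

val source = KafkaEventSource.withParameters(parameters)

// KafkaSource exposes Source#getBoundedness(), so the bounded flag can be asserted directly
assertEquals(Boundedness.BOUNDED, source.boundedness)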

My test itself is fairly simple and can be broken down as follows:

   1. Inject records into a Kafka Topic
   2. Initialize my Flink job using all of my testing parameters
   3. Apply my assertion (in this case verifying that a JdbcSink wrote to a
   specific database)

@Test
fun `Example`() {
    // Arrange
    val events = getTestEvents()
    sendToKafka(events, parameters)

    // Act
    EntityIdentificationJob.run(parameters)

    // Assert
    val users = queryCount("SELECT * FROM users", connection)
    assertEquals(1, users)
}

Where my job itself is broken down further: it reads from the source,
runs a process function that splits records into multiple side outputs, and
writes each of them to a distinct JdbcSink based on the type:

@JvmStatic
fun main(args: Array<String>) {
    val parameters = loadParams(args)
    val stream = StreamExecutionEnvironment.getExecutionEnvironment()

    // Read from Kafka
    val entities = stream
        .fromSource(KafkaEventSource.withParameters(parameters),
            WatermarkStrategy.noWatermarks(), "kafka")
        .process(IdentifyEntitiesFunction())

    // Write out each tag to its respective sink
    for (entityType in EntityTypes.all) {
        entities
            .getSideOutput(entityType)
            .addSink(PostgresEntitySink.withEntity(entityType.typeInfo, parameters))
    }

    stream.execute(parameters.getRequired("application"))
}

I can verify in the logs that my sink is being executed and writing to the
appropriate database; however, the job itself never finishes. I've tried it
using a single Kafka partition as well as multiple partitions and even
commented out the logic related to writing to the database. It still just
seems to run ... forever.

Any recommendations? Perhaps there's a bad configuration or setting that
isn't being used as intended?

Thanks,

Rion