This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe4a9b8a4 fix spammy logs for
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest
(#9516)
3fe4a9b8a4 is described below
commit 3fe4a9b8a49db17efae7716a1d12feee9f2e13a6
Author: Almog Gavra <[email protected]>
AuthorDate: Tue Oct 4 13:35:15 2022 -0700
fix spammy logs for
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest
(#9516)
---
...oMessageDecoderRealtimeClusterIntegrationTest.java | 19 +++++++++++++++----
pinot-integration-tests/src/test/resources/log4j2.xml | 5 -----
2 files changed, 15 insertions(+), 9 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
index 21627741f1..2e021aacde 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
@@ -75,6 +75,7 @@ public class
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegr
private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
Collections.singletonList("DivActualElapsedTime");
private static final long RANDOM_SEED = System.currentTimeMillis();
private static final Random RANDOM = new Random(RANDOM_SEED);
+ private static final int NUM_INVALID_RECORDS = 5;
private final boolean _isDirectAlloc = RANDOM.nextBoolean();
private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
@@ -129,29 +130,39 @@ public class
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegr
"io.confluent.kafka.serializers.KafkaAvroSerializer");
Producer<byte[], GenericRecord> avroProducer = new
KafkaProducer<>(avroProducerProps);
+ // this producer sends intentionally malformed (non-Avro) records so that
+ // we can test the behavior when consuming such records
Properties nonAvroProducerProps = new Properties();
nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:" + getKafkaPort());
nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
- Producer<byte[], byte[]> nonAvroProducer = new
KafkaProducer<>(nonAvroProducerProps);
+ Producer<byte[], byte[]> invalidDataProducer = new
KafkaProducer<>(nonAvroProducerProps);
if (injectTombstones()) {
// publish lots of tombstones to livelock the consumer if it can't
handle this properly
for (int i = 0; i < 1000; i++) {
// publish a tombstone first
- nonAvroProducer.send(
+ avroProducer.send(
new ProducerRecord<>(getKafkaTopic(),
Longs.toByteArray(System.currentTimeMillis()), null));
}
}
+
for (File avroFile : avroFiles) {
+ int numInvalidRecords = 0;
try (DataFileStream<GenericRecord> reader =
AvroUtils.getAvroReader(avroFile)) {
for (GenericRecord genericRecord : reader) {
byte[] keyBytes = (getPartitionColumn() == null) ?
Longs.toByteArray(System.currentTimeMillis())
:
(genericRecord.get(getPartitionColumn())).toString().getBytes();
- // Ignore getKafkaMessageHeader()
- nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes,
"Rubbish".getBytes(UTF_8)));
+
+ if (numInvalidRecords < NUM_INVALID_RECORDS) {
+ // send a few rubbish records to validate that the consumer will
skip over non-avro records, but
+ // don't spam them every time as it causes log spam
+ invalidDataProducer.send(new ProducerRecord<>(getKafkaTopic(),
keyBytes, "Rubbish".getBytes(UTF_8)));
+ numInvalidRecords++;
+ }
+
avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes,
genericRecord));
}
}
diff --git a/pinot-integration-tests/src/test/resources/log4j2.xml
b/pinot-integration-tests/src/test/resources/log4j2.xml
index b12865cc65..439331f9d7 100644
--- a/pinot-integration-tests/src/test/resources/log4j2.xml
+++ b/pinot-integration-tests/src/test/resources/log4j2.xml
@@ -26,11 +26,6 @@
</Console>
</Appenders>
<Loggers>
- <!--Turn off the logger for KafkaConfluentSchemaRegistryAvroMessageDecoder
because we intentionally inject
- tombstones in
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest
which can flood the log
- -->
- <Logger
name="org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder"
- level="off" additivity="false"/>
<Logger name="org.apache.pinot" level="warn" additivity="false">
<AppenderRef ref="console"/>
</Logger>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]