This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new cb78a61 SAMZA-2348 : Notifying BEM about kafkaConsumerProxy failure
to prevent BEM poll-blockin in case of proxy failures (#1187)
cb78a61 is described below
commit cb78a614716bc39281edc973435626c2032f27ca
Author: rmatharu <[email protected]>
AuthorDate: Mon Oct 14 16:13:44 2019 -0700
SAMZA-2348 : Notifying BEM about kafkaConsumerProxy failure to prevent BEM
poll-blockin in case of proxy failures (#1187)
* Notifying BEM about kafkaConsumerProxy failure via kafkaConsumer to
prevent BEM poll blocked in case of proxy failures
---
.../java/org/apache/samza/util/BlockingEnvelopeMap.java | 13 +++++++++++++
.../apache/samza/system/kafka/KafkaSystemConsumer.java | 15 ++++++++++++++-
.../org/apache/samza/system/kafka/KafkaConsumerProxy.java | 9 ++++++---
.../samza/system/kafka/KafkaConsumerProxyFactory.java | 2 +-
4 files changed, 34 insertions(+), 5 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 8b792b4..d80b2a4 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
@@ -68,6 +69,7 @@ public abstract class BlockingEnvelopeMap implements
SystemConsumer {
private final ConcurrentHashMap<SystemStreamPartition, AtomicLong>
bufferedMessagesSize; // size in bytes per SystemStreamPartition
private final Map<SystemStreamPartition, Boolean> noMoreMessage;
private final Clock clock;
+ private volatile Throwable failureCause = null;
public BlockingEnvelopeMap() {
this(new NoOpMetricsRegistry());
@@ -142,6 +144,13 @@ public abstract class BlockingEnvelopeMap implements
SystemConsumer {
// Block until we get at least one message, or until we catch up to
// the head of the stream.
while (envelope == null && !isAtHead(systemStreamPartition)) {
+
+ // Check for consumerFailure and throw exception
+ if (this.failureCause != null) {
+ String message = String.format("%s: Consumer has stopped.",
this);
+ throw new SamzaException(message, this.failureCause);
+ }
+
metrics.incBlockingPoll(systemStreamPartition);
envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
}
@@ -241,6 +250,10 @@ public abstract class BlockingEnvelopeMap implements
SystemConsumer {
return getNumMessagesInQueue(systemStreamPartition) == 0 && isAtHead !=
null && isAtHead.equals(true);
}
+ protected void setFailureCause(Throwable throwable) {
+ this.failureCause = throwable;
+ }
+
public class BlockingEnvelopeMapMetrics {
private final String group;
private final MetricsRegistry metricsRegistry;
diff --git
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 015b76a..33e2520 100644
---
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -101,7 +101,7 @@ public class KafkaSystemConsumer<K, V> extends
BlockingEnvelopeMap implements Sy
messageSink = new KafkaConsumerMessageSink();
// Create the proxy to do the actual message reading.
- proxy = kafkaConsumerProxyFactory.create(this.messageSink);
+ proxy = kafkaConsumerProxyFactory.create(this);
LOG.info("{}: Created proxy {} ", this, proxy);
}
@@ -219,6 +219,15 @@ public class KafkaSystemConsumer<K, V> extends
BlockingEnvelopeMap implements Sy
this, fetchThresholdBytes, fetchThreshold, numPartitions,
perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
}
+ /**
+ * Invoked by {@link KafkaConsumerProxy} to notify the consumer of failure,
so it can relay and stop the BEM polling.
+ * @param throwable the cause of the failure of the proxy
+ */
+ @Override
+ public void setFailureCause(Throwable throwable) {
+ this.setFailureCause(throwable); // notify the BEM
+ }
+
@Override
public void stop() {
if (!stopped.compareAndSet(false, true)) {
@@ -320,6 +329,10 @@ public class KafkaSystemConsumer<K, V> extends
BlockingEnvelopeMap implements Sy
return systemName;
}
+ public KafkaConsumerMessageSink getMessageSink() {
+ return this.messageSink;
+ }
+
public class KafkaConsumerMessageSink {
public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean
isAtHighWatermark) {
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index aedf3f1..4ecfc6a 100644
---
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -59,6 +59,7 @@ public class KafkaConsumerProxy<K, V> {
private final Thread consumerPollThread;
private final Consumer<K, V> kafkaConsumer;
+ private final KafkaSystemConsumer kafkaSystemConsumer;
private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
private final String metricName;
@@ -75,10 +76,11 @@ public class KafkaConsumerProxy<K, V> {
private volatile Throwable failureCause = null;
private final CountDownLatch consumerPollThreadStartLatch = new
CountDownLatch(1);
- public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName,
String clientId,
+ public KafkaConsumerProxy(KafkaSystemConsumer kafkaSystemConsumer,
Consumer<K, V> kafkaConsumer, String systemName, String clientId,
KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink,
KafkaSystemConsumerMetrics samzaConsumerMetrics,
String metricName) {
+ this.kafkaSystemConsumer = kafkaSystemConsumer;
this.kafkaConsumer = kafkaConsumer;
this.systemName = systemName;
this.sink = messageSink;
@@ -204,6 +206,7 @@ public class KafkaConsumerProxy<K, V> {
// KafkaSystemConsumer uses the failureCause to propagate the
throwable to the container
failureCause = throwable;
isRunning = false;
+ kafkaSystemConsumer.setFailureCause(this.failureCause);
}
if (!isRunning) {
@@ -466,9 +469,9 @@ public class KafkaConsumerProxy<K, V> {
this.kafkaSystemConsumerMetrics = kafkaSystemConsumerMetrics;
}
- public KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K,
V>.KafkaConsumerMessageSink messageSink) {
+ public KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V>
kafkaSystemConsumer) {
String metricName = String.format("%s-%s", systemName, clientId);
- return new KafkaConsumerProxy<>(this.kafkaConsumer, this.systemName,
this.clientId, messageSink,
+ return new KafkaConsumerProxy<>(kafkaSystemConsumer, this.kafkaConsumer,
this.systemName, this.clientId, kafkaSystemConsumer.getMessageSink(),
this.kafkaSystemConsumerMetrics, metricName);
}
}
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
index a566d2a..cc4bddc 100644
---
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
+++
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxyFactory.java
@@ -25,5 +25,5 @@ package org.apache.samza.system.kafka;
* {@link KafkaConsumerProxy} needs to be used within kafka system components
like {@link KafkaSystemConsumer}.
*/
public interface KafkaConsumerProxyFactory<K, V> {
- KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K,
V>.KafkaConsumerMessageSink messageSink);
+ KafkaConsumerProxy<K, V> create(KafkaSystemConsumer<K, V>
kafkaSystemConsumer);
}