This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 43bb514b6a400a71f2f3ca2e7f2ddc7cdfd8e459 Author: Alexey Romanenko <aromanenko....@gmail.com> AuthorDate: Thu Jun 27 18:45:36 2019 +0200 [BEAM-7589] Use only one KinesisProducer instance per JVM --- .../org/apache/beam/sdk/io/kinesis/KinesisIO.java | 144 ++++++++++----------- .../beam/sdk/io/kinesis/KinesisMockWriteTest.java | 20 ++- .../beam/sdk/io/kinesis/KinesisProducerMock.java | 39 +++--- 3 files changed, 95 insertions(+), 108 deletions(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index 7a5cb52..6e1d951 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -29,13 +29,15 @@ import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import com.amazonaws.services.kinesis.producer.UserRecordFailedException; import com.amazonaws.services.kinesis.producer.UserRecordResult; import com.google.auto.value.AutoValue; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -587,20 +589,35 @@ public final class KinesisIO { private static class KinesisWriterFn extends DoFn<byte[], Void> { - private static final int MAX_NUM_RECORDS = 100 * 1000; private static final int MAX_NUM_FAILURES = 10; private final KinesisIO.Write spec; - private transient IKinesisProducer producer; + private static transient IKinesisProducer producer; private transient KinesisPartitioner partitioner; private transient LinkedBlockingDeque<KinesisWriteException> failures; + private transient List<Future<UserRecordResult>> putFutures; - public KinesisWriterFn(KinesisIO.Write spec) { + KinesisWriterFn(KinesisIO.Write spec) { this.spec = spec; + initKinesisProducer(); } @Setup - public void setup() throws Exception { + public void setup() { + // Use custom partitioner if it exists + if (spec.getPartitioner() != null) { + partitioner = spec.getPartitioner(); + } + } + + @StartBundle + public void startBundle() { + putFutures = Collections.synchronizedList(new ArrayList<>()); + /** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */ + failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES); + } + + private synchronized void initKinesisProducer() { // Init producer config Properties props = spec.getProducerProperties(); if (props == null) { @@ -614,13 +631,6 @@ public final class KinesisIO { // Init Kinesis producer producer = spec.getAWSClientsProvider().createKinesisProducer(config); - // Use custom partitioner if it exists - if (spec.getPartitioner() != null) { - partitioner = spec.getPartitioner(); - } - - /** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */ - failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES); } /** @@ -638,13 +648,7 @@ public final class KinesisIO { * the KPL</a> */ @ProcessElement - public void processElement(ProcessContext c) throws Exception { - checkForFailures(); - - // Need to avoid keeping too many futures in producer's map to prevent OOM. - // In usual case, it should exit immediately. - flush(MAX_NUM_RECORDS); - + public void processElement(ProcessContext c) { ByteBuffer data = ByteBuffer.wrap(c.element()); String partitionKey = spec.getPartitionKey(); String explicitHashKey = null; @@ -657,73 +661,77 @@ public final class KinesisIO { ListenableFuture<UserRecordResult> f = producer.addUserRecord(spec.getStreamName(), partitionKey, explicitHashKey, data); - Futures.addCallback(f, new UserRecordResultFutureCallback()); + putFutures.add(f); } @FinishBundle public void finishBundle() throws Exception { - // Flush all outstanding records, blocking call - flushAll(); - - checkForFailures(); - } - - @Teardown - public void tearDown() throws Exception { - if (producer != null) { - producer.destroy(); - producer = null; - } + flushBundle(); } /** - * Flush outstanding records until the total number will be less than required or the number - * of retries will be exhausted. The retry timeout starts from 1 second and it doubles on - * every iteration. + * Flush outstanding records until the total number of failed records will be less than 0 or + * the number of retries will be exhausted. The retry timeout starts from 1 second and it + * doubles on every iteration. */ - private void flush(int numMax) throws InterruptedException, IOException { + private void flushBundle() throws InterruptedException, ExecutionException, IOException { int retries = spec.getRetries(); - int numOutstandingRecords = producer.getOutstandingRecordsCount(); + int numFailedRecords; int retryTimeout = 1000; // initial timeout, 1 sec + String message = ""; - while (numOutstandingRecords > numMax && retries-- > 0) { + do { + numFailedRecords = 0; producer.flush(); + + // Wait for puts to finish and check the results + for (Future<UserRecordResult> f : putFutures) { + UserRecordResult result = f.get(); // this does block + if (!result.isSuccessful()) { + numFailedRecords++; + } + } + // wait until outstanding records will be flushed Thread.sleep(retryTimeout); - numOutstandingRecords = producer.getOutstandingRecordsCount(); retryTimeout *= 2; // exponential backoff - } + } while (numFailedRecords > 0 && retries-- > 0); + + if (numFailedRecords > 0) { + for (Future<UserRecordResult> f : putFutures) { + UserRecordResult result = f.get(); + if (!result.isSuccessful()) { + failures.offer( + new KinesisWriteException( + "Put record was not successful.", new UserRecordFailedException(result))); + } + } - if (numOutstandingRecords > numMax) { - String message = + message = String.format( - "After [%d] retries, number of outstanding records [%d] is still greater than " - + "required [%d].", - spec.getRetries(), numOutstandingRecords, numMax); + "After [%d] retries, number of failed records [%d] is still greater than 0", + spec.getRetries(), numFailedRecords); LOG.error(message); - throw new IOException(message); } - } - private void flushAll() throws InterruptedException, IOException { - flush(0); + checkForFailures(message); } /** If any write has asynchronously failed, fail the bundle with a useful error. */ - private void checkForFailures() throws IOException { - // Note that this function is never called by multiple threads and is the only place that - // we remove from failures, so this code is safe. + private void checkForFailures(String message) throws IOException { if (failures.isEmpty()) { return; } StringBuilder logEntry = new StringBuilder(); + logEntry.append(message).append(System.lineSeparator()); + int i = 0; while (!failures.isEmpty()) { i++; KinesisWriteException exc = failures.remove(); - logEntry.append("\n").append(exc.getMessage()); + logEntry.append(System.lineSeparator()).append(exc.getMessage()); Throwable cause = exc.getCause(); if (cause != null) { logEntry.append(": ").append(cause.getMessage()); @@ -733,36 +741,18 @@ public final class KinesisIO { ((UserRecordFailedException) cause).getResult().getAttempts(); for (Attempt attempt : attempts) { if (attempt.getErrorMessage() != null) { - logEntry.append("\n").append(attempt.getErrorMessage()); + logEntry.append(System.lineSeparator()).append(attempt.getErrorMessage()); } } } } } - failures.clear(); - String message = + String errorMessage = String.format( "Some errors occurred writing to Kinesis. First %d errors: %s", i, logEntry.toString()); - throw new IOException(message); - } - - private class UserRecordResultFutureCallback implements FutureCallback<UserRecordResult> { - - @Override - public void onFailure(Throwable cause) { - failures.offer(new KinesisWriteException(cause)); - } - - @Override - public void onSuccess(UserRecordResult result) { - if (!result.isSuccessful()) { - failures.offer( - new KinesisWriteException( - "Put record was not successful.", new UserRecordFailedException(result))); - } - } + throw new IOException(errorMessage); } } } @@ -772,9 +762,5 @@ public final class KinesisIO { KinesisWriteException(String message, Throwable cause) { super(message, cause); } - - KinesisWriteException(Throwable cause) { - super(cause); - } } } diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java index 61140c9..78b72c7 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockWriteTest.java @@ -135,17 +135,15 @@ public class KinesisMockWriteTest { Properties properties = new Properties(); properties.setProperty("KinesisPort", "qwe"); - Iterable<byte[]> data = ImmutableList.of("1".getBytes(StandardCharsets.UTF_8)); - p.apply(Create.of(data)) - .apply( - KinesisIO.write() - .withStreamName(STREAM) - .withPartitionKey(PARTITION_KEY) - .withAWSClientsProvider(new FakeKinesisProvider()) - .withProducerProperties(properties)); + KinesisIO.Write write = + KinesisIO.write() + .withStreamName(STREAM) + .withPartitionKey(PARTITION_KEY) + .withAWSClientsProvider(new FakeKinesisProvider()) + .withProducerProperties(properties); - thrown.expect(RuntimeException.class); - p.run().waitUntilFinish(); + thrown.expect(IllegalArgumentException.class); + write.expand(null); } @Test @@ -183,7 +181,7 @@ public class KinesisMockWriteTest { .withStreamName(STREAM) .withPartitionKey(PARTITION_KEY) .withAWSClientsProvider(new FakeKinesisProvider().setFailedFlush(true)) - .withRetries(1)); + .withRetries(2)); thrown.expect(RuntimeException.class); p.run().waitUntilFinish(); diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java index f9f03ce..215beec 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisProducerMock.java @@ -26,8 +26,10 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import org.joda.time.DateTime; /** Simple mock implementation of {@link IKinesisProducer} for testing. */ @@ -35,31 +37,39 @@ public class KinesisProducerMock implements IKinesisProducer { private boolean isFailedFlush = false; - private List<UserRecord> addedRecords = new ArrayList<>(); + private List<UserRecord> addedRecords = Collections.synchronizedList(new ArrayList<>()); private KinesisServiceMock kinesisService = KinesisServiceMock.getInstance(); + private AtomicInteger seqNumber = new AtomicInteger(0); + public KinesisProducerMock() {} public KinesisProducerMock(KinesisProducerConfiguration config, boolean isFailedFlush) { this.isFailedFlush = isFailedFlush; + this.seqNumber.set(0); } @Override public ListenableFuture<UserRecordResult> addUserRecord( String stream, String partitionKey, ByteBuffer data) { - throw new RuntimeException("Not implemented"); + throw new UnsupportedOperationException("Not implemented"); } @Override public ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord) { - throw new RuntimeException("Not implemented"); + throw new UnsupportedOperationException("Not implemented"); } @Override - public ListenableFuture<UserRecordResult> addUserRecord( + public synchronized ListenableFuture<UserRecordResult> addUserRecord( String stream, String partitionKey, String explicitHashKey, ByteBuffer data) { + seqNumber.incrementAndGet(); SettableFuture<UserRecordResult> f = SettableFuture.create(); + f.set( + new UserRecordResult( + new ArrayList<>(), String.valueOf(seqNumber.get()), explicitHashKey, !isFailedFlush)); + if (kinesisService.getExistedStream().equals(stream)) { addedRecords.add(new UserRecord(stream, partitionKey, explicitHashKey, data)); } @@ -74,24 +84,24 @@ public class KinesisProducerMock implements IKinesisProducer { @Override public List<Metric> getMetrics(String metricName, int windowSeconds) throws InterruptedException, ExecutionException { - throw new RuntimeException("Not implemented"); + throw new UnsupportedOperationException("Not implemented"); } @Override public List<Metric> getMetrics(String metricName) throws InterruptedException, ExecutionException { - throw new RuntimeException("Not implemented"); + throw new UnsupportedOperationException("Not implemented"); } @Override public List<Metric> getMetrics() throws InterruptedException, ExecutionException { - throw new RuntimeException("Not implemented"); + throw new UnsupportedOperationException("Not implemented"); } @Override public List<Metric> getMetrics(int windowSeconds) throws InterruptedException, ExecutionException { - throw new RuntimeException("Not implemented"); + throw new UnsupportedOperationException("Not implemented"); } @Override @@ -99,16 +109,11 @@ public class KinesisProducerMock implements IKinesisProducer { @Override public void flush(String stream) { - throw new RuntimeException("Not implemented"); + throw new UnsupportedOperationException("Not implemented"); } @Override - public void flush() { - if (isFailedFlush) { - // don't flush - return; - } - + public synchronized void flush() { DateTime arrival = DateTime.now(); for (int i = 0; i < addedRecords.size(); i++) { UserRecord record = addedRecords.get(i); @@ -120,8 +125,6 @@ public class KinesisProducerMock implements IKinesisProducer { @Override public synchronized void flushSync() { - if (getOutstandingRecordsCount() > 0) { - flush(); - } + throw new UnsupportedOperationException("Not implemented"); } }