This is an automated email from the ASF dual-hosted git repository. leventov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new bcaffe2 Fixed a bug in HttpPostEmitter leading to ClassCastException (#8205) bcaffe2 is described below commit bcaffe2bc0c6e915c50df9fcb1c191d326a7dfd5 Author: Artiom Darie <artyom...@gmail.com> AuthorDate: Thu Aug 1 20:36:23 2019 +0300 Fixed a bug in HttpPostEmitter leading to ClassCastException (#8205) * Issue 8206: Fixed class cast exception in case of batch recovery * Issue 8206: Added HttpPostEmitterTest license header * Issue 8206: Updated comments accordingly to code review. * Issue 8206: Updated HttpPostEmitterTest accordingly to new modifications. --- .../apache/druid/java/util/emitter/core/Batch.java | 3 + .../java/util/emitter/core/HttpPostEmitter.java | 18 ++-- .../util/emitter/core/HttpPostEmitterTest.java | 98 ++++++++++++++++++++++ 3 files changed, 110 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java index 5bc598a..fbcfb23 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java @@ -92,6 +92,9 @@ class Batch extends AbstractQueuedLongSynchronizer * Ordering number of this batch, as they filled & emitted in {@link HttpPostEmitter} serially, starting from 0. * It's a boxed Long rather than primitive long, because we want to minimize the number of allocations done in * {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link OutOfMemoryError}. + * + * See {@link HttpPostEmitter#concurrentBatch} which may store this object. 
+ * * @see HttpPostEmitter#onSealExclusive * @see HttpPostEmitter#concurrentBatch */ diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java index 85ad787..1036fb9 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java @@ -109,10 +109,10 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter private final AtomicInteger approximateBuffersToReuseCount = new AtomicInteger(); /** - * concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Integer, - * it means that some thread has failed with a serious error during {@link #onSealExclusive} (with the batch number - * corresponding to the Integer object) and {@link #tryRecoverCurrentBatch} needs to be called. Otherwise (i. e. - * normally), an instance of {@link Batch} is stored in this atomic reference. + * concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Long (i. e. the + * type of {@link Batch#batchNumber}), it means that some thread has failed with a serious error during {@link + * #onSealExclusive} (with the batch number corresponding to the Long object) and {@link #tryRecoverCurrentBatch} + * needs to be called. Otherwise (i. e. normally), an instance of {@link Batch} is stored in this atomic reference. 
*/ private final AtomicReference<Object> concurrentBatch = new AtomicReference<>(); @@ -251,8 +251,8 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter while (true) { Object batchObj = concurrentBatch.get(); - if (batchObj instanceof Integer) { - tryRecoverCurrentBatch((Integer) batchObj); + if (batchObj instanceof Long) { + tryRecoverCurrentBatch((Long) batchObj); continue; } if (batchObj == null) { @@ -342,7 +342,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter } } - private void tryRecoverCurrentBatch(Integer failedBatchNumber) + private void tryRecoverCurrentBatch(Long failedBatchNumber) { log.info("Trying to recover currentBatch"); long nextBatchNumber = ConcurrentAwaitableCounter.nextCount(failedBatchNumber); @@ -535,8 +535,8 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter if (batch instanceof Batch) { ((Batch) batch).sealIfFlushNeeded(); } else { - // batch == null means that HttpPostEmitter is terminated. Batch object could also be Integer, if some - // thread just failed with a serious error in onSealExclusive(), in this case we don't want to shutdown + // batch == null means that HttpPostEmitter is terminated. Batch object might also be a Long object if some + // thread just failed with a serious error in onSealExclusive(). In this case we don't want to shutdown // the emitter thread. needsToShutdown = batch == null; } diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java new file mode 100644 index 0000000..a1a6a3f --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.primitives.Ints; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Request; +import org.asynchttpclient.Response; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicReference; + +public class HttpPostEmitterTest +{ + + private static final ObjectMapper objectMapper = new ObjectMapper() + { + @Override + public byte[] writeValueAsBytes(Object value) + { + return Ints.toByteArray(((IntEvent) value).index); + } + }; + + private final MockHttpClient httpClient = new MockHttpClient(); + + @Before + public void setup() + { + httpClient.setGoHandler(new GoHandler() + { + @Override + protected ListenableFuture<Response> go(Request request) + { + return GoHandlers.immediateFuture(EmitterTest.okResponse()); + } + }); + } + + + @Test + @SuppressWarnings("unchecked") + public void testRecoveryEmitAndReturnBatch() + throws InterruptedException, IOException, NoSuchFieldException, IllegalAccessException + { + HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar") + .setFlushMillis(100) + .setFlushCount(4) + .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS) + .setMaxBatchSize(1024 * 1024) + 
.setBatchQueueSizeLimit(1000) + .build(); + final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper); + emitter.start(); + + // emit first event + emitter.emitAndReturnBatch(new IntEvent()); + Thread.sleep(1000L); + + // get the concurrentBatch reference and set its value to a Long, as if a failure had occurred during + // the HttpPostEmitter#onSealExclusive method invocation. + Field concurrentBatch = emitter.getClass().getDeclaredField("concurrentBatch"); + concurrentBatch.setAccessible(true); + ((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L); + // something terrible happened previously so that batch has to recover + + // emit second event + emitter.emitAndReturnBatch(new IntEvent()); + + emitter.flush(); + emitter.close(); + + Assert.assertEquals(2, emitter.getTotalEmittedEvents()); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org