This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bcaffe2  Fixed a bug in HttpPostEmitter leading to ClassCastException 
(#8205)
bcaffe2 is described below

commit bcaffe2bc0c6e915c50df9fcb1c191d326a7dfd5
Author: Artiom Darie <artyom...@gmail.com>
AuthorDate: Thu Aug 1 20:36:23 2019 +0300

    Fixed a bug in HttpPostEmitter leading to ClassCastException (#8205)
    
    * Issue 8206: Fixed class cast exception in case of batch recovery
    
    * Issue 8206: Added HttpPostEmitterTest license header
    
    * Issue 8206: Updated comments accordingly to code review.
    
    * Issue 8206: Updated HttpPostEmitterTest accordingly to new modifications.
---
 .../apache/druid/java/util/emitter/core/Batch.java |  3 +
 .../java/util/emitter/core/HttpPostEmitter.java    | 18 ++--
 .../util/emitter/core/HttpPostEmitterTest.java     | 98 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java 
b/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java
index 5bc598a..fbcfb23 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java
@@ -92,6 +92,9 @@ class Batch extends AbstractQueuedLongSynchronizer
    * Ordering number of this batch, as they filled & emitted in {@link 
HttpPostEmitter} serially, starting from 0.
    * It's a boxed Long rather than primitive long, because we want to minimize 
the number of allocations done in
    * {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link 
OutOfMemoryError}.
+   *
+   * See {@link HttpPostEmitter#concurrentBatch} which may store this object.
+   *
    * @see HttpPostEmitter#onSealExclusive
    * @see HttpPostEmitter#concurrentBatch
    */
diff --git 
a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
 
b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
index 85ad787..1036fb9 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java
@@ -109,10 +109,10 @@ public class HttpPostEmitter implements Flushable, 
Closeable, Emitter
   private final AtomicInteger approximateBuffersToReuseCount = new 
AtomicInteger();
 
   /**
-   * concurrentBatch.get() == null means the service is closed. 
concurrentBatch.get() is the instance of Integer,
-   * it means that some thread has failed with a serious error during {@link 
#onSealExclusive} (with the batch number
-   * corresponding to the Integer object) and {@link #tryRecoverCurrentBatch} 
needs to be called. Otherwise (i. e.
-   * normally), an instance of {@link Batch} is stored in this atomic 
reference.
+   * concurrentBatch.get() == null means the service is closed. 
concurrentBatch.get() is the instance of Long (i. e. the
+   * type of {@link Batch#batchNumber}), it means that some thread has failed 
with a serious error during {@link
+   * #onSealExclusive} (with the batch number corresponding to the Long 
object) and {@link #tryRecoverCurrentBatch}
+   * needs to be called. Otherwise (i. e. normally), an instance of {@link 
Batch} is stored in this atomic reference.
    */
   private final AtomicReference<Object> concurrentBatch = new 
AtomicReference<>();
 
@@ -251,8 +251,8 @@ public class HttpPostEmitter implements Flushable, 
Closeable, Emitter
 
     while (true) {
       Object batchObj = concurrentBatch.get();
-      if (batchObj instanceof Integer) {
-        tryRecoverCurrentBatch((Integer) batchObj);
+      if (batchObj instanceof Long) {
+        tryRecoverCurrentBatch((Long) batchObj);
         continue;
       }
       if (batchObj == null) {
@@ -342,7 +342,7 @@ public class HttpPostEmitter implements Flushable, 
Closeable, Emitter
     }
   }
 
-  private void tryRecoverCurrentBatch(Integer failedBatchNumber)
+  private void tryRecoverCurrentBatch(Long failedBatchNumber)
   {
     log.info("Trying to recover currentBatch");
     long nextBatchNumber = 
ConcurrentAwaitableCounter.nextCount(failedBatchNumber);
@@ -535,8 +535,8 @@ public class HttpPostEmitter implements Flushable, 
Closeable, Emitter
         if (batch instanceof Batch) {
           ((Batch) batch).sealIfFlushNeeded();
         } else {
-          // batch == null means that HttpPostEmitter is terminated. Batch 
object could also be Integer, if some
-          // thread just failed with a serious error in onSealExclusive(), in 
this case we don't want to shutdown
+          // batch == null means that HttpPostEmitter is terminated. Batch 
object might also be a Long object if some
+          // thread just failed with a serious error in onSealExclusive(). In 
this case we don't want to shutdown
           // the emitter thread.
           needsToShutdown = batch == null;
         }
diff --git 
a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
 
b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
new file mode 100644
index 0000000..a1a6a3f
--- /dev/null
+++ 
b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.core;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Ints;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class HttpPostEmitterTest
+{
+
+  private static final ObjectMapper objectMapper = new ObjectMapper()
+  {
+    @Override
+    public byte[] writeValueAsBytes(Object value)
+    {
+      return Ints.toByteArray(((IntEvent) value).index);
+    }
+  };
+
+  private final MockHttpClient httpClient = new MockHttpClient();
+
+  @Before
+  public void setup()
+  {
+    httpClient.setGoHandler(new GoHandler()
+    {
+      @Override
+      protected ListenableFuture<Response> go(Request request)
+      {
+        return GoHandlers.immediateFuture(EmitterTest.okResponse());
+      }
+    });
+  }
+
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRecoveryEmitAndReturnBatch()
+      throws InterruptedException, IOException, NoSuchFieldException, 
IllegalAccessException
+  {
+    HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar";)
+        .setFlushMillis(100)
+        .setFlushCount(4)
+        .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
+        .setMaxBatchSize(1024 * 1024)
+        .setBatchQueueSizeLimit(1000)
+        .build();
+    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, 
objectMapper);
+    emitter.start();
+
+    // emit first event
+    emitter.emitAndReturnBatch(new IntEvent());
+    Thread.sleep(1000L);
+
+    // get concurrentBatch reference and set value to lon as if it would fail 
while
+    // HttpPostEmitter#onSealExclusive method invocation.
+    Field concurrentBatch = 
emitter.getClass().getDeclaredField("concurrentBatch");
+    concurrentBatch.setAccessible(true);
+    ((AtomicReference<Object>) concurrentBatch.get(emitter)).getAndSet(1L);
+    // something terrible happened previously so that batch has to recover
+
+    // emit second event
+    emitter.emitAndReturnBatch(new IntEvent());
+
+    emitter.flush();
+    emitter.close();
+
+    Assert.assertEquals(2, emitter.getTotalEmittedEvents());
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to