Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2443#discussion_r155515253
  
    --- Diff: sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java ---
    @@ -68,62 +70,13 @@
         @SuppressWarnings("unchecked")
         @Test
         public void testKafkaSink() throws Exception {
    -        ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
    +        ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(
                 URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS);
             Assert.assertNotNull(ds);
     
    -        ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
    +        IRichBolt consumer = ds.getConsumer();
     
    -        Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass());
    -        Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass());
    -
    -        TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
    -        KafkaProducer producer = mock(KafkaProducer.class);
    -        doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
    -        Field producerField = state.getClass().getDeclaredField("producer");
    -        producerField.setAccessible(true);
    -        producerField.set(state, producer);
    -
    -        List<TridentTuple> tupleList = mockTupleList();
    -        for (TridentTuple t : tupleList) {
    -            state.updateState(Collections.singletonList(t), null);
    -            verify(producer).send(argThat(new KafkaMessageMatcher(t)));
    -        }
    -        verifyNoMoreInteractions(producer);
    -    }
    -
    -    private static List<TridentTuple> mockTupleList() {
    -        List<TridentTuple> tupleList = new ArrayList<>();
    -        TridentTuple t0 = mock(TridentTuple.class);
    -        TridentTuple t1 = mock(TridentTuple.class);
    -        doReturn(1).when(t0).get(0);
    -        doReturn(2).when(t1).get(0);
    -        doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
    -        doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
    -        tupleList.add(t0);
    -        tupleList.add(t1);
    -        return tupleList;
    -    }
    -
    -    private static class KafkaMessageMatcher implements ArgumentMatcher<ProducerRecord<Object, ByteBuffer>> {
    -
    -        private static final int PRIMARY_INDEX = 0;
    -        private final TridentTuple tuple;
    -
    -        private KafkaMessageMatcher(TridentTuple tuple) {
    -            this.tuple = tuple;
    -        }
    -
    -        @SuppressWarnings("unchecked")
    -        @Override
    -        public boolean matches(ProducerRecord<Object, ByteBuffer> record) {
    -            if (record.key() != tuple.get(PRIMARY_INDEX)) {
    -                return false;
    -            }
    -            ByteBuffer buf = record.value();
    -            ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
    -            return b.equals(buf);
    -        }
    +        Assert.assertEquals(KafkaBolt.class, consumer.getClass());
    --- End diff --
    
    Same question as for the HDFS test


---

Reply via email to