Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r155515253 --- Diff: sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java --- @@ -68,62 +70,13 @@ @SuppressWarnings("unchecked") @Test public void testKafkaSink() throws Exception { - ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( + ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create("kafka://mock?topic=foo"), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); - ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); + IRichBolt consumer = ds.getConsumer(); - Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass()); - Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass()); - - TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); - KafkaProducer producer = mock(KafkaProducer.class); - doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class)); - Field producerField = state.getClass().getDeclaredField("producer"); - producerField.setAccessible(true); - producerField.set(state, producer); - - List<TridentTuple> tupleList = mockTupleList(); - for (TridentTuple t : tupleList) { - state.updateState(Collections.singletonList(t), null); - verify(producer).send(argThat(new KafkaMessageMatcher(t))); - } - verifyNoMoreInteractions(producer); - } - - private static List<TridentTuple> mockTupleList() { - List<TridentTuple> tupleList = new ArrayList<>(); - TridentTuple t0 = mock(TridentTuple.class); - TridentTuple t1 = mock(TridentTuple.class); - doReturn(1).when(t0).get(0); - doReturn(2).when(t1).get(0); - doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues(); - doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues(); - tupleList.add(t0); - tupleList.add(t1); - return tupleList; - } - - private static class KafkaMessageMatcher implements ArgumentMatcher<ProducerRecord<Object, ByteBuffer>> { - - private static final int PRIMARY_INDEX = 0; - private final TridentTuple tuple; - - private KafkaMessageMatcher(TridentTuple tuple) { - this.tuple = tuple; - } - - @SuppressWarnings("unchecked") - @Override - public boolean matches(ProducerRecord<Object, ByteBuffer> record) { - if (record.key() != tuple.get(PRIMARY_INDEX)) { - return false; - } - ByteBuffer buf = record.value(); - ByteBuffer b = SERIALIZER.write(tuple.getValues(), null); - return b.equals(buf); - } + Assert.assertEquals(KafkaBolt.class, consumer.getClass()); --- End diff -- Same question as for the HDFS test
---