GitHub user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2443#discussion_r155650878
--- Diff:
sql/storm-sql-external/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
---
@@ -68,62 +70,13 @@
@SuppressWarnings("unchecked")
@Test
public void testKafkaSink() throws Exception {
- ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+ ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(
URI.create("kafka://mock?topic=foo"), null, null,
TBL_PROPERTIES, FIELDS);
Assert.assertNotNull(ds);
- ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
+ IRichBolt consumer = ds.getConsumer();
- Assert.assertEquals(TridentKafkaStateFactory.class, consumer.getStateFactory().getClass());
- Assert.assertEquals(TridentKafkaUpdater.class, consumer.getStateUpdater().getClass());
-
- TridentKafkaState state = (TridentKafkaState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
- KafkaProducer producer = mock(KafkaProducer.class);
- doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
- Field producerField = state.getClass().getDeclaredField("producer");
- producerField.setAccessible(true);
- producerField.set(state, producer);
-
- List<TridentTuple> tupleList = mockTupleList();
- for (TridentTuple t : tupleList) {
- state.updateState(Collections.singletonList(t), null);
- verify(producer).send(argThat(new KafkaMessageMatcher(t)));
- }
- verifyNoMoreInteractions(producer);
- }
-
- private static List<TridentTuple> mockTupleList() {
- List<TridentTuple> tupleList = new ArrayList<>();
- TridentTuple t0 = mock(TridentTuple.class);
- TridentTuple t1 = mock(TridentTuple.class);
- doReturn(1).when(t0).get(0);
- doReturn(2).when(t1).get(0);
- doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
- doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
- tupleList.add(t0);
- tupleList.add(t1);
- return tupleList;
- }
-
- private static class KafkaMessageMatcher implements ArgumentMatcher<ProducerRecord<Object, ByteBuffer>> {
-
- private static final int PRIMARY_INDEX = 0;
- private final TridentTuple tuple;
-
- private KafkaMessageMatcher(TridentTuple tuple) {
- this.tuple = tuple;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean matches(ProducerRecord<Object, ByteBuffer> record) {
- if (record.key() != tuple.get(PRIMARY_INDEX)) {
- return false;
- }
- ByteBuffer buf = record.value();
- ByteBuffer b = SERIALIZER.write(tuple.getValues(), null);
- return b.equals(buf);
- }
+ Assert.assertEquals(KafkaBolt.class, consumer.getClass());
--- End diff --
Same answer.
---