Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202355372 --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java --- @@ -88,46 +75,12 @@ public void shutDown() throws IOException { @SuppressWarnings("unchecked") @Test public void testHdfsSink() throws Exception { - ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource( + ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource( URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS); Assert.assertNotNull(ds); - ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer(); - - Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass()); - Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass()); - - HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1); - StateUpdater stateUpdater = consumer.getStateUpdater(); - - HdfsFileOptions options = mock(HdfsFileOptions.class); - Field optionsField = state.getClass().getDeclaredField("options"); - optionsField.setAccessible(true); - optionsField.set(state, options); + IRichBolt consumer = ds.getConsumer(); - List<TridentTuple> tupleList = mockTupleList(); - - for (TridentTuple t : tupleList) { - stateUpdater.updateState(state, Collections.singletonList(t), null); - try { - verify(options).execute(Collections.singletonList(t)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - private static List<TridentTuple> mockTupleList() { - List<TridentTuple> tupleList = new ArrayList<>(); - TridentTuple t0 = mock(TridentTuple.class); - TridentTuple t1 = mock(TridentTuple.class); - doReturn(1).when(t0).get(0); - doReturn(2).when(t1).get(0); - doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues(); - doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues(); - tupleList.add(t0); - tupleList.add(t1); - return tupleList; + Assert.assertEquals(HdfsBolt.class, consumer.getClass()); --- End diff -- Looking at this again, I think the way to do it would be to use a factory like https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java to instantiate the HdfsBolt. You could pass the factory class name in through a property and instantiate it when the data source is created. The default factory would just call `new HdfsBolt()`. I'm not sure if it's worth it. Up to you.
---