Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2443#discussion_r202355372
  
    --- Diff: sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java ---
    @@ -88,46 +75,12 @@ public void shutDown() throws IOException {
         @SuppressWarnings("unchecked")
         @Test
         public void testHdfsSink() throws Exception {
    -        ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
    +        ISqlStreamsDataSource ds = DataSourcesRegistry.constructStreamsDataSource(
                 URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
             Assert.assertNotNull(ds);
     
    -        ISqlTridentDataSource.SqlTridentConsumer consumer = ds.getConsumer();
    -
    -        Assert.assertEquals(HdfsStateFactory.class, consumer.getStateFactory().getClass());
    -        Assert.assertEquals(HdfsUpdater.class, consumer.getStateUpdater().getClass());
    -
    -        HdfsState state = (HdfsState) consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
    -        StateUpdater stateUpdater = consumer.getStateUpdater();
    -
    -        HdfsFileOptions options = mock(HdfsFileOptions.class);
    -        Field optionsField = state.getClass().getDeclaredField("options");
    -        optionsField.setAccessible(true);
    -        optionsField.set(state, options);
    +        IRichBolt consumer = ds.getConsumer();
     
    -        List<TridentTuple> tupleList = mockTupleList();
    -
    -        for (TridentTuple t : tupleList) {
    -            stateUpdater.updateState(state, Collections.singletonList(t), null);
    -            try {
    -                verify(options).execute(Collections.singletonList(t));
    -            } catch (IOException e) {
    -                throw new RuntimeException(e);
    -            }
    -        }
    -    }
    -
    -    private static List<TridentTuple> mockTupleList() {
    -        List<TridentTuple> tupleList = new ArrayList<>();
    -        TridentTuple t0 = mock(TridentTuple.class);
    -        TridentTuple t1 = mock(TridentTuple.class);
    -        doReturn(1).when(t0).get(0);
    -        doReturn(2).when(t1).get(0);
    -        doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
    -        doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
    -        tupleList.add(t0);
    -        tupleList.add(t1);
    -        return tupleList;
    +        Assert.assertEquals(HdfsBolt.class, consumer.getClass());
    --- End diff ---
    
    Looking at this again, I think the way to do it would be to use a factory like
    https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
    to instantiate the HdfsBolt. You could pass the factory class name in through
    a property and instantiate it when the data source is created. The default
    factory would just call `new HdfsBolt()`.
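    
    A minimal sketch of what I mean (the interface name, the default class, and
    the `hdfs.bolt.factory.class` property key are all hypothetical, just for
    illustration):
    
    ```java
    import java.io.Serializable;
    import java.util.Map;
    
    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.topology.IRichBolt;
    
    // Hypothetical factory abstraction for creating the sink bolt.
    interface HdfsBoltFactory extends Serializable {
        IRichBolt createBolt();
    }
    
    // Default factory: just creates a plain HdfsBolt, matching current behavior.
    class DefaultHdfsBoltFactory implements HdfsBoltFactory {
        @Override
        public IRichBolt createBolt() {
            return new HdfsBolt();
        }
    }
    
    // Inside the data source: resolve the factory class from the table
    // properties, falling back to the default when the property is absent.
    class HdfsDataSourceSketch {
        static IRichBolt makeConsumer(Map<String, String> props) throws Exception {
            String className = props.getOrDefault(
                    "hdfs.bolt.factory.class", DefaultHdfsBoltFactory.class.getName());
            HdfsBoltFactory factory = (HdfsBoltFactory) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
            return factory.createBolt();
        }
    }
    ```
    
    The test could then register a factory that returns a mock bolt and verify
    interactions on the mock, instead of only asserting `consumer.getClass()`.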
    
    I'm not sure if it's worth it. Up to you.

