GitHub user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2443#discussion_r202639091
--- Diff:
sql/storm-sql-external/storm-sql-hdfs/src/test/org/apache/storm/sql/hdfs/TestHdfsDataSourcesProvider.java
---
@@ -88,46 +75,12 @@ public void shutDown() throws IOException {
@SuppressWarnings("unchecked")
@Test
public void testHdfsSink() throws Exception {
- ISqlTridentDataSource ds =
DataSourcesRegistry.constructTridentDataSource(
+ ISqlStreamsDataSource ds =
DataSourcesRegistry.constructStreamsDataSource(
URI.create(hdfsURI), null, null, TBL_PROPERTIES, FIELDS);
Assert.assertNotNull(ds);
- ISqlTridentDataSource.SqlTridentConsumer consumer =
ds.getConsumer();
-
- Assert.assertEquals(HdfsStateFactory.class,
consumer.getStateFactory().getClass());
- Assert.assertEquals(HdfsUpdater.class,
consumer.getStateUpdater().getClass());
-
- HdfsState state = (HdfsState)
consumer.getStateFactory().makeState(Collections.emptyMap(), null, 0, 1);
- StateUpdater stateUpdater = consumer.getStateUpdater();
-
- HdfsFileOptions options = mock(HdfsFileOptions.class);
- Field optionsField = state.getClass().getDeclaredField("options");
- optionsField.setAccessible(true);
- optionsField.set(state, options);
+ IRichBolt consumer = ds.getConsumer();
- List<TridentTuple> tupleList = mockTupleList();
-
- for (TridentTuple t : tupleList) {
- stateUpdater.updateState(state, Collections.singletonList(t),
null);
- try {
- verify(options).execute(Collections.singletonList(t));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static List<TridentTuple> mockTupleList() {
- List<TridentTuple> tupleList = new ArrayList<>();
- TridentTuple t0 = mock(TridentTuple.class);
- TridentTuple t1 = mock(TridentTuple.class);
- doReturn(1).when(t0).get(0);
- doReturn(2).when(t1).get(0);
- doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
- doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
- tupleList.add(t0);
- tupleList.add(t1);
- return tupleList;
+ Assert.assertEquals(HdfsBolt.class, consumer.getClass());
--- End diff --
Thanks for the information! I'll think about applying it, but let me address
the other comments first.
---