usingh83 commented on a change in pull request #14531: URL: https://github.com/apache/beam/pull/14531#discussion_r626778480
##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/TwitterStreamGenerator/TwitterIO.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.complete.TwitterStreamGenerator;
+
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import twitter4j.TwitterStream;
+import twitter4j.*;
+import twitter4j.conf.ConfigurationBuilder;
+
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TwitterIO {
+
+  /**
+   * Implementation of read methods.
+   */
+  public static class Read extends PTransform<PBegin, PCollection<Read.TwitterConnection>> {
+
+    static BlockingQueue<Status> queue;
+    static String key;
+    static String secret;
+    static String token;
+    static String tokenSecret;
+
+    public Read(Builder builder) {
+      key = builder.key;
+      secret = builder.secret;
+      token = builder.token;
+      tokenSecret = builder.tokenSecret;
+      queue = new LinkedBlockingQueue<>();
+    }
+
+    @Override
+    public PCollection<TwitterConnection> expand(PBegin input) throws IllegalArgumentException {
+      if (key == null || secret == null || token == null || tokenSecret == null) {
+        throw new IllegalArgumentException("Please provide key, secret, token and token secret");
+      }
+
+      return input.apply(Create.of(TwitterConnection.getInstance()));

Review comment:
I didn't like doing it this way; I don't want to hide complexity from the user like this. I want to show exactly how the data is read by the pipeline. I have reorganized this after reading KafkaIO.

My idea is: the user gives us the connection parameters, I convert those parameters into a Twitter config, and that config is then passed to `ReadFromTwitterDoFn`, where a singleton Twitter connection is created to fetch data.

My only concern is that if `ReadFromTwitterDoFn` runs with parallelism > 1, it may create multiple instances of the `Twitter connection` on different machines, even though it is defined as a singleton. Does that make sense? Would it be better to put the `ReadFromTwitterDoFn` logic in `TwitterIO.java`? I need to think more about this.
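The parallelism concern can be made concrete with a small sketch. A static singleton exists once per JVM (strictly, once per class loader), so several DoFn instances inside one worker can share it, but every worker JVM a runner starts will build its own. The stdlib-only Java below is a hypothetical stand-in, not the PR's code: `ConnectionParams`, `ConnectionCache`, `FakeConnection`, and `distinctConnectionsSeen` are invented names, and `FakeConnection` replaces a real twitter4j client so the example runs without network access or Beam on the classpath.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwitterConnectionCacheSketch {

  /** Hypothetical user-supplied connection parameters; equal values map to one cached connection. */
  static final class ConnectionParams implements Serializable {
    final String key;
    final String secret;
    final String token;
    final String tokenSecret;

    ConnectionParams(String key, String secret, String token, String tokenSecret) {
      // Same check as expand() in the diff, but done when the parameters are built.
      if (key == null || secret == null || token == null || tokenSecret == null) {
        throw new IllegalArgumentException("Please provide key, secret, token and token secret");
      }
      this.key = key;
      this.secret = secret;
      this.token = token;
      this.tokenSecret = tokenSecret;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof ConnectionParams)) return false;
      ConnectionParams other = (ConnectionParams) o;
      return key.equals(other.key) && secret.equals(other.secret)
          && token.equals(other.token) && tokenSecret.equals(other.tokenSecret);
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, secret, token, tokenSecret);
    }
  }

  /** Placeholder for a real streaming client; holds no network state. */
  static final class FakeConnection {
    final ConnectionParams params;
    FakeConnection(ConnectionParams params) { this.params = params; }
  }

  /** Static map: one connection per distinct params, but only within THIS JVM / class loader. */
  static final class ConnectionCache {
    private static final Map<ConnectionParams, FakeConnection> CONNECTIONS = new ConcurrentHashMap<>();

    static FakeConnection get(ConnectionParams params) {
      // ConcurrentHashMap.computeIfAbsent applies the factory at most once per key.
      return CONNECTIONS.computeIfAbsent(params, FakeConnection::new);
    }
  }

  /** Simulates several DoFn instances (one task each) in a single worker JVM. */
  static int distinctConnectionsSeen(ConnectionParams params, int dofnInstances) {
    ExecutorService pool = Executors.newFixedThreadPool(dofnInstances);
    try {
      List<Future<FakeConnection>> futures = new ArrayList<>();
      for (int i = 0; i < dofnInstances; i++) {
        Callable<FakeConnection> setup = () -> ConnectionCache.get(params);
        futures.add(pool.submit(setup));
      }
      // Count distinct connection objects by identity, not equals().
      Set<FakeConnection> seen = Collections.newSetFromMap(new IdentityHashMap<>());
      for (Future<FakeConnection> f : futures) {
        seen.add(f.get());
      }
      return seen.size();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    ConnectionParams params = new ConnectionParams("key", "secret", "token", "tokenSecret");
    // Four simulated DoFn instances in one JVM share one connection; N worker JVMs would still make N.
    System.out.println(distinctConnectionsSeen(params, 4));
  }
}
```

Running `main` prints `1`: within one JVM the cache does act as a singleton. In an actual Beam `DoFn`, the lookup would typically sit in a `@Setup` method. What the sketch cannot show is the cross-worker case. Each worker has its own static map, so a per-JVM cache alone cannot cap the pipeline at one Twitter connection; that limit would have to come from how the read step is split across workers.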
