Seems like top-level comments on pull requests don't generate emails, so sending this here just as an FYI...
I updated this pull request based on everyone's feedback. The biggest change is adding the WindowDataManager so that data is saved before we complete a NiFi transaction; once the transaction completes, NiFi no longer holds the data, so anything we lost after that point would be gone for good. I based this mostly on the RabbitMQ operators, but let me know if anything doesn't seem right.

Thanks,

Bryan

On Tue, Jan 26, 2016 at 4:46 PM, bbende <[email protected]> wrote:

> Github user bbende commented on a diff in the pull request:
>
> https://github.com/apache/incubator-apex-malhar/pull/133#discussion_r50905617
>
> --- Diff: contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java ---
> @@ -0,0 +1,162 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package com.datatorrent.contrib.nifi;
> +
> +import java.io.IOException;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +
> +import org.apache.nifi.remote.Transaction;
> +import org.apache.nifi.remote.TransferDirection;
> +import org.apache.nifi.remote.client.SiteToSiteClient;
> +import org.apache.nifi.remote.protocol.DataPacket;
> +
> +import com.datatorrent.api.Context;
> +import com.datatorrent.api.InputOperator;
> +
> +/**
> + * This is the base implementation of a NiFi input operator.
> + * Subclasses should implement the methods which convert NiFi DataPackets to tuples and emit them.
> + * <p>
> + * Ports:<br>
> + * <b>Input</b>: No input port<br>
> + * <b>Output</b>: Can have any number of output ports<br>
> + * <br>
> + * Properties:<br>
> + * None<br>
> + * <br>
> + * Compile time checks:<br>
> + * Classes derived from this have to implement the abstract methods emitTuples(List<T> tuples)
> + * and createTuple(DataPacket dp)<br>
> + * <br>
> + * Run time checks:<br>
> + * None<br>
> + * <br>
> + * Benchmarks:<br>
> + * TBD<br>
> + * </p>
> + *
> + * @displayName Abstract NiFi Input
> + * @category Messaging
> + * @tags input operator
> + * @since 3.3.0
> + */
> +
> +public abstract class AbstractNiFiInputOperator<T> implements InputOperator
> +{
> +
> +  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
> +
> +  private SiteToSiteClient client;
> +  private final SiteToSiteClient.Builder siteToSiteBuilder;
> +
> +  /**
> +   * @param siteToSiteBuilder the builder for a NiFi SiteToSiteClient
> +   */
> +  public AbstractNiFiInputOperator(final SiteToSiteClient.Builder siteToSiteBuilder)
> +  {
> +    this.siteToSiteBuilder = siteToSiteBuilder;
> +  }
> +
> +  @Override
> +  public void setup(Context.OperatorContext context)
> +  {
> +    this.client = siteToSiteBuilder.build();
> +  }
> +
> +  @Override
> +  public void teardown()
> +  {
> +    try {
> +      client.close();
> +    } catch (IOException e) {
> +      LOGGER.error(e.getMessage(), e);
> +    }
> +  }
> +
> +  @Override
> +  public void emitTuples()
> +  {
> +    try {
> +      final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
> +      if (transaction == null) {
> +        LOGGER.warn("A transaction could not be created, returning...");
> +        return;
> +      }
> +
> +      DataPacket dataPacket = transaction.receive();
> +      if (dataPacket == null) {
> +        transaction.confirm();
> +        transaction.complete();
> +        LOGGER.debug("No data available to pull, returning and will try again...");
> +        return;
> +      }
> +
> +      // read all of the available data packets and convert to the given type
> +      final List<T> tuples = new ArrayList<>();
> +      do {
> +        tuples.add(createTuple(dataPacket));
> +        dataPacket = transaction.receive();
> +      } while (dataPacket != null);
> --- End diff --
>
> shouldn't be able to get an infinite loop... transaction.receive() will return null when there is no more data to pull, or when it has pulled the maximum number of data packets for a transaction (configured on the site-to-site client)
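
To make the termination argument for the quoted receive loop concrete, here is a minimal, self-contained sketch. It does not use the NiFi client at all: `PacketSource`, `BoundedSource` and `ReceiveLoopSketch` are hypothetical stand-ins made up for illustration. The only behaviour assumed is the one described in the comment on the diff, namely that `receive()` returns null once the data runs out or the per-transaction maximum has been reached.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a transaction; NOT org.apache.nifi.remote.Transaction.
interface PacketSource {
  // Returns the next packet, or null when no data is left or the batch limit is reached.
  String receive();
}

// Assumed behaviour: hands out at most maxPerTransaction packets, then returns null.
class BoundedSource implements PacketSource {
  private final Deque<String> pending;
  private final int maxPerTransaction;
  private int delivered = 0;

  BoundedSource(List<String> packets, int maxPerTransaction) {
    this.pending = new ArrayDeque<>(packets);
    this.maxPerTransaction = maxPerTransaction;
  }

  @Override
  public String receive() {
    if (delivered >= maxPerTransaction || pending.isEmpty()) {
      return null;
    }
    delivered++;
    return pending.poll();
  }
}

class ReceiveLoopSketch {
  // Same loop shape as the quoted emitTuples(): one receive up front, then do/while until null.
  static List<String> drain(PacketSource source) {
    List<String> tuples = new ArrayList<>();
    String packet = source.receive();
    if (packet == null) {
      return tuples; // nothing to pull this time
    }
    do {
      tuples.add(packet);
      packet = source.receive();
    } while (packet != null);
    return tuples;
  }

  public static void main(String[] args) {
    List<String> data = Arrays.asList("a", "b", "c", "d", "e");
    System.out.println(drain(new BoundedSource(data, 3)));  // prints [a, b, c]
    System.out.println(drain(new BoundedSource(data, 10))); // prints [a, b, c, d, e]
  }
}
```

In this sketch the loop stops after at most `maxPerTransaction` iterations even when more data is pending, which is the property the comment relies on.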

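
Going back to the WindowDataManager change described at the top of this message, the ordering it is meant to guarantee can be sketched roughly as follows. This is not the code in the pull request and it does not use the Malhar or NiFi APIs: `InMemoryWindowStore`, `FakeTransaction` and `SaveBeforeCompleteSketch` are hypothetical names, the store is just a map in memory, and reading the tuples back after the fact is an assumed recovery path shown only to illustrate why the save has to come first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store keyed by window id; NOT the Malhar WindowDataManager API.
class InMemoryWindowStore {
  private final Map<Long, List<String>> saved = new HashMap<>();

  void save(long windowId, List<String> tuples) {
    saved.put(windowId, new ArrayList<>(tuples));
  }

  // Returns the tuples saved for the window, or an empty list if none were saved.
  List<String> load(long windowId) {
    List<String> tuples = saved.get(windowId);
    return tuples == null ? new ArrayList<String>() : tuples;
  }
}

// Hypothetical stand-in for a transaction; after complete() the source may drop the data.
class FakeTransaction {
  private boolean completed = false;

  void complete() {
    completed = true;
  }

  boolean isCompleted() {
    return completed;
  }
}

class SaveBeforeCompleteSketch {
  // Persist first, complete second, and record the order of the two steps.
  static List<String> commitWindow(long windowId, List<String> tuples,
      InMemoryWindowStore store, FakeTransaction tx) {
    List<String> steps = new ArrayList<>();
    store.save(windowId, tuples);
    steps.add("save");
    tx.complete();
    steps.add("complete");
    return steps;
  }

  public static void main(String[] args) {
    InMemoryWindowStore store = new InMemoryWindowStore();
    FakeTransaction tx = new FakeTransaction();
    System.out.println(commitWindow(1L, Arrays.asList("t1", "t2"), store, tx)); // prints [save, complete]
    // Assumed recovery path: the tuples for window 1 can still be read back from the store.
    System.out.println(store.load(1L)); // prints [t1, t2]
  }
}
```

If the two steps were swapped and the process died between them, the transaction would already be completed while the store held nothing for that window, which is the loss the change is meant to prevent.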